diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 24fb984..da96ee1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) Serial << '\'' << endl; } -MqttBroker::MqttBroker(uint16_t port) : server(port) +MqttBroker::MqttBroker(uint16_t port) { + server = new AsyncServer(port); + server->onClient(onClient, this); } MqttBroker::~MqttBroker() @@ -19,14 +21,16 @@ MqttBroker::~MqttBroker() { delete clients[0]; } - server.close(); + delete server; } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) - : parent(parent) +MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) + : parent(parent), client(new_client) { - client = new WiFiClient(new_client); + client->onData(onData, this); + // client->onConnect() TODO + // client->onDisconnect() TODO alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) debug("cnx: closing"); close(); if (client) delete client; - client = new WiFiClient; + client = new AsyncClient; debug("Trying to connect to " << broker.c_str() << ':' << port); + // TODO This may return immediately !!! + // TODO so I have to add onConnect and move this code to onConnect + // TODO also, as this is async now, I must take care of + // TODO the broker that may disconnect and delete the client immediately if (client->connect(broker.c_str(), port)) { debug("cnx: connecting"); @@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } +void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +{ + MqttBroker* broker = static_cast(broker_ptr); + + broker->addClient(new MqttClient(broker, client)); + debug("New client #" << broker->clients->size()); +} + void MqttBroker::loop() { - WiFiClient client = server.available(); - if (broker) { // TODO should monitor broker's activity. @@ -137,11 +151,6 @@ void MqttBroker::loop() broker->loop(); } - if (client) - { - addClient(new MqttClient(this, client)); - debug("New client (" << clients.size() << ')'); - } // for(auto it=clients.begin(); it!=clients.end(); it++) // use index because size can change during the loop @@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) { return broker->subscribe(topic, qos); } + return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const @@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons for(auto client: clients) { i++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif @@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons { doit = true; } -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; #endif @@ -250,22 +260,28 @@ void MqttClient::loop() { debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); clientAlive(0); // TODO when many MqttClient passes through a local browser // there is no need to send one PingReq per instance. } } +} - while(client && client->available()>0) +void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +{ + char* char_ptr = static_cast(data); + MqttClient* client=static_cast(client_ptr); + while(len>0) { - message.incoming(client->read()); - if (message.type()) + client->message.incoming(*char_ptr++); + if (client->message.type()) { - processMessage(&message); - message.reset(); + client->processMessage(&client->message); + client->message.reset(); } + len--; } } @@ -341,7 +357,7 @@ long MqttClient::counter=0; void MqttClient::processMessage(const MqttMessage* mesg) { counter++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; @@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T if (client) { uint16_t pingreq = MqttMessage::Type::PingResp; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); bclose = false; } else @@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T subscriptions.erase(it); } payload += len; - uint8_t qos = *payload++; + /* uint8_t qos =*/ *payload++; debug(" qos=" << qos); } debug("end loop"); @@ -488,7 +504,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T Topic published(payload, len); payload += len; // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any len=mesg->end()-payload; // TODO reset DUP diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 7a0d550..ee9692e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,16 +1,15 @@ #pragma once #include +#include #include #include #include #include "StringIndexer.h" #include -#if 0 +#ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } - #define TINY_MQTT_DEBUG 1 #else - #define TINY_MQTT_DEBUG 0 #define debug(what) {} #endif @@ -190,11 +189,12 @@ class MqttClient static long counter; private: + static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, WiFiClient& client); + MqttClient(MqttBroker* parent, AsyncClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -212,7 +212,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - WiFiClient* client=nullptr; // connection to mqtt client or to remote broker + AsyncClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -231,11 +231,9 @@ class MqttBroker MqttBroker(uint16_t port); ~MqttBroker(); - void begin() { server.begin(); } + void begin() { server->begin(); } void loop(); - uint16_t port() const { return server.port(); } - void connect(const std::string& host, uint16_t port=1883); bool connected() const { return state == Connected; } @@ -254,6 +252,7 @@ class MqttBroker private: friend class MqttClient; + static void onClient(void*, AsyncClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -271,7 +270,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - WiFiServer server; + AsyncServer* server; const char* auth_user = "guest"; const char* auth_password = "guest";