diff --git a/src/StringIndexer.h b/src/StringIndexer.h index 5b6ebfe..020dd3b 100644 --- a/src/StringIndexer.h +++ b/src/StringIndexer.h @@ -9,121 +9,121 @@ */ class StringIndexer { - class StringCounter - { - std::string str; - uint8_t used=0; - friend class StringIndexer; + class StringCounter + { + std::string str; + uint8_t used=0; + friend class StringIndexer; - #if EPOXY_DUINO - public: - // Workaround to avoid coredump in Indexer::release - // when destroying a Topic after the deletion of - // StringIndexer::strings map (which can occurs only with AUnit, - // never in the ESP itself, because ESP never ends) - // (I hate static vars) - ~StringCounter() { used=255; } - #endif - }; - public: - using index_t=uint8_t; + #if EPOXY_DUINO + public: + // Workaround to avoid coredump in Indexer::release + // when destroying a Topic after the deletion of + // StringIndexer::strings map (which can occurs only with AUnit, + // never in the ESP itself, because ESP never ends) + // (I hate static vars) + ~StringCounter() { used=255; } + #endif + }; + public: + using index_t=uint8_t; - static index_t strToIndex(const char* str, uint8_t len) - { - for(auto it=strings.begin(); it!=strings.end(); it++) - { - if (strncmp(it->second.str.c_str(), str, len)==0) - { - it->second.used++; - return it->first; - } - } - for(index_t index=1; index; index++) - { - if (strings.find(index)==strings.end()) - { - strings[index].str = std::string(str, len); - strings[index].used++; - // Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; - return index; - } - } - return 0; // TODO out of indexes - } + static index_t strToIndex(const char* str, uint8_t len) + { + for(auto it=strings.begin(); it!=strings.end(); it++) + { + if (strncmp(it->second.str.c_str(), str, len)==0) + { + it->second.used++; + return it->first; + } + } + for(index_t index=1; index; index++) + { + if (strings.find(index)==strings.end()) + { + strings[index].str = std::string(str, len); + strings[index].used++; + // Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; + return index; + } + } + return 0; // TODO out of indexes + } - static const std::string& str(const index_t& index) - { - static std::string dummy; - const auto& it=strings.find(index); - if (it == strings.end()) return dummy; - return it->second.str; - } + static const std::string& str(const index_t& index) + { + static std::string dummy; + const auto& it=strings.find(index); + if (it == strings.end()) return dummy; + return it->second.str; + } - static void use(const index_t& index) - { - auto it=strings.find(index); - if (it != strings.end()) it->second.used++; - } + static void use(const index_t& index) + { + auto it=strings.find(index); + if (it != strings.end()) it->second.used++; + } - static void release(const index_t& index) - { - auto it=strings.find(index); - if (it != strings.end()) - { - it->second.used--; - if (it->second.used == 0) - { - strings.erase(it); - // Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; - } - } - } + static void release(const index_t& index) + { + auto it=strings.find(index); + if (it != strings.end()) + { + it->second.used--; + if (it->second.used == 0) + { + strings.erase(it); + // Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; + } + } + } - static uint16_t count() { return strings.size(); } + static uint16_t count() { return strings.size(); } - private: - static std::map strings; + private: + static std::map strings; }; class IndexedString { - public: - IndexedString(const IndexedString& source) - { - StringIndexer::use(source.index); - index = source.index; - } + public: + IndexedString(const IndexedString& source) + { + StringIndexer::use(source.index); + index = source.index; + } - IndexedString(const char* str, uint8_t len) - { - index=StringIndexer::strToIndex(str, len); - } + IndexedString(const char* str, uint8_t len) + { + index=StringIndexer::strToIndex(str, len); + } - IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {}; + IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {}; - ~IndexedString() { StringIndexer::release(index); } + ~IndexedString() { StringIndexer::release(index); } - IndexedString& operator=(const IndexedString& source) - { - StringIndexer::use(source.index); - index = source.index; - return *this; - } + IndexedString& operator=(const IndexedString& source) + { + StringIndexer::use(source.index); + index = source.index; + return *this; + } - friend bool operator<(const IndexedString& i1, const IndexedString& i2) - { - return i1.index < i2.index; - } + friend bool operator<(const IndexedString& i1, const IndexedString& i2) + { + return i1.index < i2.index; + } - friend bool operator==(const IndexedString& i1, const IndexedString& i2) - { - return i1.index == i2.index; - } + friend bool operator==(const IndexedString& i1, const IndexedString& i2) + { + return i1.index == i2.index; + } - const std::string& str() const { return StringIndexer::str(index); } + const std::string& str() const { return StringIndexer::str(index); } - const StringIndexer::index_t& getIndex() const { return index; } + const StringIndexer::index_t& getIndex() const { return index; } - private: - StringIndexer::index_t index; + private: + StringIndexer::index_t index; }; diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 264ba05..cc1a488 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -7,141 +7,141 @@ MqttBroker::MqttBroker(uint16_t port) { - server = new TcpServer(port); + server = new TcpServer(port); #ifdef TCP_ASYNC - server->onClient(onClient, this); + server->onClient(onClient, this); #endif } MqttBroker::~MqttBroker() { - while(clients.size()) - { - delete clients[0]; - } - delete server; + while(clients.size()) + { + delete clients[0]; + } + delete server; } // private constructor used by broker only MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) - : parent(parent) + : parent(parent) { #ifdef TCP_ASYNC - client = new_client; - client->onData(onData, this); - // client->onConnect() TODO - // client->onDisconnect() TODO + client = new_client; + client->onData(onData, this); + // client->onConnect() TODO + // client->onDisconnect() TODO #else - client = new WiFiClient(*new_client); + client = new WiFiClient(*new_client); #endif #ifdef EPOXY_DUINO - alive = millis()+500000; + alive = millis()+500000; #else - alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg + alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg #endif } MqttClient::MqttClient(MqttBroker* parent, const std::string& id) - : parent(parent), clientId(id) + : parent(parent), clientId(id) { - client = nullptr; + client = nullptr; - if (parent) parent->addClient(this); + if (parent) parent->addClient(this); } MqttClient::~MqttClient() { - close(); - delete client; + close(); + delete client; } void MqttClient::close(bool bSendDisconnect) { - debug("close " << id().c_str()); - mqtt_connected = false; - if (client) // connected to a remote broker - { - if (bSendDisconnect and client->connected()) - { - message.create(MqttMessage::Type::Disconnect); - message.sendTo(this); - } - client->stop(); - } + debug("close " << id().c_str()); + mqtt_connected = false; + if (client) // connected to a remote broker + { + if (bSendDisconnect and client->connected()) + { + message.create(MqttMessage::Type::Disconnect); + message.sendTo(this); + } + client->stop(); + } - if (parent) - { - parent->removeClient(this); - parent = nullptr; - } + if (parent) + { + parent->removeClient(this); + parent = nullptr; + } } void MqttClient::connect(MqttBroker* parentBroker) { - close(); - parent = parentBroker; + close(); + parent = parentBroker; } void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) { - debug("MqttClient::connect"); - keep_alive = ka; - close(); - if (client) delete client; - client = new TcpClient; + debug("MqttClient::connect"); + keep_alive = ka; + close(); + if (client) delete client; + client = new TcpClient; - debug("Trying to connect to " << broker.c_str() << ':' << port); + debug("Trying to connect to " << broker.c_str() << ':' << port); #ifdef TCP_ASYNC - client->onData(onData, this); - client->onConnect(onConnect, this); - client->connect(broker.c_str(), port); + client->onData(onData, this); + client->onConnect(onConnect, this); + client->connect(broker.c_str(), port); #else - if (client->connect(broker.c_str(), port)) - { - onConnect(this, client); - } + if (client->connect(broker.c_str(), port)) + { + onConnect(this, client); + } #endif } void MqttBroker::addClient(MqttClient* client) { - clients.push_back(client); + clients.push_back(client); } void MqttBroker::connect(const std::string& host, uint16_t port) { - if (broker == nullptr) broker = new MqttClient; - broker->connect(host, port); - broker->parent = this; // Because connect removed the link + if (broker == nullptr) broker = new MqttClient; + broker->connect(host, port); + broker->parent = this; // Because connect removed the link } void MqttBroker::removeClient(MqttClient* remove) { for(auto it=clients.begin(); it!=clients.end(); it++) - { - auto client=*it; - if (client==remove) - { - // TODO if this broker is connected to an external broker - // we have to unsubscribe remove's topics. - // (but doing this, check that other clients are not subscribed...) - // Unless -> we could receive useless messages - // -> we are using (memory) one IndexedString plus its string for nothing. - debug("Remove " << clients.size()); - clients.erase(it); - debug("Client removed " << clients.size()); - return; - } - } - debug("Error cannot remove client"); // TODO should not occur + { + auto client=*it; + if (client==remove) + { + // TODO if this broker is connected to an external broker + // we have to unsubscribe remove's topics. + // (but doing this, check that other clients are not subscribed...) + // Unless -> we could receive useless messages + // -> we are using (memory) one IndexedString plus its string for nothing. + debug("Remove " << clients.size()); + clients.erase(it); + debug("Client removed " << clients.size()); + return; + } + } + debug("Error cannot remove client"); // TODO should not occur } void MqttBroker::onClient(void* broker_ptr, TcpClient* client) { - MqttBroker* broker = static_cast(broker_ptr); + MqttBroker* broker = static_cast(broker_ptr); - broker->addClient(new MqttClient(broker, client)); - debug("New client"); + broker->addClient(new MqttClient(broker, client)); + debug("New client"); } void MqttBroker::loop() @@ -150,253 +150,253 @@ void MqttBroker::loop() WiFiClient client = server->available(); if (client) - { - onClient(this, &client); - } + { + onClient(this, &client); + } #endif - if (broker) - { - // TODO should monitor broker's activity. - // 1 When broker disconnect and reconnect we have to re-subscribe - broker->loop(); - } + if (broker) + { + // TODO should monitor broker's activity. + // 1 When broker disconnect and reconnect we have to re-subscribe + broker->loop(); + } // for(auto it=clients.begin(); it!=clients.end(); it++) - // use index because size can change during the loop - for(size_t i=0; iconnected()) { - client->loop(); - } - else - { - debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent); - // Note: deleting a client not added by the broker itself will probably crash later. - delete client; - break; + client->loop(); } - } + else + { + debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent); + // Note: deleting a client not added by the broker itself will probably crash later. + delete client; + break; + } + } } MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) { - if (broker && broker->connected()) - { - return broker->subscribe(topic, qos); - } - return MqttNowhereToSend; + if (broker && broker->connected()) + { + return broker->subscribe(topic, qos); + } + return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const { - MqttError retval = MqttOk; + MqttError retval = MqttOk; - debug("publish "); - int i=0; - for(auto client: clients) - { - i++; + debug("publish "); + int i=0; + for(auto client: clients) + { + i++; #ifdef TINY_MQTT_DEBUG - Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << - " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; + Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << + " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif - bool doit = false; - if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker) - { - // ext_broker -> clients or clients -> ext_broker - if (source == broker) // external broker -> internal clients - doit = true; - else // external clients -> this broker - { - // As this broker is connected to another broker, simply forward the msg - MqttError ret = broker->publishIfSubscribed(topic, msg); - if (ret != MqttOk) retval = ret; - } - } - else // Disconnected - { - doit = true; - } + bool doit = false; + if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker) + { + // ext_broker -> clients or clients -> ext_broker + if (source == broker) // external broker -> internal clients + doit = true; + else // external clients -> this broker + { + // As this broker is connected to another broker, simply forward the msg + MqttError ret = broker->publishIfSubscribed(topic, msg); + if (ret != MqttOk) retval = ret; + } + } + else // Disconnected + { + doit = true; + } #ifdef TINY_MQTT_DEBUG - Serial << ", doit=" << doit << ' '; + Serial << ", doit=" << doit << ' '; #endif - if (doit) retval = client->publishIfSubscribed(topic, msg); - debug(""); - } - return retval; + if (doit) retval = client->publishIfSubscribed(topic, msg); + debug(""); + } + return retval; } bool MqttBroker::compareString( - const char* good, - const char* str, - uint8_t len) const + const char* good, + const char* str, + uint8_t len) const { - while(len-- and *good++==*str++); + while(len-- and *good++==*str++); - return *good==0; + return *good==0; } void MqttMessage::getString(const char* &buff, uint16_t& len) { - len = (buff[0]<<8)|(buff[1]); - buff+=2; + len = (buff[0]<<8)|(buff[1]); + buff+=2; } void MqttClient::clientAlive(uint32_t more_seconds) { - if (keep_alive) - { + if (keep_alive) + { #ifdef EPOXY_DUINO - alive=millis()+500000; + alive=millis()+500000; #else - alive=millis()+1000*(keep_alive+more_seconds); + alive=millis()+1000*(keep_alive+more_seconds); #endif - } - else - alive=0; + } + else + alive=0; } void MqttClient::loop() { - if (alive && (millis() > alive)) - { - if (parent) - { - debug("timeout client"); - close(); - debug("closed"); - } - else if (client && client->connected()) - { - debug("pingreq"); - uint16_t pingreq = MqttMessage::Type::PingReq; - client->write((const char*)(&pingreq), 2); - clientAlive(0); + if (alive && (millis() > alive)) + { + if (parent) + { + debug("timeout client"); + close(); + debug("closed"); + } + else if (client && client->connected()) + { + debug("pingreq"); + uint16_t pingreq = MqttMessage::Type::PingReq; + client->write((const char*)(&pingreq), 2); + clientAlive(0); - // TODO when many MqttClient passes through a local broker - // there is no need to send one PingReq per instance. - } - } + // TODO when many MqttClient passes through a local broker + // there is no need to send one PingReq per instance. + } + } #ifndef TCP_ASYNC - while(client && client->available()>0) - { - message.incoming(client->read()); - if (message.type()) - { - processMessage(&message); - message.reset(); - } - } + while(client && client->available()>0) + { + message.incoming(client->read()); + if (message.type()) + { + processMessage(&message); + message.reset(); + } + } #endif } void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) { - MqttClient* mqtt = static_cast(mqttclient_ptr); - debug("cnx: connecting"); - MqttMessage msg(MqttMessage::Type::Connect); - msg.add("MQTT",4); - msg.add(0x4); // Mqtt protocol version 3.1.1 - msg.add(0x0); // Connect flags TODO user / name + MqttClient* mqtt = static_cast(mqttclient_ptr); + debug("cnx: connecting"); + MqttMessage msg(MqttMessage::Type::Connect); + msg.add("MQTT",4); + msg.add(0x4); // Mqtt protocol version 3.1.1 + msg.add(0x0); // Connect flags TODO user / name - msg.add(0x00); // keep_alive - msg.add((char)mqtt->keep_alive); - msg.add(mqtt->clientId); - debug("cnx: mqtt connecting"); - msg.sendTo(mqtt); - msg.reset(); - debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent); + msg.add(0x00); // keep_alive + msg.add((char)mqtt->keep_alive); + msg.add(mqtt->clientId); + debug("cnx: mqtt connecting"); + msg.sendTo(mqtt); + msg.reset(); + debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent); - mqtt->clientAlive(0); + mqtt->clientAlive(0); } #ifdef TCP_ASYNC void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len) { - char* char_ptr = static_cast(data); - MqttClient* client=static_cast(client_ptr); - while(len>0) - { - client->message.incoming(*char_ptr++); - if (client->message.type()) - { - client->processMessage(&client->message); - client->message.reset(); - } - len--; - } + char* char_ptr = static_cast(data); + MqttClient* client=static_cast(client_ptr); + while(len>0) + { + client->message.incoming(*char_ptr++); + if (client->message.type()) + { + client->processMessage(&client->message); + client->message.reset(); + } + len--; + } } #endif void MqttClient::resubscribe() { - // TODO resubscription limited to 256 bytes - if (subscriptions.size()) - { - MqttMessage msg(MqttMessage::Type::Subscribe, 2); + // TODO resubscription limited to 256 bytes + if (subscriptions.size()) + { + MqttMessage msg(MqttMessage::Type::Subscribe, 2); - // TODO manage packet identifier - msg.add(0); - msg.add(0); + // TODO manage packet identifier + msg.add(0); + msg.add(0); - for(auto topic: subscriptions) - { - msg.add(topic); - msg.add(0); // TODO qos - } - msg.sendTo(this); // TODO return value - } + for(auto topic: subscriptions) + { + msg.add(topic); + msg.add(0); // TODO qos + } + msg.sendTo(this); // TODO return value + } } MqttError MqttClient::subscribe(Topic topic, uint8_t qos) { - debug("subsribe(" << topic.c_str() << ")"); - MqttError ret = MqttOk; + debug("subsribe(" << topic.c_str() << ")"); + MqttError ret = MqttOk; - subscriptions.insert(topic); + subscriptions.insert(topic); - if (parent==nullptr) // remote broker - { - return sendTopic(topic, MqttMessage::Type::Subscribe, qos); - } - else - { - return parent->subscribe(topic, qos); - } - return ret; + if (parent==nullptr) // remote broker + { + return sendTopic(topic, MqttMessage::Type::Subscribe, qos); + } + else + { + return parent->subscribe(topic, qos); + } + return ret; } MqttError MqttClient::unsubscribe(Topic topic) { - auto it=subscriptions.find(topic); - if (it != subscriptions.end()) - { - subscriptions.erase(it); - if (parent==nullptr) // remote broker - { - return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0); - } - } - return MqttOk; + auto it=subscriptions.find(topic); + if (it != subscriptions.end()) + { + subscriptions.erase(it); + if (parent==nullptr) // remote broker + { + return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0); + } + } + return MqttOk; } MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos) { - MqttMessage msg(type, 2); + MqttMessage msg(type, 2); - // TODO manage packet identifier - msg.add(0); - msg.add(0); + // TODO manage packet identifier + msg.add(0); + msg.add(0); - msg.add(topic); - msg.add(qos); + msg.add(topic); + msg.add(qos); - // TODO instead we should wait (state machine) for SUBACK / UNSUBACK ? - return msg.sendTo(this); + // TODO instead we should wait (state machine) for SUBACK / UNSUBACK ? + return msg.sendTo(this); } void MqttClient::processMessage(MqttMessage* mesg) @@ -404,373 +404,373 @@ void MqttClient::processMessage(MqttMessage* mesg) #ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { - #ifdef NOT_ESP_CORE - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl; - #else - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; - #endif - // mesg->hexdump("Incoming"); - mesg->hexdump("Incoming"); + #ifdef NOT_ESP_CORE + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl; + #else + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + #endif + // mesg->hexdump("Incoming"); + mesg->hexdump("Incoming"); } #endif auto header = mesg->getVHeader(); const char* payload; uint16_t len; - bool bclose=true; + bool bclose=true; #ifdef EPOXY_DUINO counters[mesg->type()]++; #endif - switch(mesg->type()) - { - case MqttMessage::Type::Connect: - if (mqtt_connected) - { - debug("already connected"); - break; - } - payload = header+10; - mqtt_flags = header[7]; - keep_alive = (header[8]<<8)|(header[9]); - if (strncmp("MQTT", header+2,4)) - { - debug("bad mqtt header"); - break; - } - if (header[6]!=0x04) - { - debug("unknown level"); - break; // Level 3.1.1 - } + switch(mesg->type()) + { + case MqttMessage::Type::Connect: + if (mqtt_connected) + { + debug("already connected"); + break; + } + payload = header+10; + mqtt_flags = header[7]; + keep_alive = (header[8]<<8)|(header[9]); + if (strncmp("MQTT", header+2,4)) + { + debug("bad mqtt header"); + break; + } + if (header[6]!=0x04) + { + debug("unknown level"); + break; // Level 3.1.1 + } - // ClientId - mesg->getString(payload, len); - clientId = std::string(payload, len); - payload += len; + // ClientId + mesg->getString(payload, len); + clientId = std::string(payload, len); + payload += len; - if (mqtt_flags & FlagWill) // Will topic - { - mesg->getString(payload, len); // Will Topic - payload += len; + if (mqtt_flags & FlagWill) // Will topic + { + mesg->getString(payload, len); // Will Topic + payload += len; - mesg->getString(payload, len); // Will Message - payload += len; - } - // FIXME forgetting credential is allowed (security hole) - if (mqtt_flags & FlagUserName) - { - mesg->getString(payload, len); - if (!parent->checkUser(payload, len)) break; - payload += len; - } - if (mqtt_flags & FlagPassword) - { - mesg->getString(payload, len); - if (!parent->checkPassword(payload, len)) break; - payload += len; - } + mesg->getString(payload, len); // Will Message + payload += len; + } + // FIXME forgetting credential is allowed (security hole) + if (mqtt_flags & FlagUserName) + { + mesg->getString(payload, len); + if (!parent->checkUser(payload, len)) break; + payload += len; + } + if (mqtt_flags & FlagPassword) + { + mesg->getString(payload, len); + if (!parent->checkPassword(payload, len)) break; + payload += len; + } #ifdef TINY_MQTT_DEBUG - Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; + Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; #endif - bclose = false; - mqtt_connected=true; - { - MqttMessage msg(MqttMessage::Type::ConnAck); - msg.add(0); // Session present (not implemented) - msg.add(0); // Connection accepted - msg.sendTo(this); - } - break; + bclose = false; + mqtt_connected=true; + { + MqttMessage msg(MqttMessage::Type::ConnAck); + msg.add(0); // Session present (not implemented) + msg.add(0); // Connection accepted + msg.sendTo(this); + } + break; - case MqttMessage::Type::ConnAck: - mqtt_connected = true; - bclose = false; - resubscribe(); - break; + case MqttMessage::Type::ConnAck: + mqtt_connected = true; + bclose = false; + resubscribe(); + break; - case MqttMessage::Type::SubAck: - case MqttMessage::Type::PubAck: - if (!mqtt_connected) break; - // Ignore acks - bclose = false; - break; + case MqttMessage::Type::SubAck: + case MqttMessage::Type::PubAck: + if (!mqtt_connected) break; + // Ignore acks + bclose = false; + break; - case MqttMessage::Type::PingResp: - // TODO: no PingResp is suspicious (server dead) - bclose = false; - break; + case MqttMessage::Type::PingResp: + // TODO: no PingResp is suspicious (server dead) + bclose = false; + break; - case MqttMessage::Type::PingReq: - if (!mqtt_connected) break; - if (client) - { - uint16_t pingreq = MqttMessage::Type::PingResp; - client->write((const char*)(&pingreq), 2); - bclose = false; - } - else - { - debug("internal pingreq ?"); - } - break; + case MqttMessage::Type::PingReq: + if (!mqtt_connected) break; + if (client) + { + uint16_t pingreq = MqttMessage::Type::PingResp; + client->write((const char*)(&pingreq), 2); + bclose = false; + } + else + { + debug("internal pingreq ?"); + } + break; - case MqttMessage::Type::Subscribe: - case MqttMessage::Type::UnSubscribe: - { - if (!mqtt_connected) break; - payload = header+2; - - debug("un/subscribe loop"); - std::string qoss; - while(payload < mesg->end()) - { - mesg->getString(payload, len); // Topic - debug( " topic (" << std::string(payload, len) << ')'); - // subscribe(Topic(payload, len)); - Topic topic(payload, len); + case MqttMessage::Type::Subscribe: + case MqttMessage::Type::UnSubscribe: + { + if (!mqtt_connected) break; + payload = header+2; + + debug("un/subscribe loop"); + std::string qoss; + while(payload < mesg->end()) + { + mesg->getString(payload, len); // Topic + debug( " topic (" << std::string(payload, len) << ')'); + // subscribe(Topic(payload, len)); + Topic topic(payload, len); - payload += len; - if (mesg->type() == MqttMessage::Type::Subscribe) - { - uint8_t qos = *payload++; - if (qos != 0) - { - debug("Unsupported QOS" << qos << endl); - qoss.push_back(0x80); - } - else - qoss.push_back(qos); - subscriptions.insert(topic); - } - else - { - auto it=subscriptions.find(topic); - if (it != subscriptions.end()) - subscriptions.erase(it); - } - } - debug("end loop"); - bclose = false; + payload += len; + if (mesg->type() == MqttMessage::Type::Subscribe) + { + uint8_t qos = *payload++; + if (qos != 0) + { + debug("Unsupported QOS" << qos << endl); + qoss.push_back(0x80); + } + else + qoss.push_back(qos); + subscriptions.insert(topic); + } + else + { + auto it=subscriptions.find(topic); + if (it != subscriptions.end()) + subscriptions.erase(it); + } + } + debug("end loop"); + bclose = false; MqttMessage ack(mesg->type() == MqttMessage::Type::Subscribe ? MqttMessage::Type::SubAck : MqttMessage::Type::UnSuback); ack.add(header[0]); ack.add(header[1]); ack.add(qoss.c_str(), qoss.size(), false); ack.sendTo(this); - } - break; + } + break; - case MqttMessage::Type::UnSuback: - if (!mqtt_connected) break; - bclose = false; - break; + case MqttMessage::Type::UnSuback: + if (!mqtt_connected) break; + bclose = false; + break; - case MqttMessage::Type::Publish: - #ifdef TINY_MQTT_DEBUG - Serial << "publish " << mqtt_connected << '/' << (long) client << endl; - #endif - if (mqtt_connected or client == nullptr) - { - uint8_t qos = mesg->flags(); - payload = header; - mesg->getString(payload, len); - Topic published(payload, len); - payload += len; - // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; - if (qos) payload+=2; // ignore packet identifier if any - len=mesg->end()-payload; - // TODO reset DUP - // TODO reset RETAIN + case MqttMessage::Type::Publish: + #ifdef TINY_MQTT_DEBUG + Serial << "publish " << mqtt_connected << '/' << (long) client << endl; + #endif + if (mqtt_connected or client == nullptr) + { + uint8_t qos = mesg->flags(); + payload = header; + mesg->getString(payload, len); + Topic published(payload, len); + payload += len; + // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; + if (qos) payload+=2; // ignore packet identifier if any + len=mesg->end()-payload; + // TODO reset DUP + // TODO reset RETAIN - if (parent==nullptr or client==nullptr) // internal MqttClient receives publish - { - #ifdef TINY_MQTT_DEBUG - Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n"; - Serial << "has " << (callback ? "" : "no ") << " callback.\n"; - #endif - if (callback and isSubscribedTo(published)) - { - callback(this, published, payload, len); // TODO send the real payload - } - } - else if (parent) // from outside to inside - { - debug("publishing to parent"); - parent->publish(this, published, *mesg); - } - bclose = false; - } - break; + if (parent==nullptr or client==nullptr) // internal MqttClient receives publish + { + #ifdef TINY_MQTT_DEBUG + Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n"; + Serial << "has " << (callback ? "" : "no ") << " callback.\n"; + #endif + if (callback and isSubscribedTo(published)) + { + callback(this, published, payload, len); // TODO send the real payload + } + } + else if (parent) // from outside to inside + { + debug("publishing to parent"); + parent->publish(this, published, *mesg); + } + bclose = false; + } + break; - case MqttMessage::Type::Disconnect: - // TODO should discard any will msg - if (!mqtt_connected) break; - mqtt_connected = false; - close(false); - bclose=false; - break; + case MqttMessage::Type::Disconnect: + // TODO should discard any will msg + if (!mqtt_connected) break; + mqtt_connected = false; + close(false); + bclose=false; + break; - default: - bclose=true; - break; - }; - if (bclose) - { + default: + bclose=true; + break; + }; + if (bclose) + { #ifdef TINY_MQTT_DEBUG - Serial << "*************** Error msg 0x" << _HEX(mesg->type()); - mesg->hexdump("-------ERROR ------"); - dump(); - Serial << endl; + Serial << "*************** Error msg 0x" << _HEX(mesg->type()); + mesg->hexdump("-------ERROR ------"); + dump(); + Serial << endl; #endif - close(); + close(); + } + else + { + clientAlive(parent ? 5 : 0); } - else - { - clientAlive(parent ? 5 : 0); - } } bool Topic::matches(const Topic& topic) const { - if (getIndex() == topic.getIndex()) return true; - if (str() == topic.str()) return true; - return false; + if (getIndex() == topic.getIndex()) return true; + if (str() == topic.str()) return true; + return false; } // publish from local client MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) { - MqttMessage msg(MqttMessage::Publish); - msg.add(topic); - msg.add(payload, pay_length, false); - msg.complete(); + MqttMessage msg(MqttMessage::Publish); + msg.add(topic); + msg.add(payload, pay_length, false); + msg.complete(); - if (parent) - { - return parent->publish(this, topic, msg); - } - else if (client) - return msg.sendTo(this); - else - return MqttNowhereToSend; + if (parent) + { + return parent->publish(this, topic, msg); + } + else if (client) + return msg.sendTo(this); + else + return MqttNowhereToSend; } // republish a received publish if it matches any in subscriptions MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg) { - MqttError retval=MqttOk; + MqttError retval=MqttOk; - debug("mqttclient publish " << subscriptions.size()); - if (isSubscribedTo(topic)) - { - if (client) - retval = msg.sendTo(this); - else - { - processMessage(&msg); + debug("mqttclient publish " << subscriptions.size()); + if (isSubscribedTo(topic)) + { + if (client) + retval = msg.sendTo(this); + else + { + processMessage(&msg); - #ifdef TINY_MQTT_DEBUG - Serial << "Should call the callback ?\n"; - #endif - // callback(this, topic, nullptr, 0); // TODO Payload - } - } - return retval; + #ifdef TINY_MQTT_DEBUG + Serial << "Should call the callback ?\n"; + #endif + // callback(this, topic, nullptr, 0); // TODO Payload + } + } + return retval; } bool MqttClient::isSubscribedTo(const Topic& topic) const { - for(const auto& subscription: subscriptions) - if (subscription.matches(topic)) - return true; + for(const auto& subscription: subscriptions) + if (subscription.matches(topic)) + return true; - return false; + return false; } void MqttMessage::reset() { - buffer.clear(); - state=FixedHeader; - size=0; + buffer.clear(); + state=FixedHeader; + size=0; } void MqttMessage::incoming(char in_byte) { - buffer += in_byte; - switch(state) - { - case FixedHeader: - size=MaxBufferLength; - state = Length; - break; - case Length: + buffer += in_byte; + switch(state) + { + case FixedHeader: + size=MaxBufferLength; + state = Length; + break; + case Length: - if (size==MaxBufferLength) + if (size==MaxBufferLength) size = in_byte & 0x7F; - else - size += static_cast(in_byte & 0x7F)<<7; + else + size += static_cast(in_byte & 0x7F)<<7; - if (size > MaxBufferLength) - state = Error; - else if ((in_byte & 0x80) == 0) - { - vheader = buffer.length(); - if (size==0) - state = Complete; - else - { - buffer.reserve(size); - state = VariableHeader; - } - } - break; - case VariableHeader: - case PayLoad: - --size; - if (size==0) - { - state=Complete; - // hexdump("rec"); - } - break; - case Create: - size++; - break; - case Complete: - default: - #ifdef TINY_MQTT_DEBUG - Serial << "Spurious " << _HEX(in_byte) << endl; - hexdump("spurious"); + if (size > MaxBufferLength) + state = Error; + else if ((in_byte & 0x80) == 0) + { + vheader = buffer.length(); + if (size==0) + state = Complete; + else + { + buffer.reserve(size); + state = VariableHeader; + } + } + break; + case VariableHeader: + case PayLoad: + --size; + if (size==0) + { + state=Complete; + // hexdump("rec"); + } + break; + case Create: + size++; + break; + case Complete: + default: + #ifdef TINY_MQTT_DEBUG + Serial << "Spurious " << _HEX(in_byte) << endl; + hexdump("spurious"); #endif - reset(); - break; - } - if (buffer.length() > MaxBufferLength) - { - debug("Too long " << state); - reset(); - } + reset(); + break; + } + if (buffer.length() > MaxBufferLength) + { + debug("Too long " << state); + reset(); + } } void MqttMessage::add(const char* p, size_t len, bool addLength) { - if (addLength) - { - buffer.reserve(buffer.length()+2); - incoming(len>>8); - incoming(len & 0xFF); - } - while(len--) incoming(*p++); + if (addLength) + { + buffer.reserve(buffer.length()+2); + incoming(len>>8); + incoming(len & 0xFF); + } + while(len--) incoming(*p++); } void MqttMessage::encodeLength() { if (state != Complete) { - int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field. + int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field. if (length <= 0x7F) { buffer.erase(1,1); @@ -791,59 +791,59 @@ void MqttMessage::encodeLength() MqttError MqttMessage::sendTo(MqttClient* client) { - if (buffer.size()) - { - debug("sending " << buffer.size() << " bytes"); - encodeLength(); - // hexdump("snd"); - client->write(&buffer[0], buffer.size()); - } - else - { - debug("??? Invalid send"); - return MqttInvalidMessage; - } - return MqttOk; + if (buffer.size()) + { + debug("sending " << buffer.size() << " bytes"); + encodeLength(); + // hexdump("snd"); + client->write(&buffer[0], buffer.size()); + } + else + { + debug("??? Invalid send"); + return MqttInvalidMessage; + } + return MqttOk; } void MqttMessage::hexdump(const char* prefix) const { (void)prefix; #ifdef TINY_MQTT_DEBUG - uint16_t addr=0; - const int bytes_per_row = 8; - const char* hex_to_str = " | "; - const char* separator = hex_to_str; - const char* half_sep = " - "; - std::string ascii; + uint16_t addr=0; + const int bytes_per_row = 8; + const char* hex_to_str = " | "; + const char* separator = hex_to_str; + const char* half_sep = " - "; + std::string ascii; - Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl; + Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl; - for(const char chr: buffer) - { - if ((addr % bytes_per_row) == 0) - { - if (ascii.length()) Serial << hex_to_str << ascii << separator << endl; - if (prefix) Serial << prefix << separator; - ascii.clear(); - } - addr++; - if (chr<16) Serial << '0'; - Serial << _HEX(chr) << ' '; + for(const char chr: buffer) + { + if ((addr % bytes_per_row) == 0) + { + if (ascii.length()) Serial << hex_to_str << ascii << separator << endl; + if (prefix) Serial << prefix << separator; + ascii.clear(); + } + addr++; + if (chr<16) Serial << '0'; + Serial << _HEX(chr) << ' '; - ascii += (chr<32 ? '.' : chr); - if (ascii.length() == (bytes_per_row/2)) ascii += half_sep; - } - if (ascii.length()) - { - while(ascii.length() < bytes_per_row+strlen(half_sep)) - { - Serial << " "; // spaces per hexa byte - ascii += ' '; - } - Serial << hex_to_str << ascii << separator; - } + ascii += (chr<32 ? '.' : chr); + if (ascii.length() == (bytes_per_row/2)) ascii += half_sep; + } + if (ascii.length()) + { + while(ascii.length() < bytes_per_row+strlen(half_sep)) + { + Serial << " "; // spaces per hexa byte + ascii += ' '; + } + Serial << hex_to_str << ascii << separator; + } - Serial << endl; + Serial << endl; #endif } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index bc36e7e..d852d48 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,18 +1,18 @@ #pragma once // TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC -// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx +// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx #if defined(ESP8266) || defined(EPOXY_DUINO) - #ifdef TCP_ASYNC - #include + #ifdef TCP_ASYNC + #include #else #include #endif #elif defined(ESP32) #include - #ifdef TCP_ASYNC - #include // https://github.com/me-no-dev/AsyncTCP + #ifdef TCP_ASYNC + #include // https://github.com/me-no-dev/AsyncTCP #endif #endif #ifdef EPOXY_DUINO @@ -44,165 +44,165 @@ enum __attribute__((packed)) MqttError { - MqttOk = 0, - MqttNowhereToSend=1, - MqttInvalidMessage=2, + MqttOk = 0, + MqttNowhereToSend=1, + MqttInvalidMessage=2, }; class Topic : public IndexedString { - public: - Topic(const char* s, uint8_t len) : IndexedString(s,len){} - Topic(const char* s) : Topic(s, strlen(s)) {} - Topic(const std::string s) : Topic(s.c_str(), s.length()){}; + public: + Topic(const char* s, uint8_t len) : IndexedString(s,len){} + Topic(const char* s) : Topic(s, strlen(s)) {} + Topic(const std::string s) : Topic(s.c_str(), s.length()){}; - const char* c_str() const { return str().c_str(); } + const char* c_str() const { return str().c_str(); } - bool matches(const Topic&) const; + bool matches(const Topic&) const; }; class MqttClient; class MqttMessage { - const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding - public: - enum __attribute__((packed)) Type - { - Unknown = 0, - Connect = 0x10, - ConnAck = 0x20, - Publish = 0x30, - PubAck = 0x40, - Subscribe = 0x80, - SubAck = 0x90, - UnSubscribe = 0xA0, - UnSuback = 0xB0, - PingReq = 0xC0, - PingResp = 0xD0, - Disconnect = 0xE0 - }; - enum __attribute__((packed)) State - { - FixedHeader=0, - Length=1, - VariableHeader=2, - PayLoad=3, - Complete=4, - Error=5, - Create=6 - }; + const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding + public: + enum __attribute__((packed)) Type + { + Unknown = 0, + Connect = 0x10, + ConnAck = 0x20, + Publish = 0x30, + PubAck = 0x40, + Subscribe = 0x80, + SubAck = 0x90, + UnSubscribe = 0xA0, + UnSuback = 0xB0, + PingReq = 0xC0, + PingResp = 0xD0, + Disconnect = 0xE0 + }; + enum __attribute__((packed)) State + { + FixedHeader=0, + Length=1, + VariableHeader=2, + PayLoad=3, + Complete=4, + Error=5, + Create=6 + }; - MqttMessage() { reset(); } - MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; } - void incoming(char byte); - void add(char byte) { incoming(byte); } - void add(const char* p, size_t len, bool addLength=true ); - void add(const std::string& s) { add(s.c_str(), s.length()); } - void add(const Topic& t) { add(t.str()); } - const char* end() const { return &buffer[0]+buffer.size(); } - const char* getVHeader() const { return &buffer[vheader]; } - void complete() { encodeLength(); } + MqttMessage() { reset(); } + MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; } + void incoming(char byte); + void add(char byte) { incoming(byte); } + void add(const char* p, size_t len, bool addLength=true ); + void add(const std::string& s) { add(s.c_str(), s.length()); } + void add(const Topic& t) { add(t.str()); } + const char* end() const { return &buffer[0]+buffer.size(); } + const char* getVHeader() const { return &buffer[vheader]; } + void complete() { encodeLength(); } - void reset(); + void reset(); - // buff is MSB/LSB/STRING - // output buff+=2, len=length(str) - static void getString(const char* &buff, uint16_t& len); + // buff is MSB/LSB/STRING + // output buff+=2, len=length(str) + static void getString(const char* &buff, uint16_t& len); - Type type() const - { - return state == Complete ? static_cast(buffer[0] & 0xF0) : Unknown; - } + Type type() const + { + return state == Complete ? static_cast(buffer[0] & 0xF0) : Unknown; + } - uint8_t flags() const { return static_cast(buffer[0] & 0x0F); } + uint8_t flags() const { return static_cast(buffer[0] & 0x0F); } - void create(Type type) - { - buffer=(decltype(buffer)::value_type)type; - buffer+='\0'; // reserved for msg length byte 1/2 - buffer+='\0'; // reserved for msg length byte 2/2 (fixed) - vheader=3; // Should never change - size=0; - state=Create; - } - MqttError sendTo(MqttClient*); - void hexdump(const char* prefix=nullptr) const; + void create(Type type) + { + buffer=(decltype(buffer)::value_type)type; + buffer+='\0'; // reserved for msg length byte 1/2 + buffer+='\0'; // reserved for msg length byte 2/2 (fixed) + vheader=3; // Should never change + size=0; + state=Create; + } + MqttError sendTo(MqttClient*); + void hexdump(const char* prefix=nullptr) const; - private: - void encodeLength(); + private: + void encodeLength(); - std::string buffer; - uint8_t vheader; - uint16_t size; // bytes left to receive - State state; + std::string buffer; + uint8_t vheader; + uint16_t size; // bytes left to receive + State state; }; class MqttBroker; class MqttClient { - using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length); - enum __attribute__((packed)) Flags - { - FlagUserName = 128, - FlagPassword = 64, - FlagWillRetain = 32, // unsupported - FlagWillQos = 16 | 8, // unsupported - FlagWill = 4, // unsupported - FlagCleanSession = 2, // unsupported - FlagReserved = 1 - }; - public: - /** Constructor. If broker is not null, this is the adress of a local broker. - If you want to connect elsewhere, leave broker null and use connect() **/ - MqttClient(MqttBroker* broker = nullptr, const std::string& id=""); - MqttClient(const std::string& id) : MqttClient(nullptr, id){} + using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length); + enum __attribute__((packed)) Flags + { + FlagUserName = 128, + FlagPassword = 64, + FlagWillRetain = 32, // unsupported + FlagWillQos = 16 | 8, // unsupported + FlagWill = 4, // unsupported + FlagCleanSession = 2, // unsupported + FlagReserved = 1 + }; + public: + /** Constructor. If broker is not null, this is the adress of a local broker. + If you want to connect elsewhere, leave broker null and use connect() **/ + MqttClient(MqttBroker* broker = nullptr, const std::string& id=""); + MqttClient(const std::string& id) : MqttClient(nullptr, id){} - ~MqttClient(); + ~MqttClient(); - void connect(MqttBroker* parent); - void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10); + void connect(MqttBroker* parent); + void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10); // TODO it seems that connected returns true in tcp mode even if // no negociation occured (only if tcp link is established) - bool connected() { return - (parent!=nullptr and client==nullptr) or - (client and client->connected()); } - void write(const char* buf, size_t length) - { if (client) client->write(buf, length); } + bool connected() { return + (parent!=nullptr and client==nullptr) or + (client and client->connected()); } + void write(const char* buf, size_t length) + { if (client) client->write(buf, length); } - const std::string& id() const { return clientId; } - void id(std::string& new_id) { clientId = new_id; } + const std::string& id() const { return clientId; } + void id(std::string& new_id) { clientId = new_id; } - /** Should be called in main loop() */ - void loop(); - void close(bool bSendDisconnect=true); - void setCallback(CallBack fun) - { - callback=fun; - #ifdef TINY_MQTT_DEBUG - Serial << "Callback set to " << (long)fun << endl; - if (callback) callback(this, "test/topic", "value", 5); - #endif - }; + /** Should be called in main loop() */ + void loop(); + void close(bool bSendDisconnect=true); + void setCallback(CallBack fun) + { + callback=fun; + #ifdef TINY_MQTT_DEBUG + Serial << "Callback set to " << (long)fun << endl; + if (callback) callback(this, "test/topic", "value", 5); + #endif + }; - // Publish from client to the world - MqttError publish(const Topic&, const char* payload, size_t pay_length); - MqttError publish(const Topic& t, const char* payload) { return publish(t, payload, strlen(payload)); } - MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); } - MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());} - MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; + // Publish from client to the world + MqttError publish(const Topic&, const char* payload, size_t pay_length); + MqttError publish(const Topic& t, const char* payload) { return publish(t, payload, strlen(payload)); } + MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); } + MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());} + MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; - MqttError subscribe(Topic topic, uint8_t qos=0); - MqttError unsubscribe(Topic topic); - bool isSubscribedTo(const Topic& topic) const; + MqttError subscribe(Topic topic, uint8_t qos=0); + MqttError unsubscribe(Topic topic); + bool isSubscribedTo(const Topic& topic) const; - // connected to local broker - // TODO seems to be useless - bool isLocal() const { return client == nullptr; } + // connected to local broker + // TODO seems to be useless + bool isLocal() const { return client == nullptr; } - void dump(std::string indent="") - { - (void)indent; + void dump(std::string indent="") + { + (void)indent; #ifdef TINY_MQTT_DEBUG uint32_t ms=millis(); Serial << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF"); @@ -223,100 +223,100 @@ class MqttClient } Serial << endl; #endif - } + } #ifdef EPOXY_DUINO - static std::map counters; // Number of processed messages + static std::map counters; // Number of processed messages #endif - private: + private: - // event when tcp/ip link established (real or fake) - static void onConnect(void * client_ptr, TcpClient*); + // event when tcp/ip link established (real or fake) + static void onConnect(void * client_ptr, TcpClient*); #ifdef TCP_ASYNC - static void onData(void* client_ptr, TcpClient*, void* data, size_t len); + static void onData(void* client_ptr, TcpClient*, void* data, size_t len); #endif - MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); - void resubscribe(); + MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); + void resubscribe(); - friend class MqttBroker; - MqttClient(MqttBroker* parent, TcpClient* client); - // republish a received publish if topic matches any in subscriptions - MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg); + friend class MqttBroker; + MqttClient(MqttBroker* parent, TcpClient* client); + // republish a received publish if topic matches any in subscriptions + MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg); - void clientAlive(uint32_t more_seconds); - void processMessage(MqttMessage* message); + void clientAlive(uint32_t more_seconds); + void processMessage(MqttMessage* message); - bool mqtt_connected = false; - char mqtt_flags; - uint32_t keep_alive = 60; - uint32_t alive; - MqttMessage message; + bool mqtt_connected = false; + char mqtt_flags; + uint32_t keep_alive = 60; + uint32_t alive; + MqttMessage message; - // TODO having a pointer on MqttBroker may produce larger binaries - // due to unecessary function linked if ever parent is not used - // (this is the case when MqttBroker isn't used except here) - MqttBroker* parent=nullptr; // connection to local broker + // TODO having a pointer on MqttBroker may produce larger binaries + // due to unecessary function linked if ever parent is not used + // (this is the case when MqttBroker isn't used except here) + MqttBroker* parent=nullptr; // connection to local broker - TcpClient* client=nullptr; // connection to remote broker - std::set subscriptions; - std::string clientId; - CallBack callback = nullptr; + TcpClient* client=nullptr; // connection to remote broker + std::set subscriptions; + std::string clientId; + CallBack callback = nullptr; }; class MqttBroker { - enum __attribute__((packed)) State - { - Disconnected, // Also the initial state - Connecting, // connect and sends a fake publish to avoid circular cnx - Connected, // this->broker is connected and circular cnx avoided - }; - public: - // TODO limit max number of clients - MqttBroker(uint16_t port); - ~MqttBroker(); + enum __attribute__((packed)) State + { + Disconnected, // Also the initial state + Connecting, // connect and sends a fake publish to avoid circular cnx + Connected, // this->broker is connected and circular cnx avoided + }; + public: + // TODO limit max number of clients + MqttBroker(uint16_t port); + ~MqttBroker(); - void begin() { server->begin(); } - void loop(); + void begin() { server->begin(); } + void loop(); - void connect(const std::string& host, uint16_t port=1883); - bool connected() const { return state == Connected; } + void connect(const std::string& host, uint16_t port=1883); + bool connected() const { return state == Connected; } - size_t clientsCount() const { return clients.size(); } + size_t clientsCount() const { return clients.size(); } - void dump(std::string indent="") - { - for(auto client: clients) - client->dump(indent); - } + void dump(std::string indent="") + { + for(auto client: clients) + client->dump(indent); + } - private: - friend class MqttClient; + private: + friend class MqttClient; - static void onClient(void*, TcpClient*); - bool checkUser(const char* user, uint8_t len) const - { return compareString(auth_user, user, len); } + static void onClient(void*, TcpClient*); + bool checkUser(const char* user, uint8_t len) const + { return compareString(auth_user, user, len); } - bool checkPassword(const char* password, uint8_t len) const - { return compareString(auth_password, password, len); } + bool checkPassword(const char* password, uint8_t len) const + { return compareString(auth_password, password, len); } - MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const; + MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const; - MqttError subscribe(const Topic& topic, uint8_t qos); + MqttError subscribe(const Topic& topic, uint8_t qos); - // For clients that are added not by the broker itself - void addClient(MqttClient* client); - void removeClient(MqttClient* client); + // For clients that are added not by the broker itself + void addClient(MqttClient* client); + void removeClient(MqttClient* client); - bool compareString(const char* good, const char* str, uint8_t str_len) const; - std::vector clients; - TcpServer* server; + bool compareString(const char* good, const char* str, uint8_t str_len) const; + std::vector clients; + TcpServer* server; - const char* auth_user = "guest"; - const char* auth_password = "guest"; - State state = Disconnected; + const char* auth_user = "guest"; + const char* auth_password = "guest"; + State state = Disconnected; - MqttClient* broker = nullptr; + MqttClient* broker = nullptr; };