From 36b452281f64756516c871fced8f1ddd23baebd0 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 13:42:43 +0200 Subject: [PATCH 01/12] Very promising async commit --- src/TinyMqtt.cpp | 66 ++++++++++++++++++++++++++++++------------------ src/TinyMqtt.h | 18 ++++++------- 2 files changed, 50 insertions(+), 34 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 24fb984..da96ee1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) Serial << '\'' << endl; } -MqttBroker::MqttBroker(uint16_t port) : server(port) +MqttBroker::MqttBroker(uint16_t port) { + server = new AsyncServer(port); + server->onClient(onClient, this); } MqttBroker::~MqttBroker() @@ -19,14 +21,16 @@ MqttBroker::~MqttBroker() { delete clients[0]; } - server.close(); + delete server; } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) - : parent(parent) +MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) + : parent(parent), client(new_client) { - client = new WiFiClient(new_client); + client->onData(onData, this); + // client->onConnect() TODO + // client->onDisconnect() TODO alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) debug("cnx: closing"); close(); if (client) delete client; - client = new WiFiClient; + client = new AsyncClient; debug("Trying to connect to " << broker.c_str() << ':' << port); + // TODO This may return immediately !!! + // TODO so I have to add onConnect and move this code to onConnect + // TODO also, as this is async now, I must take care of + // TODO the broker that may disconnect and delete the client immediately if (client->connect(broker.c_str(), port)) { debug("cnx: connecting"); @@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } +void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +{ + MqttBroker* broker = static_cast(broker_ptr); + + broker->addClient(new MqttClient(broker, client)); + debug("New client #" << broker->clients->size()); +} + void MqttBroker::loop() { - WiFiClient client = server.available(); - if (broker) { // TODO should monitor broker's activity. @@ -137,11 +151,6 @@ void MqttBroker::loop() broker->loop(); } - if (client) - { - addClient(new MqttClient(this, client)); - debug("New client (" << clients.size() << ')'); - } // for(auto it=clients.begin(); it!=clients.end(); it++) // use index because size can change during the loop @@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) { return broker->subscribe(topic, qos); } + return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const @@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons for(auto client: clients) { i++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif @@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons { doit = true; } -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; #endif @@ -250,22 +260,28 @@ void MqttClient::loop() { debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); clientAlive(0); // TODO when many MqttClient passes through a local browser // there is no need to send one PingReq per instance. } } +} - while(client && client->available()>0) +void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +{ + char* char_ptr = static_cast(data); + MqttClient* client=static_cast(client_ptr); + while(len>0) { - message.incoming(client->read()); - if (message.type()) + client->message.incoming(*char_ptr++); + if (client->message.type()) { - processMessage(&message); - message.reset(); + client->processMessage(&client->message); + client->message.reset(); } + len--; } } @@ -341,7 +357,7 @@ long MqttClient::counter=0; void MqttClient::processMessage(const MqttMessage* mesg) { counter++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; @@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T if (client) { uint16_t pingreq = MqttMessage::Type::PingResp; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); bclose = false; } else @@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T subscriptions.erase(it); } payload += len; - uint8_t qos = *payload++; + /* uint8_t qos =*/ *payload++; debug(" qos=" << qos); } debug("end loop"); @@ -488,7 +504,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T Topic published(payload, len); payload += len; // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any len=mesg->end()-payload; // TODO reset DUP diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index f2e52ca..ee9692e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,15 +1,15 @@ +#pragma once #include +#include #include #include #include #include "StringIndexer.h" #include -#if 0 +#ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } - #define TINY_MQTT_DEBUG 1 #else - #define TINY_MQTT_DEBUG 0 #define debug(what) {} #endif @@ -189,11 +189,12 @@ class MqttClient static long counter; private: + static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, WiFiClient& client); + MqttClient(MqttBroker* parent, AsyncClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -211,7 +212,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - WiFiClient* client=nullptr; // connection to mqtt client or to remote broker + AsyncClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -230,11 +231,9 @@ class MqttBroker MqttBroker(uint16_t port); ~MqttBroker(); - void begin() { server.begin(); } + void begin() { server->begin(); } void loop(); - uint16_t port() const { return server.port(); } - void connect(const std::string& host, uint16_t port=1883); bool connected() const { return state == Connected; } @@ -253,6 +252,7 @@ class MqttBroker private: friend class MqttClient; + static void onClient(void*, AsyncClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -270,7 +270,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - WiFiServer server; + AsyncServer* server; const char* auth_user = "guest"; const char* auth_password = "guest"; From f42464c17310c02c19c99bc64507dd9e4169bbf2 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 13:58:23 +0200 Subject: [PATCH 02/12] AsyncTCP (to be continued) --- library.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/library.properties b/library.properties index 687d572..fc85179 100644 --- a/library.properties +++ b/library.properties @@ -8,4 +8,4 @@ category=Communication url=https://github.com/hsaturn/TinyMqtt architectures=* includes=TinyMqtt.h -depends= +depends=AsyncTCP From e90076d010249f5a8fb34514d49dce828596886f Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 14:03:36 +0200 Subject: [PATCH 03/12] AsyncTcp --- src/TinyMqtt.cpp | 1 + src/TinyMqtt.h | 10 +--------- tests/Makefile | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index da96ee1..74ca4b1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -672,6 +672,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { + buffer.reserve(buffer.length()+addLength+2); incoming(len>>8); incoming(len & 0xFF); } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index ee9692e..d6dc2db 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -80,23 +80,15 @@ class MqttMessage // output buff+=2, len=length(str) static void getString(const char* &buff, uint16_t& len); - Type type() const { return state == Complete ? static_cast(buffer[0]) : Unknown; } - // shouldn't exist because it breaks constness :-( - // but this saves memory so ... - void changeType(Type type) const - { - buffer[0] = type; - } - void create(Type type) { buffer=(char)type; - buffer+='\0'; + buffer+='\0'; // reserved for msg length vheader=2; size=0; state=Create; diff --git a/tests/Makefile b/tests/Makefile index bfea400..e2d6bac 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -5,7 +5,7 @@ tests: $(MAKE) -C $$(dirname $$i) -j; \ done -runtests: +runtests: tests set -e; \ for i in *-tests/Makefile; do \ echo '==== Running:' $$(dirname $$i); \ From 3e8d34e4e7feec16b7be0e97ffadf8a834235325 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 13:42:43 +0200 Subject: [PATCH 04/12] Very promising async commit Very promising async commit --- src/TinyMqtt.cpp | 66 ++++++++++++++++++++++++++++++------------------ src/TinyMqtt.h | 17 ++++++------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 24fb984..da96ee1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) Serial << '\'' << endl; } -MqttBroker::MqttBroker(uint16_t port) : server(port) +MqttBroker::MqttBroker(uint16_t port) { + server = new AsyncServer(port); + server->onClient(onClient, this); } MqttBroker::~MqttBroker() @@ -19,14 +21,16 @@ MqttBroker::~MqttBroker() { delete clients[0]; } - server.close(); + delete server; } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) - : parent(parent) +MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) + : parent(parent), client(new_client) { - client = new WiFiClient(new_client); + client->onData(onData, this); + // client->onConnect() TODO + // client->onDisconnect() TODO alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) debug("cnx: closing"); close(); if (client) delete client; - client = new WiFiClient; + client = new AsyncClient; debug("Trying to connect to " << broker.c_str() << ':' << port); + // TODO This may return immediately !!! + // TODO so I have to add onConnect and move this code to onConnect + // TODO also, as this is async now, I must take care of + // TODO the broker that may disconnect and delete the client immediately if (client->connect(broker.c_str(), port)) { debug("cnx: connecting"); @@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } +void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +{ + MqttBroker* broker = static_cast(broker_ptr); + + broker->addClient(new MqttClient(broker, client)); + debug("New client #" << broker->clients->size()); +} + void MqttBroker::loop() { - WiFiClient client = server.available(); - if (broker) { // TODO should monitor broker's activity. @@ -137,11 +151,6 @@ void MqttBroker::loop() broker->loop(); } - if (client) - { - addClient(new MqttClient(this, client)); - debug("New client (" << clients.size() << ')'); - } // for(auto it=clients.begin(); it!=clients.end(); it++) // use index because size can change during the loop @@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) { return broker->subscribe(topic, qos); } + return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const @@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons for(auto client: clients) { i++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif @@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons { doit = true; } -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; #endif @@ -250,22 +260,28 @@ void MqttClient::loop() { debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); clientAlive(0); // TODO when many MqttClient passes through a local browser // there is no need to send one PingReq per instance. } } +} - while(client && client->available()>0) +void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +{ + char* char_ptr = static_cast(data); + MqttClient* client=static_cast(client_ptr); + while(len>0) { - message.incoming(client->read()); - if (message.type()) + client->message.incoming(*char_ptr++); + if (client->message.type()) { - processMessage(&message); - message.reset(); + client->processMessage(&client->message); + client->message.reset(); } + len--; } } @@ -341,7 +357,7 @@ long MqttClient::counter=0; void MqttClient::processMessage(const MqttMessage* mesg) { counter++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; @@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T if (client) { uint16_t pingreq = MqttMessage::Type::PingResp; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); bclose = false; } else @@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T subscriptions.erase(it); } payload += len; - uint8_t qos = *payload++; + /* uint8_t qos =*/ *payload++; debug(" qos=" << qos); } debug("end loop"); @@ -488,7 +504,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T Topic published(payload, len); payload += len; // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any len=mesg->end()-payload; // TODO reset DUP diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 4c2c707..ee9692e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -6,11 +7,9 @@ #include "StringIndexer.h" #include -#if 0 +#ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } - #define TINY_MQTT_DEBUG 1 #else - #define TINY_MQTT_DEBUG 0 #define debug(what) {} #endif @@ -190,11 +189,12 @@ class MqttClient static long counter; private: + static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, WiFiClient& client); + MqttClient(MqttBroker* parent, AsyncClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -212,7 +212,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - WiFiClient* client=nullptr; // connection to mqtt client or to remote broker + AsyncClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -231,11 +231,9 @@ class MqttBroker MqttBroker(uint16_t port); ~MqttBroker(); - void begin() { server.begin(); } + void begin() { server->begin(); } void loop(); - uint16_t port() const { return server.port(); } - void connect(const std::string& host, uint16_t port=1883); bool connected() const { return state == Connected; } @@ -254,6 +252,7 @@ class MqttBroker private: friend class MqttClient; + static void onClient(void*, AsyncClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -271,7 +270,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - WiFiServer server; + AsyncServer* server; const char* auth_user = "guest"; const char* auth_password = "guest"; From 6711f30ad0c8199b6c202f93130beba5491924ef Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 14:03:36 +0200 Subject: [PATCH 05/12] AsyncTcp AsyncTcp --- src/TinyMqtt.cpp | 1 + src/TinyMqtt.h | 10 +--------- tests/Makefile | 2 +- tests/local-tests/Makefile | 2 +- tests/nowifi-tests/Makefile | 2 +- tests/string-indexer-tests/Makefile | 2 +- 6 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index da96ee1..74ca4b1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -672,6 +672,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { + buffer.reserve(buffer.length()+addLength+2); incoming(len>>8); incoming(len & 0xFF); } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index ee9692e..d6dc2db 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -80,23 +80,15 @@ class MqttMessage // output buff+=2, len=length(str) static void getString(const char* &buff, uint16_t& len); - Type type() const { return state == Complete ? static_cast(buffer[0]) : Unknown; } - // shouldn't exist because it breaks constness :-( - // but this saves memory so ... - void changeType(Type type) const - { - buffer[0] = type; - } - void create(Type type) { buffer=(char)type; - buffer+='\0'; + buffer+='\0'; // reserved for msg length vheader=2; size=0; state=Create; diff --git a/tests/Makefile b/tests/Makefile index bfea400..e2d6bac 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -5,7 +5,7 @@ tests: $(MAKE) -C $$(dirname $$i) -j; \ done -runtests: +runtests: tests set -e; \ for i in *-tests/Makefile; do \ echo '==== Running:' $$(dirname $$i); \ diff --git a/tests/local-tests/Makefile b/tests/local-tests/Makefile index 8781b42..232112d 100644 --- a/tests/local-tests/Makefile +++ b/tests/local-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := local-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk diff --git a/tests/nowifi-tests/Makefile b/tests/nowifi-tests/Makefile index 3733f64..b5913b1 100644 --- a/tests/nowifi-tests/Makefile +++ b/tests/nowifi-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := nowifi-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk diff --git a/tests/string-indexer-tests/Makefile b/tests/string-indexer-tests/Makefile index acb264f..9673cca 100644 --- a/tests/string-indexer-tests/Makefile +++ b/tests/string-indexer-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := string-indexer-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk From 9c939a5667d973419d3fc6a2b9f1da0311088a30 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 16:50:14 +0200 Subject: [PATCH 06/12] Added mqDns to tinytest --- examples/tinymqtt-test/tinymqtt-test.ino | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index 2f6f9d0..a3310f6 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -1,10 +1,11 @@ -#define TINY_MQTT_DEBUG #include // https://github.com/hsaturn/TinyMqtt #include +#include + #include #include -/** +/** * Console allowing to make any kind of test. * * pros - Reduces internal latency (when publish is received by the same ESP) @@ -40,7 +41,7 @@ std::map brokers; void setup() { - WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054 + WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054 Serial.begin(115200); delay(500); Serial << endl << endl << endl @@ -55,6 +56,14 @@ void setup() Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Type help for more..." << endl; + const char* name="tinytest"; + Serial << "Starting MDNS, name= " << name; + if (!MDNS.begin(name)) + Serial << " error, not available." << endl; + else + Serial << " ok." << endl; + + MqttBroker* broker = new MqttBroker(1883); broker->begin(); brokers["broker"] = broker; @@ -344,6 +353,8 @@ void loop() } static long count; + MDNS.update(); + if (MqttClient::counter != count) { Serial << "# " << MqttClient::counter << endl; @@ -383,7 +394,7 @@ void loop() // client.function notation // ("a.fun " becomes "fun a ") - if (cmd.find('.') != std::string::npos && + if (cmd.find('.') != std::string::npos && cmd.find('.') < cmd.find(' ')) { s=getword(cmd, nullptr, '.'); From d96143f185a65632834ff2dca3f691728debd681 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 16:50:45 +0200 Subject: [PATCH 07/12] Fix warning --- src/StringIndexer.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/StringIndexer.h b/src/StringIndexer.h index 17d5ac1..5b6ebfe 100644 --- a/src/StringIndexer.h +++ b/src/StringIndexer.h @@ -28,7 +28,7 @@ class StringIndexer public: using index_t=uint8_t; - static const index_t strToIndex(const char* str, uint8_t len) + static index_t strToIndex(const char* str, uint8_t len) { for(auto it=strings.begin(); it!=strings.end(); it++) { From afc9370e3e9d77c363d45f05c07fbc07a9c18b2b Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 16:51:35 +0200 Subject: [PATCH 08/12] Fix compilation in DEBUG mode --- src/TinyMqtt.cpp | 2 +- src/TinyMqtt.h | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 74ca4b1..c2100f2 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -139,7 +139,7 @@ void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) MqttBroker* broker = static_cast(broker_ptr); broker->addClient(new MqttClient(broker, client)); - debug("New client #" << broker->clients->size()); + debug("New client #" << broker->clients.size()); } void MqttBroker::loop() diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index d6dc2db..5c4560a 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,5 +1,4 @@ #pragma once -#include #include #include #include @@ -7,6 +6,8 @@ #include "StringIndexer.h" #include +// #define TINY_MQTT_DEBUG + #ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #else From ad602194cf1f042d770c41b115654786c5752b26 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 16:51:56 +0200 Subject: [PATCH 09/12] Fix bug in unsubscription list --- src/TinyMqtt.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index c2100f2..67f08b6 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -469,25 +469,27 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T if (!mqtt_connected) break; payload = header+2; - debug("subscribe loop"); + debug("un/subscribe loop"); while(payload < mesg->end()) { mesg->getString(payload, len); // Topic debug( " topic (" << std::string(payload, len) << ')'); - outstring("Subscribes", payload, len); + outstring(" un/subscribes", payload, len); // subscribe(Topic(payload, len)); Topic topic(payload, len); + payload += len; if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe) + { + uint8_t qos = *payload++; + if (qos != 0) debug("Unsupported QOS" << qos << endl); subscriptions.insert(topic); + } else { auto it=subscriptions.find(topic); if (it != subscriptions.end()) subscriptions.erase(it); } - payload += len; - /* uint8_t qos =*/ *payload++; - debug(" qos=" << qos); } debug("end loop"); bclose = false; From aa0ed9a7a7ec1db6f8203528311cfe0ed76ed614 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 16:06:45 +0200 Subject: [PATCH 10/12] Bad merge fix --- src/TinyMqtt.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 5c4560a..6908bd8 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,5 +1,5 @@ #pragma once -#include +#include #include #include #include From d1c7ebe134db5c036dcdcc5891ae677c2490cab7 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 17:14:57 +0200 Subject: [PATCH 11/12] Added unsubscribe to tinytest --- examples/tinymqtt-test/tinymqtt-test.ino | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index a3310f6..f0b8785 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -138,7 +138,7 @@ std::set commands = { "auto", "broker", "blink", "client", "connect", "create", "delete", "help", "interval", "ls", "ip", "off", "on", "set", - "publish", "reset", "subscribe", "view" + "publish", "reset", "subscribe", "unsubscribe", "view" }; void getCommand(std::string& search) @@ -492,6 +492,10 @@ void loop() { client->subscribe(getword(cmd, topic.c_str())); } + else if (compare(s, "unsubscribe")) + { + client->unsubscribe(getword(cmd, topic.c_str())); + } else if (compare(s, "view")) { client->dump(); @@ -622,7 +626,7 @@ void loop() Serial << " MqttClient:" << endl; Serial << " client {name} {parent broker} : create a client then" << endl; Serial << " name.connect [ip] [port] [alive]" << endl; - Serial << " name.subscribe [topic]" << endl; + Serial << " name.[un]subscribe [topic]" << endl; Serial << " name.publish [topic][payload]" << endl; Serial << " name.view" << endl; Serial << " name.delete" << endl; From fe3f8d7b3285de0162483aaf4354b65484032d8f Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 13:42:43 +0200 Subject: [PATCH 12/12] Very promising async commit --- src/TinyMqtt.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 6908bd8..63ed8ee 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,5 +1,6 @@ #pragma once #include +#include #include #include #include