From e90076d010249f5a8fb34514d49dce828596886f Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 14:03:36 +0200 Subject: [PATCH 1/4] AsyncTcp --- src/TinyMqtt.cpp | 1 + src/TinyMqtt.h | 10 +--------- tests/Makefile | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index da96ee1..74ca4b1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -672,6 +672,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { + buffer.reserve(buffer.length()+addLength+2); incoming(len>>8); incoming(len & 0xFF); } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index ee9692e..d6dc2db 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -80,23 +80,15 @@ class MqttMessage // output buff+=2, len=length(str) static void getString(const char* &buff, uint16_t& len); - Type type() const { return state == Complete ? static_cast(buffer[0]) : Unknown; } - // shouldn't exist because it breaks constness :-( - // but this saves memory so ... - void changeType(Type type) const - { - buffer[0] = type; - } - void create(Type type) { buffer=(char)type; - buffer+='\0'; + buffer+='\0'; // reserved for msg length vheader=2; size=0; state=Create; diff --git a/tests/Makefile b/tests/Makefile index bfea400..e2d6bac 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -5,7 +5,7 @@ tests: $(MAKE) -C $$(dirname $$i) -j; \ done -runtests: +runtests: tests set -e; \ for i in *-tests/Makefile; do \ echo '==== Running:' $$(dirname $$i); \ From 67a296eb287b81106eb1058145fed382dc1c5416 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 15:29:17 +0200 Subject: [PATCH 2/4] Fix too many things in StringIndexer test --- src/StringIndexer.h | 1 - src/TinyMqtt.h | 3 ++- .../string-indexer-tests.ino | 18 ++---------------- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/src/StringIndexer.h b/src/StringIndexer.h index 8d26de9..17d5ac1 100644 --- a/src/StringIndexer.h +++ b/src/StringIndexer.h @@ -2,7 +2,6 @@ #include #include #include -#include /*** * Allows to store up to 255 different strings with one byte class diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index f2e52ca..4c2c707 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,4 +1,5 @@ -#include +#pragma once +#include #include #include #include diff --git a/tests/string-indexer-tests/string-indexer-tests.ino b/tests/string-indexer-tests/string-indexer-tests.ino index 86b8df0..4168946 100644 --- a/tests/string-indexer-tests/string-indexer-tests.ino +++ b/tests/string-indexer-tests/string-indexer-tests.ino @@ -1,28 +1,14 @@ #include -#include +#include #include /** - * TinyMqtt local unit tests. + * TinyMqtt / StringIndexer unit tests. * - * Clients are connected to pseudo remote broker - * The remote will be 127.0.0.1:1883 - * We are using 127.0.0.1 because this is simpler to test with a single ESP - * Also, this will allow to mock and thus run Action on github **/ using namespace std; -MqttBroker broker(1883); - -std::map> published; // map[client_id] => map[topic] = count - -void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) -{ - if (srce) - published[srce->id()][topic]++; -} - test(indexer_empty) { assertEqual(StringIndexer::count(), 0); From 3e8d34e4e7feec16b7be0e97ffadf8a834235325 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 13:42:43 +0200 Subject: [PATCH 3/4] Very promising async commit Very promising async commit --- src/TinyMqtt.cpp | 66 ++++++++++++++++++++++++++++++------------------ src/TinyMqtt.h | 17 ++++++------- 2 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 24fb984..da96ee1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) Serial << '\'' << endl; } -MqttBroker::MqttBroker(uint16_t port) : server(port) +MqttBroker::MqttBroker(uint16_t port) { + server = new AsyncServer(port); + server->onClient(onClient, this); } MqttBroker::~MqttBroker() @@ -19,14 +21,16 @@ MqttBroker::~MqttBroker() { delete clients[0]; } - server.close(); + delete server; } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) - : parent(parent) +MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) + : parent(parent), client(new_client) { - client = new WiFiClient(new_client); + client->onData(onData, this); + // client->onConnect() TODO + // client->onDisconnect() TODO alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) debug("cnx: closing"); close(); if (client) delete client; - client = new WiFiClient; + client = new AsyncClient; debug("Trying to connect to " << broker.c_str() << ':' << port); + // TODO This may return immediately !!! + // TODO so I have to add onConnect and move this code to onConnect + // TODO also, as this is async now, I must take care of + // TODO the broker that may disconnect and delete the client immediately if (client->connect(broker.c_str(), port)) { debug("cnx: connecting"); @@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } +void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +{ + MqttBroker* broker = static_cast(broker_ptr); + + broker->addClient(new MqttClient(broker, client)); + debug("New client #" << broker->clients->size()); +} + void MqttBroker::loop() { - WiFiClient client = server.available(); - if (broker) { // TODO should monitor broker's activity. @@ -137,11 +151,6 @@ void MqttBroker::loop() broker->loop(); } - if (client) - { - addClient(new MqttClient(this, client)); - debug("New client (" << clients.size() << ')'); - } // for(auto it=clients.begin(); it!=clients.end(); it++) // use index because size can change during the loop @@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) { return broker->subscribe(topic, qos); } + return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const @@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons for(auto client: clients) { i++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif @@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons { doit = true; } -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; #endif @@ -250,22 +260,28 @@ void MqttClient::loop() { debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); clientAlive(0); // TODO when many MqttClient passes through a local browser // there is no need to send one PingReq per instance. } } +} - while(client && client->available()>0) +void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +{ + char* char_ptr = static_cast(data); + MqttClient* client=static_cast(client_ptr); + while(len>0) { - message.incoming(client->read()); - if (message.type()) + client->message.incoming(*char_ptr++); + if (client->message.type()) { - processMessage(&message); - message.reset(); + client->processMessage(&client->message); + client->message.reset(); } + len--; } } @@ -341,7 +357,7 @@ long MqttClient::counter=0; void MqttClient::processMessage(const MqttMessage* mesg) { counter++; -#if TINY_MQTT_DEBUG +#ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; @@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T if (client) { uint16_t pingreq = MqttMessage::Type::PingResp; - client->write((uint8_t*)(&pingreq), 2); + client->write((const char*)(&pingreq), 2); bclose = false; } else @@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T subscriptions.erase(it); } payload += len; - uint8_t qos = *payload++; + /* uint8_t qos =*/ *payload++; debug(" qos=" << qos); } debug("end loop"); @@ -488,7 +504,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T Topic published(payload, len); payload += len; // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any len=mesg->end()-payload; // TODO reset DUP diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 4c2c707..ee9692e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -6,11 +7,9 @@ #include "StringIndexer.h" #include -#if 0 +#ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } - #define TINY_MQTT_DEBUG 1 #else - #define TINY_MQTT_DEBUG 0 #define debug(what) {} #endif @@ -190,11 +189,12 @@ class MqttClient static long counter; private: + static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, WiFiClient& client); + MqttClient(MqttBroker* parent, AsyncClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -212,7 +212,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - WiFiClient* client=nullptr; // connection to mqtt client or to remote broker + AsyncClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -231,11 +231,9 @@ class MqttBroker MqttBroker(uint16_t port); ~MqttBroker(); - void begin() { server.begin(); } + void begin() { server->begin(); } void loop(); - uint16_t port() const { return server.port(); } - void connect(const std::string& host, uint16_t port=1883); bool connected() const { return state == Connected; } @@ -254,6 +252,7 @@ class MqttBroker private: friend class MqttClient; + static void onClient(void*, AsyncClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -271,7 +270,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - WiFiServer server; + AsyncServer* server; const char* auth_user = "guest"; const char* auth_password = "guest"; From 6711f30ad0c8199b6c202f93130beba5491924ef Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sat, 10 Apr 2021 14:03:36 +0200 Subject: [PATCH 4/4] AsyncTcp AsyncTcp --- src/TinyMqtt.cpp | 1 + src/TinyMqtt.h | 10 +--------- tests/Makefile | 2 +- tests/local-tests/Makefile | 2 +- tests/nowifi-tests/Makefile | 2 +- tests/string-indexer-tests/Makefile | 2 +- 6 files changed, 6 insertions(+), 13 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index da96ee1..74ca4b1 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -672,6 +672,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { + buffer.reserve(buffer.length()+addLength+2); incoming(len>>8); incoming(len & 0xFF); } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index ee9692e..d6dc2db 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -80,23 +80,15 @@ class MqttMessage // output buff+=2, len=length(str) static void getString(const char* &buff, uint16_t& len); - Type type() const { return state == Complete ? static_cast(buffer[0]) : Unknown; } - // shouldn't exist because it breaks constness :-( - // but this saves memory so ... - void changeType(Type type) const - { - buffer[0] = type; - } - void create(Type type) { buffer=(char)type; - buffer+='\0'; + buffer+='\0'; // reserved for msg length vheader=2; size=0; state=Create; diff --git a/tests/Makefile b/tests/Makefile index bfea400..e2d6bac 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -5,7 +5,7 @@ tests: $(MAKE) -C $$(dirname $$i) -j; \ done -runtests: +runtests: tests set -e; \ for i in *-tests/Makefile; do \ echo '==== Running:' $$(dirname $$i); \ diff --git a/tests/local-tests/Makefile b/tests/local-tests/Makefile index 8781b42..232112d 100644 --- a/tests/local-tests/Makefile +++ b/tests/local-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := local-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk diff --git a/tests/nowifi-tests/Makefile b/tests/nowifi-tests/Makefile index 3733f64..b5913b1 100644 --- a/tests/nowifi-tests/Makefile +++ b/tests/nowifi-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := nowifi-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk diff --git a/tests/string-indexer-tests/Makefile b/tests/string-indexer-tests/Makefile index acb264f..9673cca 100644 --- a/tests/string-indexer-tests/Makefile +++ b/tests/string-indexer-tests/Makefile @@ -3,5 +3,5 @@ APP_NAME := string-indexer-tests ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock -ESP_LIBS = ESP8266WiFi +ESP_LIBS = ESP8266WiFi ESPAsyncTCP include ../../../EspMock/EspMock.mk