From 2e92a98db21270642260e3ae72f1f025d42ff0bd Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 15:51:33 +0200 Subject: [PATCH 1/6] Trying to fuse togeter Async and not async version --- src/TinyMqtt.cpp | 29 ++++++++++++++++++++++++----- src/TinyMqtt.h | 28 +++++++++++++++++++--------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 40ddef4..3b51ddc 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) MqttBroker::MqttBroker(uint16_t port) { - server = new AsyncServer(port); + server = new TcpServer(port); +#ifdef TCP_ASYNC server->onClient(onClient, this); +#endif } MqttBroker::~MqttBroker() @@ -25,12 +27,14 @@ MqttBroker::~MqttBroker() } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) +MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) : parent(parent), client(new_client) { +#ifdef TCP_ASYNC client->onData(onData, this); // client->onConnect() TODO // client->onDisconnect() TODO +#endif alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -75,11 +79,16 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) keep_alive = ka; close(); if (client) delete client; - client = new AsyncClient; + client = new TcpClient; client->onData(onData, this); client->onConnect(onConnect, this); debug("Trying to connect to " << broker.c_str() << ':' << port); - client->connect(broker.c_str(), port); + if (client->connect(broker.c_str(), port)) + { + #ifndef TCP_ASYNC + onConnect(this, client); + #endif + } } void MqttBroker::addClient(MqttClient* client) @@ -115,7 +124,7 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } -void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +void MqttBroker::onClient(void* broker_ptr, TcpClient* client) { MqttBroker* broker = static_cast(broker_ptr); @@ -125,6 +134,14 @@ void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) void MqttBroker::loop() { +#ifndef TCP_ASYNC + WiFiClient client = server.available(); + + if (client) + { + onClient(this, &client); + } +#endif if (broker) { // TODO should monitor broker's activity. @@ -270,6 +287,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) mqtt->clientAlive(0); } +#ifdef TCP_ASYNC void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) { char* char_ptr = static_cast(data); @@ -285,6 +303,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) len--; } } +#endif void MqttClient::resubscribe() { diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index c5d5964..7cf31d3 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -16,13 +16,22 @@ #include // #define TINY_MQTT_DEBUG - #ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #else #define debug(what) {} #endif +#define TCP_ASYNC + +#ifdef TCP_ASYNC + using TcpClient = AsyncClient; + using TcpServer = AsyncServer; +#else + using TcpClient = WiFiClient; + using TcpServer = WiFiServer; +#endif + enum MqttError { MqttOk = 0, @@ -57,7 +66,7 @@ class MqttMessage Subscribe = 0x80, SubAck = 0x90, UnSubscribe = 0xA0, - UnSuback = 0xB0, + UnSuback = 0xB0, PingReq = 0xC0, PingResp = 0xD0, Disconnect = 0xE0 @@ -192,14 +201,15 @@ class MqttClient static long counter; private: - - static void onConnect(void * client_ptr, AsyncClient*); - static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); +#ifdef TCP_ASYNC + static void onConnect(void * client_ptr, TcpClient*); + static void onData(void* client_ptr, TcpClient*, void* data, size_t len); +#endif MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, AsyncClient* client); + MqttClient(MqttBroker* parent, TcpClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -217,7 +227,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - AsyncClient* client=nullptr; // connection to mqtt client or to remote broker + TcpClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -257,7 +267,7 @@ class MqttBroker private: friend class MqttClient; - static void onClient(void*, AsyncClient*); + static void onClient(void*, TcpClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -275,7 +285,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - AsyncServer* server; + TcpServer* server; const char* auth_user = "guest"; const char* auth_password = "guest"; From 24ee6b5201b4b0c5bebedaa15a3effc5c211a750 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 16:33:12 +0200 Subject: [PATCH 2/6] Fixes in WiFiClient mode --- src/TinyMqtt.cpp | 28 +++++++++++++++++----------- src/TinyMqtt.h | 16 ++++++++++++---- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 3b51ddc..7171f4a 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -28,12 +28,15 @@ MqttBroker::~MqttBroker() // private constructor used by broker only MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) - : parent(parent), client(new_client) + : parent(parent) { #ifdef TCP_ASYNC + client = new_client; client->onData(onData, this); // client->onConnect() TODO // client->onDisconnect() TODO +#else + client = new WiFiClient(*new_client); #endif alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -80,15 +83,18 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) close(); if (client) delete client; client = new TcpClient; + + debug("Trying to connect to " << broker.c_str() << ':' << port); +#ifdef TCP_ASYNC client->onData(onData, this); client->onConnect(onConnect, this); - debug("Trying to connect to " << broker.c_str() << ':' << port); + client->connect(broker.c_str(), port); +#else if (client->connect(broker.c_str(), port)) - { - #ifndef TCP_ASYNC - onConnect(this, client); - #endif - } + { + onConnect(this, client); + } +#endif } void MqttBroker::addClient(MqttClient* client) @@ -129,13 +135,13 @@ void MqttBroker::onClient(void* broker_ptr, TcpClient* client) MqttBroker* broker = static_cast(broker_ptr); broker->addClient(new MqttClient(broker, client)); - debug("New client #" << broker->clients.size()); + debug("New client"); } void MqttBroker::loop() { #ifndef TCP_ASYNC - WiFiClient client = server.available(); + WiFiClient client = server->available(); if (client) { @@ -267,7 +273,7 @@ void MqttClient::loop() } } -void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) +void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) { MqttClient* mqtt = static_cast(mqttclient_ptr); debug("cnx: connecting"); @@ -288,7 +294,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) } #ifdef TCP_ASYNC -void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len) { char* char_ptr = static_cast(data); MqttClient* client=static_cast(client_ptr); diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 7cf31d3..006c01e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,6 +1,14 @@ #pragma once + +// TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC +// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx + #ifdef ESP8266 - #include + #ifdef TCP_ASYNC + #include + #else + #include + #endif #elif defined(ESP32) #include #include // https://github.com/me-no-dev/AsyncTCP @@ -16,14 +24,14 @@ #include // #define TINY_MQTT_DEBUG +#define TINY_MQTT_DEBUG + #ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #else #define debug(what) {} #endif -#define TCP_ASYNC - #ifdef TCP_ASYNC using TcpClient = AsyncClient; using TcpServer = AsyncServer; @@ -201,8 +209,8 @@ class MqttClient static long counter; private: -#ifdef TCP_ASYNC static void onConnect(void * client_ptr, TcpClient*); +#ifdef TCP_ASYNC static void onData(void* client_ptr, TcpClient*, void* data, size_t len); #endif MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); From b023cd67a9a440a4654d11eee6380580efc68c1f Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 17:02:24 +0200 Subject: [PATCH 3/6] Fix AUnit in debug mode / Not async --- src/TinyMqtt.cpp | 6 +++--- src/TinyMqtt.h | 21 +++++++++++++-------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 7171f4a..7a4f49d 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -167,7 +167,7 @@ void MqttBroker::loop() } else { - debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent); + debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent); // Note: deleting a client not added by the broker itself will probably crash later. delete client; break; @@ -288,7 +288,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) debug("cnx: mqtt connecting"); msg.sendTo(mqtt); msg.reset(); - debug("cnx: mqtt sent " << (int32_t)mqtt->parent); + debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent); mqtt->clientAlive(0); } @@ -386,7 +386,7 @@ void MqttClient::processMessage(const MqttMessage* mesg) #ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; // mesg->hexdump("Incoming"); } #endif diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 006c01e..ed96fb3 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -3,20 +3,26 @@ // TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC // #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx -#ifdef ESP8266 +#if defined(ESP8266) || defined(EPOXY_DUINO) #ifdef TCP_ASYNC #include - #else - #include + #else + #include #endif #elif defined(ESP32) - #include - #include // https://github.com/me-no-dev/AsyncTCP -#elif defined(EPOXY_DUINO) - #include + #ifdef TCP_ASYNC + #include // https://github.com/me-no-dev/AsyncTCP + #else + #include + #endif #else #error "Unsupported platform" #endif +#ifdef EPOXY_DUINO + #define dbg_ptr uint64_t +#else + #define dbg_ptr uint32_t +#endif #include #include #include @@ -24,7 +30,6 @@ #include // #define TINY_MQTT_DEBUG -#define TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } From 28b0ac16112865a20c35cd87f956d6ebec6133e1 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 21:21:48 +0200 Subject: [PATCH 4/6] Fix missing receive loop for mqttclient --- src/TinyMqtt.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 7a4f49d..f6fe9fe 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -271,6 +271,17 @@ void MqttClient::loop() // there is no need to send one PingReq per instance. } } +#ifndef TCP_ASYNC + while(client && client->available()>0) + { + message.incoming(client->read()); + if (message.type()) + { + processMessage(&message); + message.reset(); + } + } +#endif } void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) From 122ab8896049890f65f3dc475bff4754408742f6 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 23:26:49 +0200 Subject: [PATCH 5/6] Rewrite client-with-wifi.ino --- .../client-with-wifi/client-with-wifi.ino | 77 +++++++++++-------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/examples/client-with-wifi/client-with-wifi.ino b/examples/client-with-wifi/client-with-wifi.ino index d8c5ba7..1f6cce6 100644 --- a/examples/client-with-wifi/client-with-wifi.ino +++ b/examples/client-with-wifi/client-with-wifi.ino @@ -1,22 +1,31 @@ #include // https://github.com/hsaturn/TinyMqtt /** - * Local broker that accept connections and two local clients - * + * + * +-----------------------------+ + * | ESP | + * | +--------+ | 1883 <--- External client/s + * | +-------->| broker | | 1883 <--- External client/s + * | | +--------+ | + * | | ^ | + * | | | | + * | v v | + * | +----------+ +----------+ | + * | | internal | | internal | | + * | | client | | client | | + * | +----------+ +----------+ | + * | | + * +-----------------------------+ + * * pros - Reduces internal latency (when publish is received by the same ESP) * - Reduces wifi traffic * - No need to have an external broker - * - can still report to a 'main' broker (TODO see documentation that have to be written) - * - accepts external clients + * - can still report to a 'main' broker (TODO see documentation that have to be written) + * - accepts external clients * * cons - Takes more memory - * - a bit hard to understand - * - * This sounds crazy: a mqtt mqtt that do not need a broker ! - * The use case arise when one ESP wants to publish topics and subscribe to them at the same time. - * Without broker, the ESP won't react to its own topics. - * - * TinyMqtt mqtt allows this use case to work. + * - a bit hard to understand + * */ #include @@ -37,8 +46,8 @@ void onPublishB(const MqttClient* source, const Topic& topic, const char* payloa void setup() { Serial.begin(115200); - delay(500); - Serial << "Clients with wifi " << endl; + delay(500); + Serial << "Clients with wifi " << endl; WiFi.mode(WIFI_STA); WiFi.begin(ssid, password); @@ -47,8 +56,8 @@ void setup() Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; - broker.begin(); - + broker.begin(); + mqtt_a.setCallback(onPublishA); mqtt_a.subscribe(topic); @@ -58,30 +67,30 @@ void setup() void loop() { - broker.loop(); + broker.loop(); // Don't forget to add loop for every broker and clients mqtt_a.loop(); mqtt_b.loop(); // ============= client A publish ================ - static const int intervalA = 5000; // publishes every 5s - static uint32_t timerA = millis() + intervalA; - - if (millis() > timerA) - { - Serial << "A is publishing " << topic.c_str() << endl; - timerA += intervalA; - mqtt_a.publish(topic); - } + static const int intervalA = 5000; // publishes every 5s + static uint32_t timerA = millis() + intervalA; + + if (millis() > timerA) + { + Serial << "A is publishing " << topic.c_str() << endl; + timerA += intervalA; + mqtt_a.publish(topic); + } // ============= client B publish ================ - static const int intervalB = 7000; // will send topic each 7s - static uint32_t timerB = millis() + intervalB; - - if (millis() > timerB) - { - Serial << "B is publishing " << topic.c_str() << endl; - timerB += intervalB; - mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); - } + static const int intervalB = 7000; // will send topic each 7s + static uint32_t timerB = millis() + intervalB; + + if (millis() > timerB) + { + Serial << "B is publishing " << topic.c_str() << endl; + timerB += intervalB; + mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); + } } From 23f12077186488f24581f49dec91ab042d0ed720 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 23:27:15 +0200 Subject: [PATCH 6/6] Lot of new functions for tinytest - command every allowing to add peridic evaluations very usefull for benchmarks and load tests - on/off command --- examples/tinymqtt-test/tinymqtt-test.ino | 646 +++++++++++++---------- 1 file changed, 371 insertions(+), 275 deletions(-) diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index c310fad..883b338 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -138,7 +138,7 @@ std::set commands = { "auto", "broker", "blink", "client", "connect", "create", "delete", "help", "interval", "ls", "ip", "off", "on", "set", - "publish", "reset", "subscribe", "unsubscribe", "view" + "publish", "reset", "subscribe", "unsubscribe", "view", "every" }; void getCommand(std::string& search) @@ -316,22 +316,389 @@ std::map automatic::autos; bool compare(std::string s, const char* cmd) { - if (s.length()==0 or s.length()>strlen(cmd)) return false; - return strncmp(cmd, s.c_str(), s.length())==0; + uint8_t p=0; + while(s[p++]==*cmd++) + { + if (*cmd==0 or s[p]==0) return true; + if (s[p]==' ') return true; + } + return false; } using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); +struct Every +{ + std::string cmd; + uint32_t ms; + uint32_t next; + + void dump() + { + auto mill=millis(); + Serial << ms << "ms [" << cmd << "] next in "; + if (mill > next) + Serial << "now"; + else + Serial << next-mill << "ms"; + } +}; + uint32_t blink_ms_on[16]; uint32_t blink_ms_off[16]; uint32_t blink_next[16]; bool blink_state[16]; int16_t blink; + +std::vector everies; + +void eval(std::string& cmd) +{ + while(cmd.length()) + { + MqttError retval = MqttOk; + + std::string s; + MqttBroker* broker = nullptr; + MqttClient* client = nullptr; + + // client.function notation + // ("a.fun " becomes "fun a ") + if (cmd.find('.') != std::string::npos && + cmd.find('.') < cmd.find(' ')) + { + s=getword(cmd, nullptr, '.'); + + if (s.length()) + { + if (clients.find(s) != clients.end()) + { + client = clients[s]; + } + else if (brokers.find(s) != brokers.end()) + { + broker = brokers[s]; + } + else + { + Serial << "Unknown class (" << s.c_str() << ")" << endl; + cmd=""; + } + } + } + + s = getword(cmd); + if (s.length()) getCommand(s); + if (s.length()==0) + {} + else if (compare(s, "delete")) + { + if (client==nullptr && broker==nullptr) + { + s = getword(cmd); + if (clients.find(s) != clients.end()) + { + client = clients[s]; + } + else if (brokers.find(s) != brokers.end()) + { + broker = brokers[s]; + } + else + Serial << "Unable to find (" << s.c_str() << ")" << endl; + } + if (client) + { + for (auto it: clients) + { + if (it.second != client) continue; + Serial << "deleted" << endl; + delete (it.second); + clients.erase(it.first); + break; + } + cmd += " ls"; + } + else if (broker) + { + for(auto it: brokers) + { + if (broker != it.second) continue; + Serial << "deleted" << endl; + delete (it.second); + brokers.erase(it.first); + break; + } + cmd += " ls"; + } + else + Serial << "Nothing to delete" << endl; + } + else if (broker) + { + if (compare(s,"connect")) + { + Serial << "NYI" << endl; + } + else if (compare(s, "view")) + { + broker->dump(); + } + } + else if (client) + { + if (compare(s,"connect")) + { + client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); + Serial << (client->connected() ? "connected." : "not connected") << endl; + } + else if (compare(s,"publish")) + { + while (cmd[0]==' ') cmd.erase(0,1); + retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length()); + cmd=""; // remove payload + } + else if (compare(s,"subscribe")) + { + client->subscribe(getword(cmd, topic.c_str())); + } + else if (compare(s, "unsubscribe")) + { + client->unsubscribe(getword(cmd, topic.c_str())); + } + else if (compare(s, "view")) + { + client->dump(); + } + } + else if (compare(s, "on")) + { + uint8_t pin=getint(cmd, 2); + pinMode(pin, OUTPUT); + digitalWrite(pin, HIGH); + } + else if (compare(s, "off")) + { + uint8_t pin=getint(cmd, 2); + pinMode(pin, OUTPUT); + digitalWrite(pin, LOW); + } + else if (compare(s, "every")) + { + uint32_t ms = getint(cmd, 0); + if (ms and cmd.length()) + { + Every every; + every.ms=ms; + every.cmd=cmd; + every.next=millis()+ms; + everies.push_back(every); + every.dump(); + Serial << endl; + cmd=""; + } + else if (ms==0 and compare(cmd, "list")) + { + getword(cmd); + Serial << "List of everies (ms=" << millis() << ")" << endl; + uint8_t count=0; + for(auto& every: everies) + { + Serial << count << ": "; + every.dump(); + Serial << endl; + count++; + } + } + else if (ms==0 and compare(cmd, "remove")) + { + getword(cmd); + int8_t every=getint(cmd, -1); + if (every==-1 and compare(cmd, "all")) + { + getword(cmd); + everies.clear(); + } + else if (everies.size() > (uint8_t)every) + { + everies.erase(everies.begin()+every); + } + } + } + else if (compare(s, "blink")) + { + int8_t blink_nr = getint(cmd, -1); + if (blink_nr >= 0) + { + blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); + blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); + pinMode(blink_nr, OUTPUT); + blink_next[blink_nr] = millis(); + Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl; + if (blink_ms_on[blink_nr]) + blink |= 1<< blink_nr; + else + { + blink &= ~(1<< blink_nr); + } + } + } + else if (compare(s, "auto")) + { + automatic::command(client, cmd); + if (client == nullptr) + cmd.clear(); + } + else if (compare(s, "broker")) + { + std::string id=getword(cmd); + if (id.length() or brokers.find(id)!=brokers.end()) + { + int port=getint(cmd, 0); + if (port) + { + MqttBroker* broker = new MqttBroker(port); + broker->begin(); + + brokers[id] = broker; + Serial << "new broker (" << id.c_str() << ")" << endl; + } + else + Serial << "Missing port" << endl; + } + else + Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl; + cmd+=" ls"; + } + else if (compare(s, "client")) + { + std::string id=getword(cmd); + if (id.length() or clients.find(id)!=clients.end()) + { + s=getword(cmd); // broker name + if (s=="" or brokers.find(s) != brokers.end()) + { + MqttBroker* broker = nullptr; + if (s.length()) broker = brokers[s]; + MqttClient* client = new MqttClient(broker); + client->id(id); + clients[id]=client; + client->setCallback(onPublish); + client->subscribe(topic); + Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl; + } + else if (s.length()) + { + Serial << " not found." << endl; + } + } + else + Serial << "Missing or existing client name" << endl; + cmd+=" ls"; + } + else if (compare(s, "set")) + { + std::string name(getword(cmd)); + if (name.length()==0) + { + for(auto it: vars) + { + Serial << " " << it.first << " -> " << it.second << endl; + } + } + else if (commands.find(name) != commands.end()) + { + Serial << "Reserved keyword (" << name << ")" << endl; + cmd.clear(); + } + else + { + if (cmd.length()) + { + vars[name] = cmd; + cmd.clear(); + } + else if (vars.find(name) != vars.end()) + vars.erase(vars.find(name)); + } + } + else if (compare(s, "ls") or compare(s, "view")) + { + Serial << "--< " << clients.size() << " client/s. >--" << endl; + for(auto it: clients) + { + Serial << " "; it.second->dump(); + } + + Serial << "--< " << brokers.size() << " brokers/s. >--" << endl; + for(auto it: brokers) + { + Serial << " ==[ Broker: " << it.first.c_str() << " ]== "; + it.second->dump(); + } + } + else if (compare(s, "reset")) + ESP.restart(); + else if (compare(s, "ip")) + Serial << "IP: " << WiFi.localIP() << endl; + else if (compare(s,"help")) + { + Serial << "syntax:" << endl; + Serial << " MqttBroker:" << endl; + Serial << " broker {name} {port} : create a new broker" << endl; + Serial << endl; + Serial << " MqttClient:" << endl; + Serial << " client {name} {parent broker} : create a client then" << endl; + Serial << " name.connect [ip] [port] [alive]" << endl; + Serial << " name.[un]subscribe [topic]" << endl; + Serial << " name.publish [topic][payload]" << endl; + Serial << " name.view" << endl; + Serial << " name.delete" << endl; + + automatic::help(); + Serial << endl; + Serial << " help" << endl; + Serial << " blink [Dx on_ms off_ms]" << endl; + Serial << " ls / ip / reset" << endl; + Serial << " set [name][value]" << endl; + Serial << " ! repeat last command" << endl; + Serial << endl; + Serial << " every ms [command]; every list; every remove [nr|all]" << endl; + Serial << " on {output}; off {output}" << endl; + Serial << " $id : name of the client." << endl; + Serial << " default topic is '" << topic.c_str() << "'" << endl; + Serial << endl; + } + else + { + while(s[0]==' ') s.erase(0,1); + if (s.length()) + Serial << "Unknown command (" << s.c_str() << ")" << endl; + } + + if (retval != MqttOk) + { + Serial << "## ERROR " << retval << endl; + } + } +} + void loop() { auto ms=millis(); int8_t out=0; int16_t blink_bits = blink; + + for(auto& every: everies) + { + if (every.ms && every.cmd.length() && ms > every.next) + { + std::string cmd(every.cmd); + eval(cmd); + every.next += every.ms; + } + } + while(blink_bits) { if (blink_ms_on[out] and ms > blink_next[out]) @@ -375,7 +742,6 @@ void loop() if (c==10 or c==14) { - Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; static std::string last_cmd; if (cmd=="!") @@ -384,277 +750,7 @@ void loop() last_cmd=cmd; if (cmd.substr(0,3)!="set") replaceVars(cmd); - while(cmd.length()) - { - MqttError retval = MqttOk; - - std::string s; - MqttBroker* broker = nullptr; - MqttClient* client = nullptr; - - // client.function notation - // ("a.fun " becomes "fun a ") - if (cmd.find('.') != std::string::npos && - cmd.find('.') < cmd.find(' ')) - { - s=getword(cmd, nullptr, '.'); - - if (s.length()) - { - if (clients.find(s) != clients.end()) - { - client = clients[s]; - } - else if (brokers.find(s) != brokers.end()) - { - broker = brokers[s]; - } - else - { - Serial << "Unknown class (" << s.c_str() << ")" << endl; - cmd=""; - } - } - } - - s = getword(cmd); - if (s.length()) getCommand(s); - if (s.length()==0) - {} - else if (compare(s, "delete")) - { - if (client==nullptr && broker==nullptr) - { - s = getword(cmd); - if (clients.find(s) != clients.end()) - { - client = clients[s]; - } - else if (brokers.find(s) != brokers.end()) - { - broker = brokers[s]; - } - else - Serial << "Unable to find (" << s.c_str() << ")" << endl; - } - if (client) - { - for (auto it: clients) - { - if (it.second != client) continue; - Serial << "deleted" << endl; - delete (it.second); - clients.erase(it.first); - break; - } - cmd += " ls"; - } - else if (broker) - { - for(auto it: brokers) - { - if (broker != it.second) continue; - Serial << "deleted" << endl; - delete (it.second); - brokers.erase(it.first); - break; - } - cmd += " ls"; - } - else - Serial << "Nothing to delete" << endl; - } - else if (broker) - { - if (compare(s,"connect")) - { - Serial << "NYI" << endl; - } - else if (compare(s, "view")) - { - broker->dump(); - } - } - else if (client) - { - if (compare(s,"connect")) - { - client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); - Serial << (client->connected() ? "connected." : "not connected") << endl; - } - else if (compare(s,"publish")) - { - while (cmd[0]==' ') cmd.erase(0,1); - retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length()); - cmd=""; // remove payload - } - else if (compare(s,"subscribe")) - { - client->subscribe(getword(cmd, topic.c_str())); - } - else if (compare(s, "unsubscribe")) - { - client->unsubscribe(getword(cmd, topic.c_str())); - } - else if (compare(s, "view")) - { - client->dump(); - } - } - else if (compare(s, "blink")) - { - int8_t blink_nr = getint(cmd, -1); - if (blink_nr >= 0) - { - blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); - blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); - pinMode(blink_nr, OUTPUT); - blink_next[blink_nr] = millis(); - Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl; - if (blink_ms_on[blink_nr]) - blink |= 1<< blink_nr; - else - { - blink &= ~(1<< blink_nr); - } - } - } - else if (compare(s, "auto")) - { - automatic::command(client, cmd); - if (client == nullptr) - cmd.clear(); - } - else if (compare(s, "broker")) - { - std::string id=getword(cmd); - if (id.length() or brokers.find(id)!=brokers.end()) - { - int port=getint(cmd, 0); - if (port) - { - MqttBroker* broker = new MqttBroker(port); - broker->begin(); - - brokers[id] = broker; - Serial << "new broker (" << id.c_str() << ")" << endl; - } - else - Serial << "Missing port" << endl; - } - else - Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl; - cmd+=" ls"; - } - else if (compare(s, "client")) - { - std::string id=getword(cmd); - if (id.length() or clients.find(id)!=clients.end()) - { - s=getword(cmd); // broker name - if (s=="" or brokers.find(s) != brokers.end()) - { - MqttBroker* broker = nullptr; - if (s.length()) broker = brokers[s]; - MqttClient* client = new MqttClient(broker); - client->id(id); - clients[id]=client; - client->setCallback(onPublish); - client->subscribe(topic); - Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl; - } - else if (s.length()) - { - Serial << " not found." << endl; - } - } - else - Serial << "Missing or existing client name" << endl; - cmd+=" ls"; - } - else if (compare(s, "set")) - { - std::string name(getword(cmd)); - if (name.length()==0) - { - for(auto it: vars) - { - Serial << " " << it.first << " -> " << it.second << endl; - } - } - else if (commands.find(name) != commands.end()) - { - Serial << "Reserved keyword (" << name << ")" << endl; - cmd.clear(); - } - else - { - if (cmd.length()) - { - vars[name] = cmd; - cmd.clear(); - } - else if (vars.find(name) != vars.end()) - vars.erase(vars.find(name)); - } - } - else if (compare(s, "ls") or compare(s, "view")) - { - Serial << "--< " << clients.size() << " client/s. >--" << endl; - for(auto it: clients) - { - Serial << " "; it.second->dump(); - } - - Serial << "--< " << brokers.size() << " brokers/s. >--" << endl; - for(auto it: brokers) - { - Serial << " ==[ Broker: " << it.first.c_str() << " ]== "; - it.second->dump(); - } - } - else if (compare(s, "reset")) - ESP.restart(); - else if (compare(s, "ip")) - Serial << "IP: " << WiFi.localIP() << endl; - else if (compare(s,"help")) - { - Serial << "syntax:" << endl; - Serial << " MqttBroker:" << endl; - Serial << " broker {name} {port} : create a new broker" << endl; - Serial << endl; - Serial << " MqttClient:" << endl; - Serial << " client {name} {parent broker} : create a client then" << endl; - Serial << " name.connect [ip] [port] [alive]" << endl; - Serial << " name.[un]subscribe [topic]" << endl; - Serial << " name.publish [topic][payload]" << endl; - Serial << " name.view" << endl; - Serial << " name.delete" << endl; - - automatic::help(); - Serial << endl; - Serial << " help" << endl; - Serial << " blink [Dx on_ms off_ms]" << endl; - Serial << " ls / ip / reset" << endl; - Serial << " set [name][value]" << endl; - Serial << " ! repeat last command" << endl; - Serial << endl; - Serial << " $id : name of the client." << endl; - Serial << " default topic is '" << topic.c_str() << "'" << endl; - Serial << endl; - } - else - { - while(s[0]==' ') s.erase(0,1); - if (s.length()) - Serial << "Unknown command (" << s.c_str() << ")" << endl; - } - - if (retval != MqttOk) - { - Serial << "## ERROR " << retval << endl; - } - } + eval(cmd); } else {