From 428eb5185094057333eeacb9104ed6aac2db5405 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 21 Mar 2021 13:50:42 +0100 Subject: [PATCH] Release 0.3.0 clients can now connect to outside. bug fixed for broker (pings etc.) crashes fixed when clients where removed More examples added (the tinymqtt-test is great) --- README.md | 10 + .../client-with-wifi/client-with-wifi.ino | 90 +++++ examples/simple-broker/simple-broker.ino | 4 +- examples/simple-client/simple-client.ino | 30 +- examples/tinymqtt-test/tinymqtt-test.ino | 326 ++++++++++++++++++ src/TinyMqtt.cpp | 178 +++++++--- src/TinyMqtt.h | 65 +++- 7 files changed, 632 insertions(+), 71 deletions(-) create mode 100644 examples/client-with-wifi/client-with-wifi.ino create mode 100644 examples/tinymqtt-test/tinymqtt-test.ino diff --git a/README.md b/README.md index 4690d4d..f88a8c5 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,14 @@ ESP 8266 is a small and very capable Mqtt Broker and Client - zeroconf, this is a strange but very powerful mode where all brokers tries to connect together on the same local network. +## TODO List +* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP) +* Implement zeroconf mode (needs async) +* Add a max_clients in MqttBroker. Used with zeroconf, there will be +no need for having tons of clients (also RAM is the problem with many clients) +* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. +* MqttMessage uses a buffer 256 bytes which is usually far than needed. + ## Quickstart * install [Streaming library](https://github.com/janelia-arduino/Streaming) @@ -39,6 +47,8 @@ ESP 8266 is a small and very capable Mqtt Broker and Client ## Standalone mode (zeroconf) -> The zeroconf mode is not yet implemented +zerofonf clients to connect to broker on local network. + In Zeroconf mode, each ESP is a a broker and scans the local network. After a while one ESP naturally becomes a 'master' and all ESP are connected together. No problem if the master dies, a new master will be choosen soon. diff --git a/examples/client-with-wifi/client-with-wifi.ino b/examples/client-with-wifi/client-with-wifi.ino new file mode 100644 index 0000000..ccadadb --- /dev/null +++ b/examples/client-with-wifi/client-with-wifi.ino @@ -0,0 +1,90 @@ +#include // https://github.com/hsaturn/TinyMqtt +#include // https://github.com/janelia-arduino/Streaming + +// TODO should be renamed to most-complete setup + +/** + * Local broker that accept connections + * + * pros - Reduces internal latency (when publish is received by the same ESP) + * - Reduces wifi traffic + * - No need to have an external broker + * - can still report to a 'main' broker (TODO see documentation that have to be written) + * - accepts external clients + * + * cons - Takes more memory + * - a bit hard to understand + * + * This sounds crazy: a mqtt mqtt that do not need a broker ! + * The use case arise when one ESP wants to publish topics and subscribe to them at the same time. + * Without broker, the ESP won't react to its own topics. + * + * TinyMqtt mqtt allows this use case to work. + */ + +#include + +std::string topic="sensor/temperature"; + +MqttBroker broker(1883); + +MqttClient mqtt_a(&broker); +MqttClient mqtt_b(&broker); + +void onPublishA(const Topic& topic, const char* payload, size_t length) +{ Serial << "--> A Received " << topic.c_str() << endl; } + +void onPublishB(const Topic& topic, const char* payload, size_t length) +{ Serial << "--> B Received " << topic.c_str() << endl; } + +void setup() +{ + Serial.begin(115200); + delay(500); + Serial << "Clients with wifi " << endl; + + WiFi.mode(WIFI_STA); + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { Serial << '-'; delay(500); } + + Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; + + broker.begin(); + + mqtt_a.setCallback(onPublishA); + mqtt_a.subscribe(topic); + + mqtt_b.setCallback(onPublishB); + mqtt_b.subscribe(topic); +} + +void loop() +{ + broker.loop(); + + mqtt_a.loop(); + mqtt_b.loop(); + + // ============= client A publish ================ + static const int intervalA = 50000; + static uint32_t timerA = millis() + intervalA; + + if (millis() > timerA) + { + Serial << "A is publishing " << topic.c_str() << endl; + timerA += intervalA; + mqtt_a.publish(topic); + } + + // ============= client B publish ================ + static const int intervalB = 30000; // will send topic each 5000 ms + static uint32_t timerB = millis() + intervalB; + + if (millis() > timerB) + { + Serial << "B is publishing " << topic.c_str() << endl; + timerB += intervalB; + mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); + } +} diff --git a/examples/simple-broker/simple-broker.ino b/examples/simple-broker/simple-broker.ino index 254fd73..ae6cbd9 100644 --- a/examples/simple-broker/simple-broker.ino +++ b/examples/simple-broker/simple-broker.ino @@ -1,9 +1,7 @@ -#include #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include // https://github.com/janelia-arduino/Streaming -const char *ssid = ; // Put here your wifi SSID ("ssid") -const char *password = ; // Put here your Wifi password ("pwd") +#include #define PORT 1883 MqttBroker broker(PORT); diff --git a/examples/simple-client/simple-client.ino b/examples/simple-client/simple-client.ino index 6f0fe64..484ae3e 100644 --- a/examples/simple-client/simple-client.ino +++ b/examples/simple-client/simple-client.ino @@ -2,31 +2,35 @@ #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include // https://github.com/janelia-arduino/Streaming -const char *ssid = ; // Put here your wifi SSID ("ssid") -const char *password = ; // Put here your Wifi password ("pwd") +/** Simple Client + * + * This is the simplest Mqtt client configuration + * + * pro - small memory footprint (both ram and flash) + * - very simple to setup and use + * + * cons - cannot work without wifi connection + * - stop working if broker is down + * - local publishes takes more time (because they go outside) + */ -#define PORT 1883 -MqttBroker broker(PORT); +#include void setup() { Serial.begin(115200); + delay(500); + Serial << "Simple clients with wifi " << endl; WiFi.mode(WIFI_STA); WiFi.begin(ssid, password); - while (WiFi.status() != WL_CONNECTED) { - delay(500); - Serial << '.'; - delay(500); - } - Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; + while (WiFi.status() != WL_CONNECTED) + { delay(500); Serial << '.'; } - broker.begin(); - Serial << "Broker ready : " << WiFi.localIP() << " on port " << PORT << endl; + Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; } void loop() { - broker.loop(); } diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino new file mode 100644 index 0000000..1b94763 --- /dev/null +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -0,0 +1,326 @@ +#include // https://github.com/hsaturn/TinyMqtt +#include // https://github.com/janelia-arduino/Streaming +#include + +/** + * Local broker that accept connections + * + * pros - Reduces internal latency (when publish is received by the same ESP) + * - Reduces wifi traffic + * - No need to have an external broker + * - can still report to a 'main' broker (TODO see documentation that have to be written) + * - accepts external clients + * + * cons - Takes more memory + * - a bit hard to understand + * + * This sounds crazy: a mqtt mqtt that do not need a broker ! + * The use case arise when one ESP wants to publish topics and subscribe to them at the same time. + * Without broker, the ESP won't react to its own topics. + * + * TinyMqtt mqtt allows this use case to work. + */ + +#include + +std::string topic="sensor/temperature"; + +MqttBroker broker(1883); + +void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) +{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; } + +void setup() +{ + Serial.begin(115200); + delay(500); + Serial << endl << endl << endl + << "Demo started. Type help for more..." << endl + << "Connecting to '" << ssid << "' "; + + WiFi.mode(WIFI_STA); + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) + { Serial << '-'; delay(500); } + + Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; + + broker.begin(); +} + +std::string getword(std::string& str, const char* if_empty=nullptr) +{ + std::string sword; + while(str.length() && str[0]!=' ') + { + sword += str[0]; str.erase(0,1); + } + while(str[0]==' ') str.erase(0,1); + if (if_empty and sword.length()==0) return if_empty; + return sword; +} + +class automatic +{ + public: + automatic(MqttClient* clt, uint32_t intervl) + : client(clt), topic_(::topic) + { + interval(intervl); + autos[clt] = this; + } + + void interval(uint32_t new_interval) + { + interval_ = new_interval; + if (interval_<1000) interval_=1000; + timer_ = millis() + interval_; + } + + void loop_() + { + if (!bon) return; + if (interval_ && millis() > timer_) + { + Serial << "AUTO PUBLISH " << interval_ << endl; + timer_ += interval_; + client->publish(topic_, std::string(String(15+millis()%10).c_str())); + } + } + + void topic(std::string new_topic) { topic_ = new_topic; } + + static void loop() + { + for(auto it: autos) + it.second->loop_(); + } + + static void command(MqttClient* who, std::string cmd) + { + automatic* autop = nullptr; + if (autos.find(who) != autos.end()) + { + autop=autos[who]; + } + std::string s = getword(cmd); + if (compare(s, "create")) + { + std::string seconds=getword(cmd, "10000"); + if (autop) delete autop; + std::string top = getword(cmd, ::topic.c_str()); + autos[who] = new automatic(who, atol(seconds.c_str())); + autos[who]->topic(top); + autos[who]->bon=true; + Serial << "New auto (" << seconds.c_str() << " topic:" << top.c_str() << ')' << endl; + } + else if (autop) + { + while(s.length()) + { + if (s=="on") + { + autop->bon = true; + autop->interval(autop->interval_); + } + else if (s=="off") + autop->bon=false; + else if (s=="interval") + { + int32_t i=atol(getword(cmd).c_str()); + if (i) + autop->interval(atol(s.c_str())); + else + Serial << "Bad value" << endl; + } + else if (s=="view") + { + Serial << " automatic " + << (int32_t)autop->client + << " interval " << autop->interval_ + << (autop->bon ? " on" : " off") << endl; + } + else + { + Serial << "Unknown auto command (" << s.c_str() << ")" << endl; + break; + } + s=getword(cmd); + } + } + else if (who==nullptr) + { + for(auto it: autos) + command(it.first, s+' '+cmd); + } + else + Serial << "what ? (" << s.c_str() << ")" << endl; + } + + static void help() + { + Serial << " auto [$id] on/off" << endl; + Serial << " auto [$id] view" << endl; + Serial << " auto [$id] create [millis] [topic]" << endl; + } + + private: + MqttClient* client; + uint32_t interval_; + uint32_t timer_; + std::string topic_; + bool bon=false; + static std::map autos; +}; +std::map automatic::autos; + +bool compare(std::string s, const char* cmd) +{ + if (s.length()==0 or s.length()>strlen(cmd)) return false; + return strncmp(cmd, s.c_str(), s.length())==0; +} + +std::map clients; + +using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); + +void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false) +{ + std::string s=getword(cmd); + bool found = clients.find(s) != clients.end(); + + if (canBeNull && found==false) + { + cmd += ' ' + s; + } + + if (found or canBeNull) + { + MqttClient* publish = publish = clients[s]; + func(cmd, publish); + } + else + { + Serial << "client not found (" << s.c_str() << ")" << endl; + cmd=""; + } +} + +void loop() +{ + broker.loop(); + + for(auto it: clients) + it.second->loop(); + + automatic::loop(); + + if (Serial.available()) + { + static std::string cmd; + char c=Serial.read(); + + if (c==10 or c==14) + { + Serial << "------------------------------------------------------" << endl; + while(cmd.length()) + { + std::string s = getword(cmd); + if (compare(s,"connect")) + { + clientCommand(cmd, [](std::string& cmd, MqttClient* publish) + { publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); }); + } + else if (compare(s,"publish")) + { + clientCommand(cmd, [](std::string& cmd, MqttClient* publish) + { publish->publish(getword(cmd, topic.c_str())); }); + } + else if (compare(s,"subscribe")) + { + clientCommand(cmd, [](std::string& cmd, MqttClient* publish) + { publish->subscribe(getword(cmd, topic.c_str())); }); + } + else if (compare(s, "view")) + { + clientCommand(cmd, [](std::string& cmd, MqttClient* publish) + { publish->dump(); }); + } + else if (compare(s, "auto")) + { + clientCommand(cmd, [](std::string& cmd, MqttClient* publish) + { automatic::command(publish, cmd); + if (publish == nullptr) + cmd.clear(); + }, true); + } + else if (compare(s, "new")) + { + std::string id=getword(cmd); + if (id.length()) + { + MqttClient* client = new MqttClient(&broker); + client->id(id); + clients[id]=client; + client->setCallback(onPublish); + client->subscribe(topic); + } + else + Serial << "missing id" << endl; + cmd+=" ls"; + } + else if (compare(s, "delete")) + { + s = getword(cmd); + auto it=clients.find(s); + if (it != clients.end()) + { + delete it->second; + clients.erase(it); + cmd+=" ls"; + } + else + Serial << "Unknown client (" << s.c_str() << ")" << endl; + } + else if (compare(s, "ls")) + { + Serial << "main : " << clients.size() << " client/s." << endl; + for(auto it: clients) + { + Serial << " "; it.second->dump(); + } + broker.dump(); + } + else if (compare(s, "reset")) + ESP.restart(); + else if (compare(s,"help")) + { + Serial << "syntax:" << endl; + Serial << " new/delete $id" << endl; + Serial << " connect $id [ip]" << endl; + Serial << " subscribe $id [topic]" << endl; + Serial << " publish $id [topic]" << endl; + Serial << " view $id " << endl; + automatic::help(); + Serial << endl; + Serial << " help" << endl; + Serial << " ls" << endl; + Serial << " reset" << endl; + Serial << endl; + Serial << " $id : name of the client." << endl; + Serial << " default topic is '" << topic.c_str() << "'" << endl; + Serial << endl; + } + else + { + Serial << "Unknown command (" << s.c_str() << ")" << endl; + } + } + } + else + { + cmd=cmd+c; + } + } +} diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index e2f7a2d..1f59474 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -2,6 +2,12 @@ #include #include +#if 1 +#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } +#else +#define debug(what) {} +#endif + void outstring(const char* prefix, const char*p, uint16_t len) { return; @@ -10,14 +16,12 @@ void outstring(const char* prefix, const char*p, uint16_t len) Serial << '\'' << endl; } -MqttBroker::MqttBroker(uint16_t port) - : server(port) +MqttBroker::MqttBroker(uint16_t port) : server(port) { } MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) - : parent(parent), - mqtt_connected(false) + : parent(parent) { client = new_client ? new WiFiClient(new_client) : nullptr; alive = millis()+5000; // client expires after 5s if no CONNECT msg @@ -35,15 +39,48 @@ MqttClient::~MqttClient() { close(); delete client; - parent->removeClient(this); } void MqttClient::close() { + debug("close " << id().c_str()); + mqtt_connected = false; if (client) { client->stop(); } + + if (parent) + { + parent->removeClient(this); + parent=nullptr; + } +} + +void MqttClient::connect(std::string broker, uint16_t port) +{ + debug("cnx: closing"); + close(); + debug("cnx: closed"); + if (client) delete client; + client = new WiFiClient; + if (client->connect(broker.c_str(), port)) + { + debug("cnx: connecting"); + message.create(MqttMessage::Type::Connect); + message.add("MQTT",4); + message.add(0x4); // Mqtt protocol version 3.1.1 + message.add(0x0); // Connect flags TODO user / name + + keep_alive = 1; + message.add(0x00); // keep_alive + message.add((char)keep_alive); + message.add(clientId); + debug("cnx: mqtt connecting"); + message.sendTo(this); + debug("cnx: mqtt sent " << (int32_t)parent); + clientAlive(0); + } } void MqttBroker::addClient(MqttClient* client) @@ -58,11 +95,13 @@ void MqttBroker::removeClient(MqttClient* remove) auto client=*it; if (client==remove) { + debug("Remove " << clients.size()); clients.erase(it); + debug("Client removed " << clients.size()); return; } } - Serial << "Error cannot remove client" << endl; // TODO should not occur + debug("Error cannot remove client"); // TODO should not occur } void MqttBroker::loop() @@ -72,19 +111,21 @@ void MqttBroker::loop() if (client) { addClient(new MqttClient(this, client)); - Serial << "New client (" << clients.size() << ')' << endl; + debug("New client (" << clients.size() << ')'); } - for(auto it=clients.begin(); it!=clients.end(); it++) + // for(auto it=clients.begin(); it!=clients.end(); it++) + // use index because size can change during the loop + for(int i=0; iconnected()) { client->loop(); } else { - Serial << "Client " << client->id().c_str() << " Disconnected" << endl; + debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent); // Note: deleting a client not added by the broker itself will probably crash later. delete client; break; @@ -92,28 +133,34 @@ void MqttBroker::loop() } } -// Should be called for inside and outside incoming publishes (all) void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) { + debug("publish "); + int i=0; for(auto client: clients) { + i++; + Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << + " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); bool doit = false; if (broker && broker->connected()) // Connected: R2 R3 R5 R6 { - if (!client->isLocal()) // R2 go outside allowed - doit = true; - else // R3 any client to outside allowed + // ext broker -> clients or + // or clients -> ext broker + if (source == broker) // broker -> clients doit = true; + else // clients -> broker + broker->publish(topic, msg); } - else // Disconnected: R3 R4 R5 + else // Disconnected: R7 { - if (!source->isLocal()) // R3 - doit = true; - else if (client->isLocal()) // R4 local -> local - doit = true; + // All is allowed + doit = true; } + Serial << ", doit=" << doit << ' '; - if (doit) client->publish(topic, msg); // goes outside R2 + if (doit) client->publish(topic, msg); + debug(""); } } @@ -133,11 +180,11 @@ void MqttMessage::getString(char* &buffer, uint16_t& len) buffer+=2; } -void MqttClient::clientAlive() +void MqttClient::clientAlive(uint32_t more_seconds) { if (keep_alive) { - alive=millis()+1000*(keep_alive+5); + alive=millis()+1000*(keep_alive+more_seconds); } else alive=0; @@ -147,8 +194,20 @@ void MqttClient::loop() { if (alive && (millis() > alive)) { - Serial << "timeout client" << endl; - close(); + if (parent) + { + debug("timeout client"); + close(); + } + else + { + uint16_t pingreq = MqttMessage::Type::PingReq; + client->write((uint8_t*)(&pingreq), 2); + clientAlive(0); + + // TODO when many MqttClient passes through a local browser + // there is no need to send one PingReq per instance. + } } while(client && client->available()>0) @@ -174,19 +233,45 @@ void MqttClient::processMessage() switch(message.type() & 0XF0) { case MqttMessage::Type::Connect: - if (mqtt_connected) break; + if (mqtt_connected) + { + debug("already connected"); + break; + } payload = header+10; - flags = header[7]; + mqtt_flags = header[7]; keep_alive = (header[8]<<8)|(header[9]); - if (strncmp("MQTT", header+2,4)) break; - if (header[6]!=0x04) break; // Level 3.1.1 + if (strncmp("MQTT", header+2,4)) + { + debug("bad mqtt header"); + break; + } + if (header[6]!=0x04) + { + debug("unknown level"); + break; // Level 3.1.1 + } // ClientId message.getString(payload, len); + debug("client id len=" << len); + if (len>30) + { + Serial << '('; + for(int i=0; i<30; i++) + { + if (i%5==0) Serial << ' '; + char c=*(header+i); + Serial << (c < 32 ? '.' : c); + } + Serial << " )" << endl; + debug("Bad client id length"); + break; + } clientId = std::string(payload, len); payload += len; - if (flags & FlagWill) // Will topic + if (mqtt_flags & FlagWill) // Will topic { message.getString(payload, len); // Will Topic outstring("WillTopic", payload, len); @@ -196,13 +281,14 @@ void MqttClient::processMessage() outstring("WillMessage", payload, len); payload += len; } - if (flags & FlagUserName) + // FIXME forgetting credential is allowed (security hole) + if (mqtt_flags & FlagUserName) { message.getString(payload, len); if (!parent->checkUser(payload, len)) break; payload += len; } - if (flags & FlagPassword) + if (mqtt_flags & FlagPassword) { message.getString(payload, len); if (!parent->checkPassword(payload, len)) break; @@ -220,10 +306,17 @@ void MqttClient::processMessage() break; case MqttMessage::Type::PingReq: - message.create(MqttMessage::Type::PingResp); - message.add(0); - message.sendTo(this); - bclose = false; + if (!mqtt_connected) break; + if (client) + { + uint16_t pingreq = MqttMessage::Type::PingResp; + client->write((uint8_t*)(&pingreq), 2); + bclose = false; + } + else + { + debug("internal pingreq ?"); + } break; case MqttMessage::Type::Subscribe: @@ -251,6 +344,7 @@ void MqttClient::processMessage() if (qos) payload+=2; // ignore packet identifier if any // TODO reset DUP // TODO reset RETAIN + debug("publishing to parent"); parent->publish(this, published, message); // TODO should send PUBACK bclose = false; @@ -275,7 +369,7 @@ void MqttClient::processMessage() } else { - clientAlive(); + clientAlive(5); } message.reset(); } @@ -304,19 +398,23 @@ void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_len // republish a received publish if it matches any in subscriptions void MqttClient::publish(const Topic& topic, MqttMessage& msg) { + debug("mqttclient publish " << subscriptions.size()); for(const auto& subscription: subscriptions) { + Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '; if (subscription.matches(topic)) { + Serial << " match/send"; if (client) { msg.sendTo(this); } else if (callback) { - callback(topic, nullptr, 0); // TODO + callback(this, topic, nullptr, 0); // TODO } } + Serial << endl; } } @@ -373,14 +471,16 @@ void MqttMessage::incoming(char in_byte) } if (curr-buffer > 250) { - Serial << "Too much incoming bytes." << endl; + debug("Spurious byte " << _HEX(in_byte)); curr=buffer; } } void MqttMessage::add(const char* p, size_t len) { - while(len--) incoming(*p); + incoming(len>>8); + incoming(len & 0xFF); + while(len--) incoming(*p++); } void MqttMessage::encodeLength(char* msb, int length) diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 2fef85c..f5a392e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -81,7 +81,7 @@ class MqttMessage private: void encodeLength(char* msb, int length); - char buffer[256]; // TODO 256 ? + char buffer[256]; // TODO why 256 ? (should be replaced by a std::string) char* vheader; char* curr; uint16_t size; // bytes left to receive @@ -91,14 +91,14 @@ class MqttMessage class MqttBroker; class MqttClient { - using CallBack = void (*)(const Topic& topic, const char* payload, size_t payload_length); + using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length); enum Flags { FlagUserName = 128, FlagPassword = 64, FlagWillRetain = 32, // unsupported FlagWillQos = 16 | 8, // unsupported - FlagWill = 4, // unsupported + FlagWill = 4, // unsupported FlagCleanSession = 2, // unsupported FlagReserved = 1 }; @@ -107,11 +107,15 @@ class MqttClient ~MqttClient(); + void connect(MqttBroker* parent); + void connect(std::string broker, uint16_t port); + bool connected() { return client==nullptr || client->connected(); } void write(const char* buf, size_t length) { if (client) client->write(buf, length); } const std::string& id() const { return clientId; } + void id(std::string& new_id) { clientId = new_id; } void loop(); void close(); @@ -125,7 +129,24 @@ class MqttClient void subscribe(Topic topic) { subscriptions.insert(topic); } void unsubscribe(Topic& topic); - bool isLocal() const { return client==nullptr; } + // connected to local broker + // TODO seems to be useless + bool isLocal() const { return client == nullptr; } + + void dump() + { + Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent + << " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); + Serial << " ["; + bool c=false; + for(auto s: subscriptions) + { + Serial << (c?", ": "")<< s.str().c_str(); + c=true; + } + Serial << "]" << endl; + } + private: friend class MqttBroker; @@ -133,19 +154,19 @@ class MqttClient // republish a received publish if topic matches any in subscriptions void publish(const Topic& topic, MqttMessage& msg); - void clientAlive(); + void clientAlive(uint32_t more_seconds); void processMessage(); - char flags; + bool mqtt_connected = false; + char mqtt_flags; uint32_t keep_alive; uint32_t alive; - bool mqtt_connected; - WiFiClient* client; // nullptr if this client is local MqttMessage message; - MqttBroker* parent; + MqttBroker* parent=nullptr; // connection to local broker + WiFiClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; - CallBack callback; + CallBack callback = nullptr; }; /*********************************************** @@ -155,16 +176,17 @@ class MqttClient * R4 - allows local publish to local clients * R5 - tries to connect elsewhere (*) * R6 - disconnect external clients + * R7 - allows all publish to go everywhere * --------------------------------------------- * (*) single client or ip range * --------------------------------------------- * - * =============================================+ - * | connected | not connected | - * -------------+---------------+---------------+ - * proxy broker | R2 R3 R5 R6 | R3 R4 R5 | - * normal broker| R2 R3 R5 R6 | R1 R3 R4 R5 | - * -------------+---------------+---------------+ + * ================================================+ + * | connected | not connected | + * -------------+---------------+------------------+ + * proxy broker | R2 R3 R5 R6 | R5 R7 | + * normal broker| R2 R3 R5 R6 | R1 R5 R7 | + * -------------+---------------+------------------+ * */ class MqttBroker @@ -176,6 +198,7 @@ class MqttBroker Connected, // this->broker is connected and circular cnx avoided }; public: + // TODO limit max number of clients MqttBroker(uint16_t port); void begin() { server.begin(); } @@ -186,6 +209,16 @@ class MqttBroker void connect(std::string host, uint32_t port=1883); bool connected() const { return state == Connected; } + void dump() + { + Serial << "broker: " << clients.size() << " client/s" << endl; + for(auto client: clients) + { + Serial << " "; + client->dump(); + } + } + private: friend class MqttClient;