From 7b20e7deb57490d3f19af12bab9f95e922947752 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Tue, 23 Mar 2021 23:51:33 +0100 Subject: [PATCH] Supports multiple subscriptions --- README.md | 4 +- examples/simple-broker/simple-broker.ino | 1 - examples/tinymqtt-test/tinymqtt-test.ino | 13 +++-- src/StringIndexer.h | 1 - src/TinyMqtt.cpp | 70 +++++++++++++++++++----- src/TinyMqtt.h | 13 +++-- 6 files changed, 73 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 2639d0c..a758135 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,9 @@ ESP 8266 is a small and very capable Mqtt Broker and Client * Add a max_clients in MqttBroker. Used with zeroconf, there will be no need for having tons of clients (also RAM is the problem with many clients) * Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. -* MqttMessage uses a buffer 256 bytes which is usually far than needed. -* MqttClient auto reconnection +* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~ * MqttClient auto re-subscribe +* MqttClient auto reconnection * MqttClient does not callback payload... * MqttClient user/password diff --git a/examples/simple-broker/simple-broker.ino b/examples/simple-broker/simple-broker.ino index ae6cbd9..8636ea8 100644 --- a/examples/simple-broker/simple-broker.ino +++ b/examples/simple-broker/simple-broker.ino @@ -1,5 +1,4 @@ #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt -#include // https://github.com/janelia-arduino/Streaming #include diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index 7e5c060..689fe4c 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -225,6 +225,8 @@ void loop() last_cmd=cmd; while(cmd.length()) { + MqttError retval = MqttOk; + std::string s; MqttBroker* broker = nullptr; MqttClient* client = nullptr; @@ -314,11 +316,7 @@ void loop() } else if (compare(s,"publish")) { - auto ok=client->publish(getword(cmd, topic.c_str())); - if (ok != MqttOk) - { - Serial << "## ERROR " << ok << endl; - } + retval = client->publish(getword(cmd, topic.c_str())); } else if (compare(s,"subscribe")) { @@ -431,6 +429,11 @@ void loop() if (s.length()) Serial << "Unknown command (" << s.c_str() << ")" << endl; } + + if (retval != MqttOk) + { + Serial << "## ERROR " << retval << endl; + } } } else diff --git a/src/StringIndexer.h b/src/StringIndexer.h index 1f97636..be93827 100644 --- a/src/StringIndexer.h +++ b/src/StringIndexer.h @@ -2,7 +2,6 @@ #include #include #include -// #include #include /*** diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 8e56729..404b3a6 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -146,16 +146,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt { i++; Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << - " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); + " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; bool doit = false; - if (broker && broker->connected()) // Connected: R2 R3 R5 R6 + if (broker && broker->connected()) // Broker is connected { // ext broker -> clients or // or clients -> ext broker if (source == broker) // broker -> clients doit = true; else // clients -> broker - retval=broker->publish(topic, msg); + { + MqttError ret = broker->publish(topic, msg); + if (ret != MqttOk) retval = ret; + } } else // Disconnected: R7 { @@ -164,7 +167,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt } Serial << ", doit=" << doit << ' '; - if (doit) client->publish(topic, msg); + if (doit) retval = client->publish(topic, msg); debug(""); } return retval; @@ -227,6 +230,29 @@ void MqttClient::loop() } } +MqttError MqttClient::subscribe(Topic topic, uint8_t qos) +{ + debug("subsribe(" << topic.c_str() << ")"); + MqttError ret = MqttOk; + + subscriptions.insert(topic); + + if (parent==nullptr) // remote broker ? + { + debug("remote subscribe"); + MqttMessage msg(MqttMessage::Type::Subscribe, 2); + + // TODO Qos > 0 needs a packet identifier + + msg.add(topic.str()); + msg.add(qos); + ret = msg.sendTo(this); + + // TODO we should wait (state machine) for SUBACK + } + return ret; +} + void MqttClient::processMessage() { std::string error; @@ -327,14 +353,26 @@ void MqttClient::processMessage() break; case MqttMessage::Type::Subscribe: - if (!mqtt_connected) break; - payload = header+2; - message.getString(payload, len); // Topic - outstring("Subscribes", payload, len); - - subscribe(Topic(payload, len)); - bclose = false; - // TODO SUBACK + { + if (!mqtt_connected) break; + payload = header+2; + + debug("subscribe loop"); + while(payload < message.end()) + { + message.getString(payload, len); // Topic + debug( " topic (" << std::string(payload, len) << ')'); + outstring("Subscribes", payload, len); + // subscribe(Topic(payload, len)); + subscriptions.insert(Topic(payload, len)); + payload += len; + uint8_t qos = *payload++; + debug(" qos=" << qos); + } + debug("end loop"); + bclose = false; + // TODO SUBACK + } break; case MqttMessage::Type::Publish: @@ -398,7 +436,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa if (parent) return parent->publish(this, topic, msg); else if (client) - msg.sendTo(this); + return msg.sendTo(this); else return MqttNowhereToSend; } @@ -417,7 +455,7 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) Serial << " match/send"; if (client) { - msg.sendTo(this); + retval = msg.sendTo(this); } else if (callback) { @@ -506,7 +544,7 @@ void MqttMessage::encodeLength(char* msb, int length) } while (length); }; -void MqttMessage::sendTo(MqttClient* client) +MqttError MqttMessage::sendTo(MqttClient* client) { if (buffer.size()>2) { @@ -517,7 +555,9 @@ void MqttMessage::sendTo(MqttClient* client) else { Serial << "??? Invalid send" << endl; + return MqttInvalidMessage; } + return MqttOk; } void MqttMessage::hexdump(const char* prefix) const diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 190d1d1..3d8aad4 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -3,9 +3,11 @@ #include #include #include "StringIndexer.h" +#include + +#define TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG - #include #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #else #define debug(what) {} @@ -15,6 +17,7 @@ enum MqttError { MqttOk = 0, MqttNowhereToSend=1, + MqttInvalidMessage=2, }; class Topic : public IndexedString @@ -57,7 +60,7 @@ class MqttMessage }; MqttMessage() { reset(); } - MqttMessage(Type t) { create(t); } + MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; } void incoming(char byte); void add(char byte) { incoming(byte); } void add(const char* p, size_t len, bool addLength=true ); @@ -87,7 +90,7 @@ class MqttMessage size=0; state=Create; } - void sendTo(MqttClient*); + MqttError sendTo(MqttClient*); void hexdump(const char* prefix=nullptr) const; private: @@ -141,8 +144,8 @@ class MqttClient MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; - void subscribe(Topic topic) { subscriptions.insert(topic); } - void unsubscribe(Topic& topic); + MqttError subscribe(Topic topic, uint8_t qos=0); + MqttError unsubscribe(Topic& topic); // connected to local broker // TODO seems to be useless