From 297a22efb5fab2324a4e2d75e2315f2dfb854783 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 4 Apr 2021 05:57:48 +0200 Subject: [PATCH] Big rewrite of MqttClient in order to avoid code duplicate --- src/TinyMqtt.cpp | 164 ++++++++++++++++++---------- src/TinyMqtt.h | 25 +++-- tests/local-tests/local-tests.ino | 5 + tests/nowifi-tests/nowifi-tests.ino | 32 +++++- 4 files changed, 158 insertions(+), 68 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 8f100d3..b24d1ef 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -98,6 +98,13 @@ void MqttBroker::addClient(MqttClient* client) clients.push_back(client); } +void MqttBroker::connect(const std::string& host, uint16_t port) +{ + if (broker == nullptr) broker = new MqttClient; + broker->connect(host, port); + broker->parent = this; // Because connect removed the link +} + void MqttBroker::removeClient(MqttClient* remove) { for(auto it=clients.begin(); it!=clients.end(); it++) @@ -105,6 +112,11 @@ void MqttBroker::removeClient(MqttClient* remove) auto client=*it; if (client==remove) { + // TODO if this broker is connected to an external broker + // we have to unsubscribe remove's topics. + // (but doing this, check that other clients are not subscribed...) + // Unless -> we could receive useless messages + // -> we are using (memory) one IndexedString plus its string for nothing. debug("Remove " << clients.size()); clients.erase(it); debug("Client removed " << clients.size()); @@ -118,6 +130,13 @@ void MqttBroker::loop() { WiFiClient client = server.available(); + if (broker) + { + // TODO should monitor broker's activity. + // 1 When broker disconnect and reconnect we have to re-subscribe + broker->loop(); + } + if (client) { addClient(new MqttClient(this, client)); @@ -143,7 +162,15 @@ void MqttBroker::loop() } } -MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) +MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) +{ + if (broker && broker->connected()) + { + return broker->subscribe(topic, qos); + } +} + +MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const { MqttError retval = MqttOk; @@ -157,28 +184,27 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif bool doit = false; - if (broker && broker->connected()) // Broker is connected + if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker) { - // ext broker -> clients or - // or clients -> ext broker - if (source == broker) // broker -> clients + // ext_broker -> clients or clients -> ext_broker + if (source == broker) // external broker -> internal clients doit = true; - else // clients -> broker + else // external clients -> this broker { - MqttError ret = broker->publish(topic, msg); + // As this broker is connected to another broker, simply forward the msg + MqttError ret = broker->publishIfSubscribed(topic, msg); if (ret != MqttOk) retval = ret; } } - else // Disconnected: R7 + else // Disconnected { - // All is allowed doit = true; } #if TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; #endif - if (doit) retval = client->publish(topic, msg); + if (doit) retval = client->publishIfSubscribed(topic, msg); debug(""); } return retval; @@ -237,7 +263,8 @@ void MqttClient::loop() message.incoming(client->read()); if (message.type()) { - processMessage(); + processMessage(&message); + message.reset(); } } } @@ -273,6 +300,10 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos) { return sendTopic(topic, MqttMessage::Type::Subscribe, qos); } + else + { + return parent->subscribe(topic, qos); + } return ret; } @@ -307,22 +338,22 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint long MqttClient::counter=0; -void MqttClient::processMessage() +void MqttClient::processMessage(const MqttMessage* mesg) { counter++; #if TINY_MQTT_DEBUG -if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) +if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { - Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; - // message.hexdump("Incoming"); + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + // mesg->hexdump("Incoming"); } #endif - auto header = message.getVHeader(); + auto header = mesg->getVHeader(); const char* payload; uint16_t len; bool bclose=true; - switch(message.type() & 0XF0) + switch(mesg->type() & 0XF0) { case MqttMessage::Type::Connect: if (mqtt_connected) @@ -345,30 +376,30 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag } // ClientId - message.getString(payload, len); + mesg->getString(payload, len); clientId = std::string(payload, len); payload += len; if (mqtt_flags & FlagWill) // Will topic { - message.getString(payload, len); // Will Topic + mesg->getString(payload, len); // Will Topic outstring("WillTopic", payload, len); payload += len; - message.getString(payload, len); // Will Message + mesg->getString(payload, len); // Will Message outstring("WillMessage", payload, len); payload += len; } // FIXME forgetting credential is allowed (security hole) if (mqtt_flags & FlagUserName) { - message.getString(payload, len); + mesg->getString(payload, len); if (!parent->checkUser(payload, len)) break; payload += len; } if (mqtt_flags & FlagPassword) { - message.getString(payload, len); + mesg->getString(payload, len); if (!parent->checkPassword(payload, len)) break; payload += len; } @@ -423,14 +454,14 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag payload = header+2; debug("subscribe loop"); - while(payload < message.end()) + while(payload < mesg->end()) { - message.getString(payload, len); // Topic + mesg->getString(payload, len); // Topic debug( " topic (" << std::string(payload, len) << ')'); outstring("Subscribes", payload, len); // subscribe(Topic(payload, len)); Topic topic(payload, len); - if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe) + if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe) subscriptions.insert(topic); else { @@ -449,37 +480,43 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag break; case MqttMessage::Type::Publish: - if (!mqtt_connected) break; + if (mqtt_connected or client == nullptr) { - uint8_t qos = message.type() & 0x6; + uint8_t qos = mesg->type() & 0x6; payload = header; - message.getString(payload, len); + mesg->getString(payload, len); Topic published(payload, len); payload += len; - len=message.end()-payload; // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len - // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl; + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any + len=mesg->end()-payload; // TODO reset DUP // TODO reset RETAIN - if (parent) + + if (client==nullptr) // internal MqttClient receives publish + { + if (callback and isSubscribedTo(published)) + { + callback(this, published, payload, len); // TODO send the real payload + + mesg->changeType(MqttMessage::Type::PubAck); // TODO constness design but saves memory & speed + // TODO re-add packet identifier if any + mesg->sendTo(this); + mesg->changeType(MqttMessage::Type::Publish); // mesg is const (...) + } + } + else if (parent) // from outside to inside { debug("publishing to parent"); - parent->publish(this, published, message); + parent->publish(this, published, *mesg); } - else if (callback && subscriptions.find(published)!=subscriptions.end()) - { - callback(this, published, nullptr, 0); // TODO send the real payload - } - message.create(MqttMessage::Type::PubAck); - // TODO re-add packet identifier if any - message.sendTo(this); bclose = false; } break; case MqttMessage::Type::Disconnect: - // TODO should discard any will message + // TODO should discard any will msg if (!mqtt_connected) break; mqtt_connected = false; close(false); @@ -492,8 +529,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag }; if (bclose) { - Serial << "*************** Error msg 0x" << _HEX(message.type()); - message.hexdump("-------ERROR ------"); + Serial << "*************** Error msg 0x" << _HEX(mesg->type()); + mesg->hexdump("-------ERROR ------"); + dump(); Serial << endl; close(); } @@ -501,7 +539,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag { clientAlive(parent ? 5 : 0); } - message.reset(); } bool Topic::matches(const Topic& topic) const @@ -517,8 +554,11 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa MqttMessage msg(MqttMessage::Publish); msg.add(topic); msg.add(payload, pay_length, false); + msg.complete(); if (parent) + { return parent->publish(this, topic, msg); + } else if (client) return msg.sendTo(this); else @@ -526,29 +566,33 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa } // republish a received publish if it matches any in subscriptions -MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) +MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg) { MqttError retval=MqttOk; debug("mqttclient publish " << subscriptions.size()); - for(const auto& subscription: subscriptions) + if (isSubscribedTo(topic)) { - if (subscription.matches(topic)) + if (client) + retval = msg.sendTo(this); + else { - debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '); - if (client) - { - retval = msg.sendTo(this); - } - else if (callback) - { - callback(this, topic, nullptr, 0); // TODO Payload - } + processMessage(&msg); + // callback(this, topic, nullptr, 0); // TODO Payload } } return retval; } +bool MqttClient::isSubscribedTo(const Topic& topic) const +{ + for(const auto& subscription: subscriptions) + if (subscription.matches(topic)) + return true; + + return false; +} + void MqttMessage::reset() { buffer.clear(); @@ -623,7 +667,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) while(len--) incoming(*p++); } -void MqttMessage::encodeLength(char* msb, int length) +void MqttMessage::encodeLength(char* msb, int length) const { do { @@ -634,7 +678,13 @@ void MqttMessage::encodeLength(char* msb, int length) } while (length); }; -MqttError MqttMessage::sendTo(MqttClient* client) +void MqttMessage::complete() +{ + encodeLength(&buffer[1], buffer.size()-2); + state = Complete; +} + +MqttError MqttMessage::sendTo(MqttClient* client) const { if (buffer.size()) { diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index a1a6885..6119bae 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -72,6 +72,7 @@ class MqttMessage const char* end() const { return &buffer[0]+buffer.size(); } const char* getVHeader() const { return &buffer[vheader]; } uint16_t length() const { return buffer.size(); } + void complete(); void reset(); @@ -85,6 +86,13 @@ class MqttMessage return state == Complete ? static_cast(buffer[0]) : Unknown; } + // shouldn't exist because it breaks constness :-( + // but this saves memory so ... + void changeType(Type type) const + { + buffer[0] = type; + } + void create(Type type) { buffer=(char)type; @@ -93,13 +101,13 @@ class MqttMessage size=0; state=Create; } - MqttError sendTo(MqttClient*); + MqttError sendTo(MqttClient*) const; void hexdump(const char* prefix=nullptr) const; private: - void encodeLength(char* msb, int length); + void encodeLength(char* msb, int length) const; - std::string buffer; + mutable std::string buffer; // mutable -> sendTo() uint8_t vheader; uint16_t size; // bytes left to receive State state; @@ -152,6 +160,7 @@ class MqttClient MqttError subscribe(Topic topic, uint8_t qos=0); MqttError unsubscribe(Topic topic); + bool isSubscribedTo(const Topic& topic) const; // connected to local broker // TODO seems to be useless @@ -186,10 +195,10 @@ class MqttClient friend class MqttBroker; MqttClient(MqttBroker* parent, WiFiClient& client); // republish a received publish if topic matches any in subscriptions - MqttError publish(const Topic& topic, MqttMessage& msg); + MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); void clientAlive(uint32_t more_seconds); - void processMessage(); + void processMessage(const MqttMessage* message); bool mqtt_connected = false; char mqtt_flags; @@ -226,7 +235,7 @@ class MqttBroker uint16_t port() const { return server.port(); } - void connect(std::string host, uint16_t port=1883); + void connect(const std::string& host, uint16_t port=1883); bool connected() const { return state == Connected; } size_t clientsCount() const { return clients.size(); } @@ -251,7 +260,9 @@ class MqttBroker { return compareString(auth_password, password, len); } - MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); + MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const; + + MqttError subscribe(const Topic& topic, uint8_t qos); // For clients that are added not by the broker itself void addClient(MqttClient* client); diff --git a/tests/local-tests/local-tests.ino b/tests/local-tests/local-tests.ino index 43dec9b..73883f4 100644 --- a/tests/local-tests/local-tests.ino +++ b/tests/local-tests/local-tests.ino @@ -17,10 +17,15 @@ MqttBroker broker(1883); std::map> published; // map[client_id] => map[topic] = count +const char* lastPayload; +size_t lastLength; + void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) { if (srce) published[srce->id()][topic]++; + lastPayload = payload; + lastLength = length; } test(local_client_should_unregister_when_destroyed) diff --git a/tests/nowifi-tests/nowifi-tests.ino b/tests/nowifi-tests/nowifi-tests.ino index 406d777..3fd3b00 100644 --- a/tests/nowifi-tests/nowifi-tests.ino +++ b/tests/nowifi-tests/nowifi-tests.ino @@ -6,7 +6,7 @@ * TinyMqtt nowifi unit tests. * * No wifi connection unit tests. - * Checks with a local broker. Clients must connect to the local client + * Checks with a local broker. Clients must connect to the local broker **/ using namespace std; @@ -15,10 +15,15 @@ MqttBroker broker(1883); std::map> published; // map[client_id] => map[topic] = count +const char* lastPayload; +size_t lastLength; + void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) { if (srce) published[srce->id()][topic]++; + lastPayload = payload; + lastLength = length; } test(nowifi_client_should_unregister_when_destroyed) @@ -60,7 +65,7 @@ test(nowifi_publish_should_be_dispatched) assertEqual(published[""]["a/c"], 2); } -test(nowifi_publish_should_be_dispatched_to_nowifi_clients) +test(nowifi_publish_should_be_dispatched_to_clients) { published.clear(); assertEqual(broker.clientsCount(), (size_t)0); @@ -75,8 +80,8 @@ test(nowifi_publish_should_be_dispatched_to_nowifi_clients) subscriber_b.subscribe("a/b"); MqttClient publisher(&broker); - publisher.publish("a/b"); - publisher.publish("a/c"); + publisher.publish("a/b"); // A and B should receive this + publisher.publish("a/c"); // A should receive this assertEqual(published.size(), (size_t)2); // 2 clients have received something assertEqual(published["A"]["a/b"], 1); @@ -124,6 +129,25 @@ test(nowifi_nocallback_when_destroyed) assertEqual(published.size(), (size_t)1); // Only one publish has been received } +test(nowifi_payload_nullptr) +{ + return; // FIXME + published.clear(); + + const char* payload="abcd"; + + MqttClient subscriber(&broker); + subscriber.setCallback(onPublish); + subscriber.subscribe("a/b"); + + MqttClient publisher(&broker); + publisher.publish("a/b", payload, strlen(payload)); // This publish is received + + // coming from MqttClient::publish(...) + assertEqual(payload, lastPayload); + assertEqual(lastLength, (size_t)4); +} + //---------------------------------------------------------------------------- // setup() and loop() void setup() {