From 6b9d764c2309c640c8d0fcca0c52d25193e358a5 Mon Sep 17 00:00:00 2001 From: Francois BIOT Date: Mon, 17 Apr 2023 02:40:15 +0200 Subject: [PATCH] [Retain] Fix bug in last retained msg and add retain commands to tinytests --- examples/tinymqtt-test/tinymqtt-test.ino | 39 ++++++++++++++++++++---- src/TinyMqtt.cpp | 9 ++++-- src/TinyMqtt.h | 5 ++- 3 files changed, 44 insertions(+), 9 deletions(-) diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index 97b9537..2ad9a76 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -448,6 +448,10 @@ void eval(string &cmd) Console << " " << fb.first << " : " << fb.second.url << ":" << fb.second.port << endl; } } + else if (compare(s, "free")) + { + Console << "Free memory: " << ESP.getFreeHeap() << endl; + } else if (compare(s, "delete")) { if (client == nullptr && broker == nullptr) @@ -497,7 +501,22 @@ void eval(string &cmd) { if (compare(s, "connect")) { - Console << "NYI" << endl; + string remote = getword(cmd); + int port = getint(cmd); + if (port == 0) port=1883; + broker->connect(remote, port); + if (broker->connected()) + Console << "Broker connected"; + else + Console << red << "Unable to connect"; + + Console << " to " << remote << ':' << port << white << endl; + } + else if (compare(s, "retain")) + { + if (cmd.size()) + broker->retain(getint(cmd)); + Console << "retain=" << broker->retain() << ", retained msg=" << broker->retainCount() << endl; } else if (compare(s, "view")) { @@ -515,6 +534,10 @@ void eval(string &cmd) { clientConnect(client, cmd); } + else if (compare(s, "rpublish")) + { + retval = client->publish(getword(cmd), getword(cmd), true); + } else if (compare(s, "publish")) { retval = client->publish(getword(cmd), getword(cmd)); @@ -760,7 +783,8 @@ void eval(string &cmd) Console << save_cursor << magenta; Console.gotoxy(1, 1); } - Console << "--< " << '/' << clients.size() << " client/s. >--" << erase_to_end << endl; + Console << "--< " << '/' << clients.size() << " client/s. >--" << erase_to_end; + Console << " (FreeMem: " << ESP.getFreeHeap() << ')' << endl; for (auto it : clients) { it.second->dump(" "); @@ -786,19 +810,22 @@ void eval(string &cmd) { Console << "syntax:" << endl; Console << " MqttBroker:" << endl; - Console << " broker {broker_name} {port} : create a new broker" << endl; - Console << " broker_name can be one of 'list'" << endl; - Console << " broker_name.delete : delete a broker (buggy)" << endl; - Console << " broker_name.view : dump a broker" << endl; + Console << " broker {name} {port} : create a new broker" << endl; + Console << " name can be one of 'list'" << endl; + Console << " name.delete : delete a broker (buggy)" << endl; + Console << " name.retain [#] : show/set retain value" << endl; + Console << " name.view : dump a broker" << endl; Console << endl; Console << " MqttClient:" << endl; Console << " client {name} {broker} : create a client then" << endl; Console << " name.connect [ip] [port] [alive]" << endl; Console << " name.[un]subscribe topic" << endl; Console << " name.publish topic [payload]" << endl; + Console << " name.rpublish topic [payload] : publish a retained message" << endl; Console << " name.view" << endl; Console << " name.delete" << endl; Console << endl; + Console << " free : view free mem" << endl; Console << " list : list of free brokers (debug 1 advised)" << endl; Console << " debug #" << endl; Console << " list : get list of free brokers" << endl; diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 2f71c55..d13fd28 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -957,14 +957,19 @@ void MqttBroker::retainDrop() void MqttBroker::retain(const Topic& topic, const MqttMessage& msg) { - debug("MqttBroker::retain msg_type=" << _HEX(msg.type())); + debug("MqttBroker::retain msg_type=" << _HEX(msg.type()) << ", retain_size=" << retain_size); if (retain_size==0 or msg.type() != MqttMessage::Publish) return; if (msg.flags() & 1) // flag RETAIN { debug(" retaining " << topic.str()); - if (retained.find(topic) == retained.end()) retainDrop(); + auto old = retained.find(topic); + if (old == retained.end()) + retainDrop(); + else + retained.erase(old); // FIXME if payload size == 0 remove message from retained Retain r(micros(), msg); + r.msg.retained(); retained.insert({ topic, std::move(r)}); } } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 6fb99a7..f6bff7b 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -133,6 +133,7 @@ class MqttMessage const char* end() const { return &buffer[0]+buffer.size(); } const char* getVHeader() const { return &buffer[vheader]; } void complete() { encodeLength(); } + void retained() { if ((buffer[0] & 0xF)==Publish) buffer[0] |= 1; } void reset(); @@ -346,7 +347,9 @@ class MqttBroker bool connected() const { return remote_broker ? remote_broker->connected() : false; } size_t clientsCount() const { return clients.size(); } - void retain(uint8_t size) { retain_size = size; } + uint8_t retain() { return retain_size; } + void retain(uint8_t size) { retain_size = size; if (size==0) retained.clear(); } + uint8_t retainCount() const { return retained.size(); } void dump(string indent="") {