diff --git a/src/StringIndexer.h b/src/StringIndexer.h index 41db7be..72c5237 100644 --- a/src/StringIndexer.h +++ b/src/StringIndexer.h @@ -107,6 +107,8 @@ class IndexedString index = source.index; } + IndexedString(IndexedString&& i) : index(i.index) {} + IndexedString(const char* str, uint8_t len) { index=StringIndexer::strToIndex(str, len); diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 91aeddf..8bf772d 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -15,8 +15,10 @@ int TinyMqtt::debug=2; std::map MqttClient::counters; #endif -MqttBroker::MqttBroker(uint16_t port) +MqttBroker::MqttBroker(uint16_t port, uint8_t max_retain_size) { + debug("New broker" << port); + retain_size = max_retain_size; server = new TcpServer(port); #ifdef TINY_MQTT_ASYNC server->onClient(onClient, this); @@ -211,9 +213,19 @@ void MqttBroker::loop() } } -MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) +// Obvioulsy called when the broker is connected to another broker. +MqttError MqttBroker::subscribe(MqttClient* client, const Topic& topic, uint8_t qos) { - debug("MqttBroker::subscribe"); + debug("MqttBroker::subscribe to " << topic.str() << ", retained=" << retained.size() ); + for(auto& [retained_topic, retain]: retained) + { + debug(" retained: " << retained_topic.str()); + if (topic.matches(retained_topic)) + { + debug(" -> sending"); + client->publishIfSubscribed(retained_topic, retain.msg); + } + } if (remote_broker && remote_broker->connected()) { return remote_broker->subscribe(topic, qos); @@ -221,10 +233,12 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) return MqttNowhereToSend; } -MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const +MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) { MqttError retval = MqttOk; + retain(topic, msg); + debug("MqttBroker::publish"); int i=0; for(auto client: clients) @@ -389,15 +403,15 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos) debug("MqttClient::subsribe(" << topic.c_str() << ")"); MqttError ret = MqttOk; - subscriptions.insert(topic); + subscriptions.insert(topic); - if (local_broker==nullptr) // remote broker + if (local_broker==nullptr) // connected to a remote broker { return sendTopic(topic, MqttMessage::Type::Subscribe, qos); } else { - return local_broker->subscribe(topic, qos); + return local_broker->subscribe(this, topic, qos); } return ret; } @@ -568,7 +582,7 @@ void MqttClient::processMessage(MqttMessage* mesg) } else qoss.push_back(qos); - subscriptions.insert(topic); + subscribe(topic); } else { @@ -618,8 +632,8 @@ void MqttClient::processMessage(MqttMessage* mesg) #if TINY_MQTT_DEBUG if (TinyMqtt::debug >= 2) { - Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n"; - Console << "has " << (callback ? "" : "no ") << " callback.\n"; + Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\r\n"; + Console << "has " << (callback ? "" : "no ") << " callback.\r\n"; } #endif if (callback and isSubscribedTo(published)) @@ -728,9 +742,9 @@ bool Topic::matches(const Topic& topic) const // publish from local client -MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) +MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length, bool retain) { - MqttMessage msg(MqttMessage::Publish); + MqttMessage msg(MqttMessage::Publish, retain ? 1 : 0); msg.add(topic); msg.add(payload, pay_length, false); msg.complete(); @@ -894,6 +908,35 @@ MqttError MqttMessage::sendTo(MqttClient* client) return MqttOk; } +void MqttBroker::retainDrop() +{ + if (retained.size() >= retain_size) + { + std::map::iterator oldest = retained.begin(); + auto it = oldest; + while(++it != retained.end()) + { + if (oldest->second.timestamp > it->second.timestamp) + oldest = it; + } + retained.erase(oldest); + } +} + +void MqttBroker::retain(const Topic& topic, const MqttMessage& msg) +{ + debug("MqttBroker::retain msg_type=" << _HEX(msg.type())); + if (retain_size==0 or msg.type() != MqttMessage::Publish) return; + if (msg.flags() & 1) // flag RETAIN + { + debug(" retaining " << topic.str()); + if (retained.find(topic) == retained.end()) retainDrop(); + // FIXME if payload size == 0 remove message from retained + Retain r(micros(), msg); + retained.insert({ topic, std::move(r)}); + } +} + void MqttMessage::hexdump(const char* prefix) const { (void)prefix; diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 8f7149b..9119de4 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -121,7 +121,10 @@ class MqttMessage return (*bun << 8) | bun[1]; } MqttMessage() { reset(); } - MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; } + MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= (bits_d3_d0 & 0xF); } + MqttMessage(const MqttMessage& m) + : buffer(m.buffer), vheader(m.vheader), size(m.size), state(m.state) {} + void incoming(char byte); void add(char byte) { incoming(byte); } void add(const char* p, size_t len, bool addLength=true ); @@ -156,6 +159,15 @@ class MqttMessage MqttError sendTo(MqttClient*); void hexdump(const char* prefix=nullptr) const; + MqttMessage& operator = (MqttMessage&& m) + { + buffer = std::move(m.buffer); + vheader = m.vheader; + size = m.size; + state = m.state; + return *this; + } + private: void encodeLength(); @@ -228,11 +240,11 @@ class MqttClient }; // Publish from client to the world - MqttError publish(const Topic&, const char* payload, size_t pay_length); - MqttError publish(const Topic& t, const char* payload) { return publish(t, payload, strlen(payload)); } - MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); } - MqttError publish(const Topic& t, const string& s) { return publish(t,s.c_str(),s.length());} - MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; + MqttError publish(const Topic&, const char* payload, size_t pay_length, bool retain=false); + MqttError publish(const Topic& t, const char* payload, bool retain=false) { return publish(t, payload, strlen(payload), retain); } + MqttError publish(const Topic& t, const String& s, bool retain=false) { return publish(t, s.c_str(), s.length(), retain); } + MqttError publish(const Topic& t, const string& s, bool retain=false) { return publish(t,s.c_str(),s.length(), retain);} + MqttError publish(const Topic& t, bool retain=false) { return publish(t, nullptr, 0, retain);}; MqttError subscribe(Topic topic, uint8_t qos=0); MqttError unsubscribe(Topic topic); @@ -325,7 +337,7 @@ class MqttBroker }; public: // TODO limit max number of clients - MqttBroker(uint16_t port); + MqttBroker(uint16_t port, uint8_t retain_size=0); ~MqttBroker(); void begin() { server->begin(); } @@ -337,6 +349,7 @@ class MqttBroker bool connected() const { return state == Connected; } size_t clientsCount() const { return clients.size(); } + void retain(uint8_t size) { retain_size = size; } void dump(string indent="") { @@ -356,10 +369,9 @@ class MqttBroker bool checkPassword(const char* password, uint8_t len) const { return compareString(auth_password, password, len); } + MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); - MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const; - - MqttError subscribe(const Topic& topic, uint8_t qos); + MqttError subscribe(MqttClient*, const Topic& topic, uint8_t qos); // For clients that are added not by the broker itself (local clients) void addClient(MqttClient* client); @@ -376,4 +388,26 @@ class MqttBroker MqttClient* remote_broker = nullptr; State state = Disconnected; + + void retain(const Topic& topic, const MqttMessage& msg); + void retainDrop(); + + struct Retain + { + Retain(unsigned long ts, const MqttMessage& m) : timestamp(ts), msg(m) {} + Retain(Retain&& r) : timestamp(r.timestamp), msg(std::move(r.msg)) {} + + Retain& operator=(Retain&& r) + { + timestamp = r.timestamp; + msg = std::move(r.msg); + return *this; + } + + unsigned long timestamp; + MqttMessage msg; + }; + + std::map retained; + uint8_t retain_size; };