From b33c9ba6872d5e39373574e054d198ce575a5dd7 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Fri, 19 Mar 2021 19:02:40 +0100 Subject: [PATCH] Version 0.2 --- examples/simple-broker/simple-broker.ino | 1 - examples/simple-client/simple-client.ino | 32 +++++++++ keywords.txt | 19 +++-- src/TinyMqtt.cpp | 88 ++++++++++++++++++++---- src/TinyMqtt.h | 59 +++++++++++----- 5 files changed, 159 insertions(+), 40 deletions(-) create mode 100644 examples/simple-client/simple-client.ino diff --git a/examples/simple-broker/simple-broker.ino b/examples/simple-broker/simple-broker.ino index 6f0fe64..254fd73 100644 --- a/examples/simple-broker/simple-broker.ino +++ b/examples/simple-broker/simple-broker.ino @@ -16,7 +16,6 @@ void setup() WiFi.begin(ssid, password); while (WiFi.status() != WL_CONNECTED) { - delay(500); Serial << '.'; delay(500); } diff --git a/examples/simple-client/simple-client.ino b/examples/simple-client/simple-client.ino new file mode 100644 index 0000000..6f0fe64 --- /dev/null +++ b/examples/simple-client/simple-client.ino @@ -0,0 +1,32 @@ +#include +#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt +#include // https://github.com/janelia-arduino/Streaming + +const char *ssid = ; // Put here your wifi SSID ("ssid") +const char *password = ; // Put here your Wifi password ("pwd") + +#define PORT 1883 +MqttBroker broker(PORT); + +void setup() +{ + Serial.begin(115200); + + WiFi.mode(WIFI_STA); + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { + delay(500); + Serial << '.'; + delay(500); + } + Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; + + broker.begin(); + Serial << "Broker ready : " << WiFi.localIP() << " on port " << PORT << endl; +} + +void loop() +{ + broker.loop(); +} diff --git a/keywords.txt b/keywords.txt index 2430b38..91c46ae 100644 --- a/keywords.txt +++ b/keywords.txt @@ -3,16 +3,23 @@ ####################################### ####################################### -# Datatypes (KEYWORD1) +# Datatypes and functions ####################################### -MqttBroker KEYWORD1 +TinyMqtt KEYWORD1 + +MqttBroker KEYWORD1 +begin KEYWORD2 +loop KEYWORD2 + MqttClient KEYWORD1 +publish KEYWORD2 +setCallback KEYWORD2 +subscribe KEYWORD2 -####################################### -# Methods and Functions (KEYWORD2) -####################################### - +Topic KEYWORD1 +matches KEYWORD2 +c_str KEYWORD2 ####################################### # Constants (LITERAL1) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index b5f694a..c130f8d 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -15,20 +15,29 @@ MqttBroker::MqttBroker(uint16_t port) { } -MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client) +MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) : parent(parent), mqtt_connected(false) { client = new_client ? new WiFiClient(new_client) : nullptr; - clientAlive(); + alive = millis()+5000; // client expires after 5s if no CONNECT msg } -MqttCnx::~MqttCnx() +MqttClient::MqttClient(MqttBroker* parent) + : parent(parent) +{ + client = nullptr; + + parent->addClient(this); +} + +MqttClient::~MqttClient() { close(); + parent->removeClient(this); } -void MqttCnx::close() +void MqttClient::close() { if (client) { @@ -38,13 +47,32 @@ void MqttCnx::close() } } +void MqttBroker::addClient(MqttClient* client) +{ + clients.push_back(client); +} + +void MqttBroker::removeClient(MqttClient* remove) +{ + for(auto it=clients.begin(); it!=clients.end(); it++) + { + auto client=*it; + if (client==remove) + { + clients.erase(it); + return; + } + } + Serial << "Error cannot remove client" << endl; // TODO should not occur +} + void MqttBroker::loop() { WiFiClient client = server.available(); if (client) { - clients.push_back(new MqttCnx(this, client)); + addClient(new MqttClient(this, client)); Serial << "New client (" << clients.size() << ')' << endl; } @@ -58,7 +86,7 @@ void MqttBroker::loop() else { Serial << "Client " << client->id().c_str() << " Disconnected" << endl; - clients.erase(it); + // Note: deleting a client not added by the broker itself will probably crash later. delete client; break; } @@ -67,6 +95,7 @@ void MqttBroker::loop() void MqttBroker::publish(const Topic& topic, MqttMessage& msg) { + Serial << " publish" << __LINE__ << endl; for(auto client: clients) client->publish(topic, msg); } @@ -87,7 +116,7 @@ void MqttMessage::getString(char* &buffer, uint16_t& len) buffer+=2; } -void MqttCnx::clientAlive() +void MqttClient::clientAlive() { if (keep_alive) { @@ -97,7 +126,7 @@ void MqttCnx::clientAlive() alive=0; } -void MqttCnx::loop() +void MqttClient::loop() { if (alive && (millis() > alive)) { @@ -115,7 +144,7 @@ void MqttCnx::loop() } } -void MqttCnx::processMessage() +void MqttClient::processMessage() { std::string error; std::string s; @@ -186,7 +215,7 @@ void MqttCnx::processMessage() message.getString(payload, len); // Topic outstring("Subscribes", payload, len); - subscriptions.insert(Topic(payload, len)); + subscribe(Topic(payload, len)); bclose = false; // TODO SUBACK break; @@ -241,14 +270,40 @@ bool Topic::matches(const Topic& topic) const return false; } -void MqttCnx::publish(const Topic& topic, MqttMessage& msg) +// publish from local client to a broker +void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) { + Serial << " publish" << __LINE__ << endl; + message.create(MqttMessage::Publish); + message.add(topic); + message.add(payload, pay_length); + if (parent) + parent->publish(topic, message); + else if (client) + publish(topic, message); + else + Serial << " Should not happen" << endl; +} + +// republish a received publish if it matches any in subscriptions +void MqttClient::publish(const Topic& topic, MqttMessage& msg) +{ + Serial << " publish " << topic.c_str() << __LINE__ << endl; for(const auto& subscription: subscriptions) { + Serial << " check " << subscription.c_str() << __LINE__ << endl; if (subscription.matches(topic)) { - // Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; - msg.sendTo(this); + Serial << " matche !" << endl; + if (client) + { + // Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; + msg.sendTo(this); + } + else if (callback) + { + callback(topic, nullptr, 0); // TODO + } } } } @@ -311,6 +366,11 @@ void MqttMessage::incoming(char in_byte) } } +void MqttMessage::add(const char* p, size_t len) +{ + while(len--) incoming(*p); +} + void MqttMessage::encodeLength(char* msb, int length) { do @@ -322,7 +382,7 @@ void MqttMessage::encodeLength(char* msb, int length) } while (length); }; -void MqttMessage::sendTo(MqttCnx* client) +void MqttMessage::sendTo(MqttClient* client) { if (curr-buffer-2 >= 0) { diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 5ee47ee..e45edad 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -11,11 +11,14 @@ class Topic : public IndexedString public: Topic(const char* s, uint8_t len) : IndexedString(s,len){} Topic(const char* s) : Topic(s, strlen(s)) {} + Topic(const std::string s) : Topic(s.c_str(), s.length()){}; + + const char* c_str() const { return str().c_str(); } bool matches(const Topic&) const; }; -class MqttCnx; +class MqttClient; class MqttMessage { public: @@ -45,6 +48,9 @@ class MqttMessage MqttMessage(Type t) { create(t); } void incoming(char byte); void add(char byte) { incoming(byte); } + void add(const char* p, size_t len); + void add(const std::string& s) { add(s.c_str(), s.length()); } + void add(const Topic& t) { add(t.str()); } char* getVHeader() const { return vheader; } char* end() const { return curr; } uint16_t length() const { return curr-buffer; } @@ -69,7 +75,7 @@ class MqttMessage size=0; state=Create; } - void sendTo(MqttCnx*); + void sendTo(MqttClient*); void hexdump(const char* prefix=nullptr) const; private: @@ -83,8 +89,9 @@ class MqttMessage }; class MqttBroker; -class MqttCnx +class MqttClient { + using CallBack = void (*)(const Topic& topic, const char* payload, size_t payload_length); enum Flags { FlagUserName = 128, @@ -96,11 +103,11 @@ class MqttCnx FlagReserved = 1 }; public: - MqttCnx(MqttBroker* parent, WiFiClient& client); + MqttClient(MqttBroker*); - ~MqttCnx(); + ~MqttClient(); - bool connected() { return client && client->connected(); } + bool connected() { return client==nullptr || client->connected(); } void write(const char* buf, size_t length) { if (client) client->write(buf, length); } @@ -108,9 +115,22 @@ class MqttCnx void loop(); void close(); - void publish(const Topic& topic, MqttMessage& msg); + void setCallback(CallBack fun) {callback=fun; }; + + // Publish from client to the world + void publish(const Topic&, const char* payload, size_t pay_length); + void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());} + void publish(const Topic& t) { publish(t, nullptr, 0);}; + + void subscribe(Topic topic) { subscriptions.insert(topic); } + void unsubscribe(Topic& topic); private: + friend class MqttBroker; + MqttClient(MqttBroker* parent, WiFiClient& client); + // republish a received publish if topic matches any in subscriptions + void publish(const Topic& topic, MqttMessage& msg); + void clientAlive(); void processMessage(); @@ -118,20 +138,12 @@ class MqttCnx uint32_t keep_alive; uint32_t alive; bool mqtt_connected; - WiFiClient* client; + WiFiClient* client; // nullptr if this client is local MqttMessage message; MqttBroker* parent; std::set subscriptions; std::string clientId; -}; - -class MqttClient -{ - public: - MqttClient(IPAddress broker) : broker_ip(broker) {} - - protected: - IPAddress broker_ip; + CallBack callback; }; class MqttBroker @@ -142,17 +154,26 @@ class MqttBroker void begin() { server.begin(); } void loop(); + uint8_t port() const { return server.port(); } + + private: + friend class MqttClient; + bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } bool checkPassword(const char* password, uint8_t len) const { return compareString(auth_password, password, len); } + void publish(const Topic& topic, MqttMessage& msg); - private: + // For clients that are added not by the broker itself + void addClient(MqttClient* client); + void removeClient(MqttClient* client); + bool compareString(const char* good, const char* str, uint8_t str_len) const; - std::vector clients; + std::vector clients; WiFiServer server; const char* auth_user = "guest";