From cc708cdf22fc6635324b6b9666e2a9237a26a225 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Fri, 19 Mar 2021 22:04:04 +0100 Subject: [PATCH] Example when wifi is not connected --- .../client-without-wifi.ino | 64 +++++++++++++++++++ library.json | 2 +- library.properties | 2 +- src/TinyMqtt.cpp | 35 ++++++---- src/TinyMqtt.h | 35 +++++++++- 5 files changed, 124 insertions(+), 14 deletions(-) create mode 100644 examples/client-without-wifi/client-without-wifi.ino diff --git a/examples/client-without-wifi/client-without-wifi.ino b/examples/client-without-wifi/client-without-wifi.ino new file mode 100644 index 0000000..9792afe --- /dev/null +++ b/examples/client-without-wifi/client-without-wifi.ino @@ -0,0 +1,64 @@ +#include // https://github.com/hsaturn/TinyMqtt +#include // https://github.com/janelia-arduino/Streaming + +/** TinyMQTT allows a disconnected mode: + * + * In this example, local clients A and B are talking together, no need to be connected. + * A single ESP can use this to be able to comunicate with itself with the power + * of MQTT, and once connected still continue to work with others. + * + */ + +std::string topic="sensor/temperature"; + +MqttBroker broker(1883); +MqttClient mqtt_a(&broker); +MqttClient mqtt_b(&broker); + +void onPublishA(const Topic& topic, const char* payload, size_t length) +{ Serial << "--> A Received " << topic.c_str() << endl; } + +void onPublishB(const Topic& topic, const char* payload, size_t length) +{ Serial << "--> B Received " << topic.c_str() << endl; } + +void setup() +{ + Serial.begin(115200); + delay(500); + Serial << "init" << endl; + + mqtt_a.setCallback(onPublishA); + mqtt_a.subscribe(topic); + + mqtt_b.setCallback(onPublishB); + mqtt_b.subscribe(topic); +} + +void loop() +{ + broker.loop(); + mqtt_a.loop(); + mqtt_b.loop(); + + // ============= client A publish ================ + static const int intervalA = 5000; + static uint32_t timerA = millis() + intervalA; + + if (millis() > timerA) + { + Serial << "A is publishing " << topic.c_str() << endl; + timerA += intervalA; + mqtt_a.publish(topic); + } + + // ============= client B publish ================ + static const int intervalB = 3000; // will send topic each 5000 ms + static uint32_t timerB = millis() + intervalB; + + if (millis() > timerB) + { + Serial << "B is publishing " << topic.c_str() << endl; + timerB += intervalB; + mqtt_b.publish(topic); + } +} diff --git a/library.json b/library.json index 838e1fc..efe1a47 100644 --- a/library.json +++ b/library.json @@ -6,7 +6,7 @@ "type": "git", "url": "https://github.com/hsaturn/TinyMqtt.git" }, - "version": "0.1", + "version": "0.2", "exclude": "", "examples": "examples/*/*.ino", "frameworks": "arduino", diff --git a/library.properties b/library.properties index 6c52853..1743238 100644 --- a/library.properties +++ b/library.properties @@ -1,5 +1,5 @@ name=TinyMqtt -version=0.1.0 +version=0.2.0 author=HSaturn maintainer=HSaturn sentence=A tiny broker and client library for MQTT messaging. diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index c130f8d..f9c6600 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -93,11 +93,29 @@ void MqttBroker::loop() } } -void MqttBroker::publish(const Topic& topic, MqttMessage& msg) +// Should be called for inside and outside incoming publishes (all) +void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) { - Serial << " publish" << __LINE__ << endl; for(auto client: clients) - client->publish(topic, msg); + { + bool doit = false; + if (broker && broker->connected()) // Connected: R2 R3 R5 R6 + { + if (!client->isLocal()) // R2 go outside allowed + doit = true; + else // R3 any client to outside allowed + doit = true; + } + else // Disconnected: R3 R4 R5 + { + if (!source->isLocal()) // R3 + doit = true; + else if (client->isLocal()) // R4 local -> local + doit = true; + } + + if (doit) client->publish(topic, msg); // goes outside R2 + } } bool MqttBroker::compareString( @@ -234,7 +252,7 @@ void MqttClient::processMessage() if (qos) payload+=2; // ignore packet identifier if any // TODO reset DUP // TODO reset RETAIN - parent->publish(published, message); + parent->publish(this, published, message); // TODO should send PUBACK bclose = false; } @@ -270,15 +288,14 @@ bool Topic::matches(const Topic& topic) const return false; } -// publish from local client to a broker +// publish from local client void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) { - Serial << " publish" << __LINE__ << endl; message.create(MqttMessage::Publish); message.add(topic); message.add(payload, pay_length); if (parent) - parent->publish(topic, message); + parent->publish(this, topic, message); else if (client) publish(topic, message); else @@ -288,16 +305,12 @@ void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_len // republish a received publish if it matches any in subscriptions void MqttClient::publish(const Topic& topic, MqttMessage& msg) { - Serial << " publish " << topic.c_str() << __LINE__ << endl; for(const auto& subscription: subscriptions) { - Serial << " check " << subscription.c_str() << __LINE__ << endl; if (subscription.matches(topic)) { - Serial << " matche !" << endl; if (client) { - // Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; msg.sendTo(this); } else if (callback) diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index e45edad..2fef85c 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -125,6 +125,8 @@ class MqttClient void subscribe(Topic topic) { subscriptions.insert(topic); } void unsubscribe(Topic& topic); + bool isLocal() const { return client==nullptr; } + private: friend class MqttBroker; MqttClient(MqttBroker* parent, WiFiClient& client); @@ -146,8 +148,33 @@ class MqttClient CallBack callback; }; +/*********************************************** + * R1 - accept external cnx + * R2 - allows all clients pusblish to go outside + * R3 - allows ext publish to all clients + * R4 - allows local publish to local clients + * R5 - tries to connect elsewhere (*) + * R6 - disconnect external clients + * --------------------------------------------- + * (*) single client or ip range + * --------------------------------------------- + * + * =============================================+ + * | connected | not connected | + * -------------+---------------+---------------+ + * proxy broker | R2 R3 R5 R6 | R3 R4 R5 | + * normal broker| R2 R3 R5 R6 | R1 R3 R4 R5 | + * -------------+---------------+---------------+ + * + */ class MqttBroker { + enum State + { + Disconnected, // Also the initial state + Connecting, // connect and sends a fake publish to avoid circular cnx + Connected, // this->broker is connected and circular cnx avoided + }; public: MqttBroker(uint16_t port); @@ -156,6 +183,9 @@ class MqttBroker uint8_t port() const { return server.port(); } + void connect(std::string host, uint32_t port=1883); + bool connected() const { return state == Connected; } + private: friend class MqttClient; @@ -166,7 +196,7 @@ class MqttBroker { return compareString(auth_password, password, len); } - void publish(const Topic& topic, MqttMessage& msg); + void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); // For clients that are added not by the broker itself void addClient(MqttClient* client); @@ -178,4 +208,7 @@ class MqttBroker const char* auth_user = "guest"; const char* auth_password = "guest"; + State state = Disconnected; + + MqttClient* broker = nullptr; };