From f348d821670a7d4d2c385989f419b3e3a611d348 Mon Sep 17 00:00:00 2001 From: hsaturn Date: Wed, 28 Dec 2022 23:02:26 +0100 Subject: [PATCH] Try to fix alive problem --- src/TinyMqtt.cpp | 59 ++++++++++++++----------------- src/TinyMqtt.h | 22 +++++++----- tests/local-tests/Makefile | 2 +- tests/local-tests/local-tests.ino | 14 +++++++- 4 files changed, 55 insertions(+), 42 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 51b654c..4c9a78b 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -34,8 +34,8 @@ MqttBroker::~MqttBroker() // private constructor used by broker only MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client) - : local_broker(local_broker) { + connect(local_broker); debug("MqttClient private with broker"); #ifdef TINY_MQTT_ASYNC client = new_client; @@ -45,19 +45,16 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client) #else client = new WiFiClient(*new_client); #endif -#ifdef EPOXY_DUINO - alive = millis()+500000; -#else - alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg -#endif + alive = millis()+5000; } MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id) : local_broker(local_broker), clientId(id) { - client = nullptr; + alive = 0; + client = nullptr; - if (local_broker) local_broker->addClient(this); + if (local_broker) local_broker->addClient(this); } MqttClient::~MqttClient() @@ -69,7 +66,7 @@ MqttClient::~MqttClient() void MqttClient::close(bool bSendDisconnect) { debug("close " << id().c_str()); - mqtt_connected = false; + mqtt_flags &= ~FlagConnected; if (client) // connected to a remote broker { if (bSendDisconnect and client->connected()) @@ -91,6 +88,7 @@ void MqttClient::close(bool bSendDisconnect) void MqttClient::connect(MqttBroker* local) { debug("MqttClient::connect_local"); + alive = 0; close(); local_broker = local; } @@ -268,16 +266,12 @@ void MqttMessage::getString(const char* &buff, uint16_t& len) buff+=2; } -void MqttClient::clientAlive(uint32_t more_seconds) +void MqttClient::clientAlive() { debug("MqttClient::clientAlive"); if (keep_alive) { -#ifdef EPOXY_DUINO - alive=millis()+500000+0*more_seconds; -#else - alive=millis()+1000*(keep_alive+more_seconds); -#endif + alive=millis()+1000*(keep_alive+local_broker ? 5 : 0); } else alive=0; @@ -285,11 +279,11 @@ void MqttClient::clientAlive(uint32_t more_seconds) void MqttClient::loop() { - if (alive && (millis() > alive)) + if (alive && (millis() >= alive)) { if (local_broker) { - debug(red << "timeout client"); + Serial << "timeout client " << clientId << endl; close(); debug(red << "closed"); } @@ -298,7 +292,7 @@ void MqttClient::loop() debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; client->write((const char*)(&pingreq), 2); - clientAlive(0); + clientAlive(); // TODO when many MqttClient passes through a local broker // there is no need to send one PingReq per instance. @@ -334,7 +328,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) msg.reset(); debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker); - mqtt->clientAlive(0); + mqtt->clientAlive(); } #ifdef TINY_MQTT_ASYNC @@ -441,13 +435,14 @@ void MqttClient::processMessage(MqttMessage* mesg) switch(mesg->type()) { case MqttMessage::Type::Connect: - if (mqtt_connected) + if (mqtt_flags & FlagConnected) { debug("already connected"); break; } payload = header+10; - mqtt_flags = header[7]; + // Todo should check that reserved == 0 (spec) + mqtt_flags = header[7] & ~FlagConnected; keep_alive = MqttMessage::getSize(header+8); if (strncmp("MQTT", header+2,4)) { @@ -491,7 +486,7 @@ void MqttClient::processMessage(MqttMessage* mesg) Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl; #endif bclose = false; - mqtt_connected=true; + mqtt_flags |= FlagConnected; { MqttMessage msg(MqttMessage::Type::ConnAck); msg.add(0); // Session present (not implemented) @@ -501,14 +496,14 @@ void MqttClient::processMessage(MqttMessage* mesg) break; case MqttMessage::Type::ConnAck: - mqtt_connected = true; + mqtt_flags |= FlagConnected; bclose = false; resubscribe(); break; case MqttMessage::Type::SubAck: case MqttMessage::Type::PubAck: - if (!mqtt_connected) break; + if (not (mqtt_flags & FlagConnected)) break; // Ignore acks bclose = false; break; @@ -519,7 +514,7 @@ void MqttClient::processMessage(MqttMessage* mesg) break; case MqttMessage::Type::PingReq: - if (!mqtt_connected) break; + if (not (mqtt_flags & FlagConnected)) break; if (client) { uint16_t pingreq = MqttMessage::Type::PingResp; @@ -536,7 +531,7 @@ void MqttClient::processMessage(MqttMessage* mesg) case MqttMessage::Type::Subscribe: case MqttMessage::Type::UnSubscribe: { - if (!mqtt_connected) break; + if (not (mqtt_flags & FlagConnected)) break; payload = header+2; debug("un/subscribe loop"); @@ -580,15 +575,15 @@ void MqttClient::processMessage(MqttMessage* mesg) break; case MqttMessage::Type::UnSuback: - if (!mqtt_connected) break; + if (not (mqtt_flags & FlagConnected)) break; bclose = false; break; case MqttMessage::Type::Publish: #if TINY_MQTT_DEBUG - Console << "publish " << mqtt_connected << '/' << (long) client << endl; + Console << "publish " << (mqtt_flags & FlagConnected) << '/' << (long) client << endl; #endif - if (mqtt_connected or client == nullptr) + if ((mqtt_flags & FlagConnected) or client == nullptr) { uint8_t qos = mesg->flags(); payload = header; @@ -629,8 +624,8 @@ void MqttClient::processMessage(MqttMessage* mesg) case MqttMessage::Type::Disconnect: // TODO should discard any will msg - if (!mqtt_connected) break; - mqtt_connected = false; + if (not (mqtt_flags & FlagConnected)) break; + mqtt_flags &= ~FlagConnected; close(false); bclose=false; break; @@ -651,7 +646,7 @@ void MqttClient::processMessage(MqttMessage* mesg) } else { - clientAlive(local_broker ? 5 : 0); + clientAlive(); } } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 8980d9b..089856b 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -4,6 +4,9 @@ #ifndef TINY_MQTT_DEBUG #define TINY_MQTT_DEBUG 0 #endif +#ifndef TINY_MQTT_DEFAULT_ALIVE +#define TINY_MQTT_DEFAULT_ALIVE 10 +#endif // TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC // #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx @@ -173,7 +176,9 @@ class MqttClient FlagWillQos = 16 | 8, // unsupported FlagWill = 4, // unsupported FlagCleanSession = 2, // unsupported - FlagReserved = 1 + + FlagReserved = 1, // use reserved as connected (save 1 byte) + FlagConnected = 1 }; public: @@ -187,7 +192,7 @@ class MqttClient ~MqttClient(); void connect(MqttBroker* local_broker); - void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10); + void connect(std::string broker, uint16_t port, uint16_t keep_alive = TINY_MQTT_DEFAULT_ALIVE); // TODO it seems that connected returns true in tcp mode even if // no negociation occurred @@ -282,13 +287,14 @@ class MqttClient // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg); - void clientAlive(uint32_t more_seconds); + void clientAlive(); void processMessage(MqttMessage* message); - bool mqtt_connected = false; - char mqtt_flags; - uint32_t keep_alive = 30; - uint32_t alive; + char mqtt_flags = 0; + uint16_t keep_alive = 30; + // for client connected to remote broker, PingReq is sent when millis() >= alive + // for a client managed by a broker, disconnect it if millis() >= alive + uint32_t alive; // PingReq if millis() > alive, MqttMessage message; // connection to local broker, or link to the parent @@ -296,7 +302,7 @@ class MqttClient MqttBroker* local_broker=nullptr; TcpClient* client=nullptr; // connection to remote broker - std::set subscriptions; + std::set subscriptions; std::string clientId; CallBack callback = nullptr; }; diff --git a/tests/local-tests/Makefile b/tests/local-tests/Makefile index e1c23a9..41c5d71 100644 --- a/tests/local-tests/Makefile +++ b/tests/local-tests/Makefile @@ -1,7 +1,7 @@ # See https://github.com/bxparks/EpoxyDuino for documentation about this # Makefile to compile and run Arduino programs natively on Linux or MacOS. -EXTRA_CXXFLAGS=-g3 -O0 +EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEFAULT_ALIVE=1 # Remove flto flag from EpoxyDuino (too many ) CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics diff --git a/tests/local-tests/local-tests.ino b/tests/local-tests/local-tests.ino index 9d59bb9..da8622e 100644 --- a/tests/local-tests/local-tests.ino +++ b/tests/local-tests/local-tests.ino @@ -14,7 +14,6 @@ using namespace std; -MqttBroker broker(1883); std::map> published; // map[client_id] => map[topic] = count @@ -31,6 +30,7 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, test(local_client_should_unregister_when_destroyed) { + MqttBroker broker(1883); assertEqual(broker.clientsCount(), (size_t)0); { assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected @@ -40,6 +40,18 @@ test(local_client_should_unregister_when_destroyed) assertEqual(broker.clientsCount(), (size_t)0); } +test(local_client_alive) +{ + MqttBroker broker(1883); + MqttClient client(&broker); + for(int i=0; i<10; i++) + { + assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected + broker.loop(); + usleep(TINY_MQTT_DEFAULT_ALIVE*1000000/2); + } +} + #if 0 test(local_connect) {