Try to fix alive problem

This commit is contained in:
hsaturn
2022-12-28 23:02:26 +01:00
parent 2b92833ea5
commit f348d82167
4 changed files with 55 additions and 42 deletions

View File

@@ -34,8 +34,8 @@ MqttBroker::~MqttBroker()
// private constructor used by broker only // private constructor used by broker only
MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client) MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
: local_broker(local_broker)
{ {
connect(local_broker);
debug("MqttClient private with broker"); debug("MqttClient private with broker");
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
client = new_client; client = new_client;
@@ -45,19 +45,16 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
#else #else
client = new WiFiClient(*new_client); client = new WiFiClient(*new_client);
#endif #endif
#ifdef EPOXY_DUINO alive = millis()+5000;
alive = millis()+500000;
#else
alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg
#endif
} }
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id) MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
: local_broker(local_broker), clientId(id) : local_broker(local_broker), clientId(id)
{ {
client = nullptr; alive = 0;
client = nullptr;
if (local_broker) local_broker->addClient(this); if (local_broker) local_broker->addClient(this);
} }
MqttClient::~MqttClient() MqttClient::~MqttClient()
@@ -69,7 +66,7 @@ MqttClient::~MqttClient()
void MqttClient::close(bool bSendDisconnect) void MqttClient::close(bool bSendDisconnect)
{ {
debug("close " << id().c_str()); debug("close " << id().c_str());
mqtt_connected = false; mqtt_flags &= ~FlagConnected;
if (client) // connected to a remote broker if (client) // connected to a remote broker
{ {
if (bSendDisconnect and client->connected()) if (bSendDisconnect and client->connected())
@@ -91,6 +88,7 @@ void MqttClient::close(bool bSendDisconnect)
void MqttClient::connect(MqttBroker* local) void MqttClient::connect(MqttBroker* local)
{ {
debug("MqttClient::connect_local"); debug("MqttClient::connect_local");
alive = 0;
close(); close();
local_broker = local; local_broker = local;
} }
@@ -268,16 +266,12 @@ void MqttMessage::getString(const char* &buff, uint16_t& len)
buff+=2; buff+=2;
} }
void MqttClient::clientAlive(uint32_t more_seconds) void MqttClient::clientAlive()
{ {
debug("MqttClient::clientAlive"); debug("MqttClient::clientAlive");
if (keep_alive) if (keep_alive)
{ {
#ifdef EPOXY_DUINO alive=millis()+1000*(keep_alive+local_broker ? 5 : 0);
alive=millis()+500000+0*more_seconds;
#else
alive=millis()+1000*(keep_alive+more_seconds);
#endif
} }
else else
alive=0; alive=0;
@@ -285,11 +279,11 @@ void MqttClient::clientAlive(uint32_t more_seconds)
void MqttClient::loop() void MqttClient::loop()
{ {
if (alive && (millis() > alive)) if (alive && (millis() >= alive))
{ {
if (local_broker) if (local_broker)
{ {
debug(red << "timeout client"); Serial << "timeout client " << clientId << endl;
close(); close();
debug(red << "closed"); debug(red << "closed");
} }
@@ -298,7 +292,7 @@ void MqttClient::loop()
debug("pingreq"); debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((const char*)(&pingreq), 2); client->write((const char*)(&pingreq), 2);
clientAlive(0); clientAlive();
// TODO when many MqttClient passes through a local broker // TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance. // there is no need to send one PingReq per instance.
@@ -334,7 +328,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
msg.reset(); msg.reset();
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker); debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker);
mqtt->clientAlive(0); mqtt->clientAlive();
} }
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
@@ -441,13 +435,14 @@ void MqttClient::processMessage(MqttMessage* mesg)
switch(mesg->type()) switch(mesg->type())
{ {
case MqttMessage::Type::Connect: case MqttMessage::Type::Connect:
if (mqtt_connected) if (mqtt_flags & FlagConnected)
{ {
debug("already connected"); debug("already connected");
break; break;
} }
payload = header+10; payload = header+10;
mqtt_flags = header[7]; // Todo should check that reserved == 0 (spec)
mqtt_flags = header[7] & ~FlagConnected;
keep_alive = MqttMessage::getSize(header+8); keep_alive = MqttMessage::getSize(header+8);
if (strncmp("MQTT", header+2,4)) if (strncmp("MQTT", header+2,4))
{ {
@@ -491,7 +486,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl; Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl;
#endif #endif
bclose = false; bclose = false;
mqtt_connected=true; mqtt_flags |= FlagConnected;
{ {
MqttMessage msg(MqttMessage::Type::ConnAck); MqttMessage msg(MqttMessage::Type::ConnAck);
msg.add(0); // Session present (not implemented) msg.add(0); // Session present (not implemented)
@@ -501,14 +496,14 @@ void MqttClient::processMessage(MqttMessage* mesg)
break; break;
case MqttMessage::Type::ConnAck: case MqttMessage::Type::ConnAck:
mqtt_connected = true; mqtt_flags |= FlagConnected;
bclose = false; bclose = false;
resubscribe(); resubscribe();
break; break;
case MqttMessage::Type::SubAck: case MqttMessage::Type::SubAck:
case MqttMessage::Type::PubAck: case MqttMessage::Type::PubAck:
if (!mqtt_connected) break; if (not (mqtt_flags & FlagConnected)) break;
// Ignore acks // Ignore acks
bclose = false; bclose = false;
break; break;
@@ -519,7 +514,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
break; break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
if (!mqtt_connected) break; if (not (mqtt_flags & FlagConnected)) break;
if (client) if (client)
{ {
uint16_t pingreq = MqttMessage::Type::PingResp; uint16_t pingreq = MqttMessage::Type::PingResp;
@@ -536,7 +531,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
case MqttMessage::Type::UnSubscribe: case MqttMessage::Type::UnSubscribe:
{ {
if (!mqtt_connected) break; if (not (mqtt_flags & FlagConnected)) break;
payload = header+2; payload = header+2;
debug("un/subscribe loop"); debug("un/subscribe loop");
@@ -580,15 +575,15 @@ void MqttClient::processMessage(MqttMessage* mesg)
break; break;
case MqttMessage::Type::UnSuback: case MqttMessage::Type::UnSuback:
if (!mqtt_connected) break; if (not (mqtt_flags & FlagConnected)) break;
bclose = false; bclose = false;
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
Console << "publish " << mqtt_connected << '/' << (long) client << endl; Console << "publish " << (mqtt_flags & FlagConnected) << '/' << (long) client << endl;
#endif #endif
if (mqtt_connected or client == nullptr) if ((mqtt_flags & FlagConnected) or client == nullptr)
{ {
uint8_t qos = mesg->flags(); uint8_t qos = mesg->flags();
payload = header; payload = header;
@@ -629,8 +624,8 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::Disconnect: case MqttMessage::Type::Disconnect:
// TODO should discard any will msg // TODO should discard any will msg
if (!mqtt_connected) break; if (not (mqtt_flags & FlagConnected)) break;
mqtt_connected = false; mqtt_flags &= ~FlagConnected;
close(false); close(false);
bclose=false; bclose=false;
break; break;
@@ -651,7 +646,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
} }
else else
{ {
clientAlive(local_broker ? 5 : 0); clientAlive();
} }
} }

View File

@@ -4,6 +4,9 @@
#ifndef TINY_MQTT_DEBUG #ifndef TINY_MQTT_DEBUG
#define TINY_MQTT_DEBUG 0 #define TINY_MQTT_DEBUG 0
#endif #endif
#ifndef TINY_MQTT_DEFAULT_ALIVE
#define TINY_MQTT_DEFAULT_ALIVE 10
#endif
// TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC // TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC
// #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx // #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
@@ -173,7 +176,9 @@ class MqttClient
FlagWillQos = 16 | 8, // unsupported FlagWillQos = 16 | 8, // unsupported
FlagWill = 4, // unsupported FlagWill = 4, // unsupported
FlagCleanSession = 2, // unsupported FlagCleanSession = 2, // unsupported
FlagReserved = 1
FlagReserved = 1, // use reserved as connected (save 1 byte)
FlagConnected = 1
}; };
public: public:
@@ -187,7 +192,7 @@ class MqttClient
~MqttClient(); ~MqttClient();
void connect(MqttBroker* local_broker); void connect(MqttBroker* local_broker);
void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10); void connect(std::string broker, uint16_t port, uint16_t keep_alive = TINY_MQTT_DEFAULT_ALIVE);
// TODO it seems that connected returns true in tcp mode even if // TODO it seems that connected returns true in tcp mode even if
// no negociation occurred // no negociation occurred
@@ -282,13 +287,14 @@ class MqttClient
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive();
void processMessage(MqttMessage* message); void processMessage(MqttMessage* message);
bool mqtt_connected = false; char mqtt_flags = 0;
char mqtt_flags; uint16_t keep_alive = 30;
uint32_t keep_alive = 30; // for client connected to remote broker, PingReq is sent when millis() >= alive
uint32_t alive; // for a client managed by a broker, disconnect it if millis() >= alive
uint32_t alive; // PingReq if millis() > alive,
MqttMessage message; MqttMessage message;
// connection to local broker, or link to the parent // connection to local broker, or link to the parent
@@ -296,7 +302,7 @@ class MqttClient
MqttBroker* local_broker=nullptr; MqttBroker* local_broker=nullptr;
TcpClient* client=nullptr; // connection to remote broker TcpClient* client=nullptr; // connection to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
}; };

View File

@@ -1,7 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this # See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS. # Makefile to compile and run Arduino programs natively on Linux or MacOS.
EXTRA_CXXFLAGS=-g3 -O0 EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEFAULT_ALIVE=1
# Remove flto flag from EpoxyDuino (too many <optimized out>) # Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics

View File

@@ -14,7 +14,6 @@
using namespace std; using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
@@ -31,6 +30,7 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
test(local_client_should_unregister_when_destroyed) test(local_client_should_unregister_when_destroyed)
{ {
MqttBroker broker(1883);
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
{ {
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
@@ -40,6 +40,18 @@ test(local_client_should_unregister_when_destroyed)
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
} }
test(local_client_alive)
{
MqttBroker broker(1883);
MqttClient client(&broker);
for(int i=0; i<10; i++)
{
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected
broker.loop();
usleep(TINY_MQTT_DEFAULT_ALIVE*1000000/2);
}
}
#if 0 #if 0
test(local_connect) test(local_connect)
{ {