[fixes] Timeout and broker to broker modifications
This commit is contained in:
@@ -144,6 +144,7 @@ void MqttBroker::connect(const string& host, uint16_t port)
|
|||||||
if (remote_broker == nullptr) remote_broker = new MqttClient;
|
if (remote_broker == nullptr) remote_broker = new MqttClient;
|
||||||
remote_broker->connect(host, port);
|
remote_broker->connect(host, port);
|
||||||
remote_broker->local_broker = this; // Because connect removed the link
|
remote_broker->local_broker = this; // Because connect removed the link
|
||||||
|
// TODO shouldn't we resubscribe to all client subscriptions ?
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttBroker::removeClient(MqttClient* remove)
|
void MqttBroker::removeClient(MqttClient* remove)
|
||||||
@@ -308,25 +309,26 @@ void MqttClient::clientAlive(uint32_t more_seconds)
|
|||||||
|
|
||||||
void MqttClient::loop()
|
void MqttClient::loop()
|
||||||
{
|
{
|
||||||
if (keep_alive && (millis() >= alive))
|
if (keep_alive && (millis() >= alive - 5000))
|
||||||
{
|
{
|
||||||
if (local_broker)
|
if (tcp_client && tcp_client->connected())
|
||||||
{
|
|
||||||
debug(red << "timeout client");
|
|
||||||
close();
|
|
||||||
debug(red << "closed");
|
|
||||||
}
|
|
||||||
else if (tcp_client && tcp_client->connected())
|
|
||||||
{
|
{
|
||||||
debug("pingreq");
|
debug("pingreq");
|
||||||
uint16_t pingreq = MqttMessage::Type::PingReq;
|
static MqttMessage pingreq(MqttMessage::Type::PingReq);
|
||||||
tcp_client->write((const char*)(&pingreq), 2);
|
pingreq.sendTo(this);
|
||||||
clientAlive(0);
|
clientAlive(0);
|
||||||
|
|
||||||
// TODO when many MqttClient passes through a local broker
|
// TODO when many MqttClient passes through a local broker
|
||||||
// there is no need to send one PingReq per instance.
|
// there is no need to send one PingReq per instance.
|
||||||
}
|
}
|
||||||
|
else if (local_broker)
|
||||||
|
{
|
||||||
|
debug(red << "timeout client");
|
||||||
|
close();
|
||||||
|
debug(red << "closed");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#ifndef TINY_MQTT_ASYNC
|
#ifndef TINY_MQTT_ASYNC
|
||||||
while(tcp_client && tcp_client->available()>0)
|
while(tcp_client && tcp_client->available()>0)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -329,12 +329,6 @@ class MqttClient
|
|||||||
|
|
||||||
class MqttBroker
|
class MqttBroker
|
||||||
{
|
{
|
||||||
enum __attribute__((packed)) State
|
|
||||||
{
|
|
||||||
Disconnected, // Also the initial state
|
|
||||||
Connecting, // connect and sends a fake publish to avoid circular cnx
|
|
||||||
Connected, // this->broker is connected and circular cnx avoided
|
|
||||||
};
|
|
||||||
public:
|
public:
|
||||||
// TODO limit max number of clients
|
// TODO limit max number of clients
|
||||||
MqttBroker(uint16_t port, uint8_t retain_size=0);
|
MqttBroker(uint16_t port, uint8_t retain_size=0);
|
||||||
@@ -346,7 +340,7 @@ class MqttBroker
|
|||||||
/** Connect the broker to a parent broker */
|
/** Connect the broker to a parent broker */
|
||||||
void connect(const string& host, uint16_t port=1883);
|
void connect(const string& host, uint16_t port=1883);
|
||||||
/** returns true if connected to another broker */
|
/** returns true if connected to another broker */
|
||||||
bool connected() const { return state == Connected; }
|
bool connected() const { return remote_broker ? remote_broker->connected() : false; }
|
||||||
|
|
||||||
size_t clientsCount() const { return clients.size(); }
|
size_t clientsCount() const { return clients.size(); }
|
||||||
void retain(uint8_t size) { retain_size = size; }
|
void retain(uint8_t size) { retain_size = size; }
|
||||||
@@ -387,8 +381,6 @@ class MqttBroker
|
|||||||
const char* auth_password = "guest";
|
const char* auth_password = "guest";
|
||||||
MqttClient* remote_broker = nullptr;
|
MqttClient* remote_broker = nullptr;
|
||||||
|
|
||||||
State state = Disconnected;
|
|
||||||
|
|
||||||
void retain(const Topic& topic, const MqttMessage& msg);
|
void retain(const Topic& topic, const MqttMessage& msg);
|
||||||
void retainDrop();
|
void retainDrop();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user