Memory deletion fixes
This commit is contained in:
@@ -27,7 +27,14 @@ MqttBroker::~MqttBroker()
|
|||||||
{
|
{
|
||||||
while(clients.size())
|
while(clients.size())
|
||||||
{
|
{
|
||||||
delete clients[0];
|
auto client = clients[0];
|
||||||
|
client->local_broker = nullptr;
|
||||||
|
if (client->cltFlags & MqttClient::CltFlags::CltFlagToDelete)
|
||||||
|
{
|
||||||
|
// std::cout << "Deleting client" << std::endl;
|
||||||
|
delete client;
|
||||||
|
}
|
||||||
|
clients.erase(clients.begin());
|
||||||
}
|
}
|
||||||
delete server;
|
delete server;
|
||||||
}
|
}
|
||||||
@@ -71,7 +78,7 @@ MqttClient::~MqttClient()
|
|||||||
void MqttClient::close(bool bSendDisconnect)
|
void MqttClient::close(bool bSendDisconnect)
|
||||||
{
|
{
|
||||||
debug("close " << id().c_str());
|
debug("close " << id().c_str());
|
||||||
mqtt_connected = false;
|
resetFlag(CltFlagConnected);
|
||||||
if (tcp_client) // connected to a remote broker
|
if (tcp_client) // connected to a remote broker
|
||||||
{
|
{
|
||||||
if (bSendDisconnect and tcp_client->connected())
|
if (bSendDisconnect and tcp_client->connected())
|
||||||
@@ -95,6 +102,7 @@ void MqttClient::connect(MqttBroker* local)
|
|||||||
debug("MqttClient::connect_local");
|
debug("MqttClient::connect_local");
|
||||||
close();
|
close();
|
||||||
local_broker = local;
|
local_broker = local;
|
||||||
|
local_broker->addClient(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttClient::connect(string broker, uint16_t port, uint16_t ka)
|
void MqttClient::connect(string broker, uint16_t port, uint16_t ka)
|
||||||
@@ -163,7 +171,9 @@ void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
|||||||
debug("MqttBroker::onClient");
|
debug("MqttBroker::onClient");
|
||||||
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
||||||
|
|
||||||
broker->addClient(new MqttClient(broker, client));
|
MqttClient* mqtt = new MqttClient(broker, client);
|
||||||
|
mqtt->setFlag(MqttClient::CltFlags::CltFlagToDelete);
|
||||||
|
broker->addClient(mqtt);
|
||||||
debug("New client");
|
debug("New client");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -440,7 +450,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
switch(mesg->type())
|
switch(mesg->type())
|
||||||
{
|
{
|
||||||
case MqttMessage::Type::Connect:
|
case MqttMessage::Type::Connect:
|
||||||
if (mqtt_connected)
|
if (mqtt_connected())
|
||||||
{
|
{
|
||||||
debug("already connected");
|
debug("already connected");
|
||||||
break;
|
break;
|
||||||
@@ -490,7 +500,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl;
|
Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl;
|
||||||
#endif
|
#endif
|
||||||
bclose = false;
|
bclose = false;
|
||||||
mqtt_connected=true;
|
setFlag(CltFlagConnected);
|
||||||
{
|
{
|
||||||
MqttMessage msg(MqttMessage::Type::ConnAck);
|
MqttMessage msg(MqttMessage::Type::ConnAck);
|
||||||
msg.add(0); // Session present (not implemented)
|
msg.add(0); // Session present (not implemented)
|
||||||
@@ -500,14 +510,14 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::ConnAck:
|
case MqttMessage::Type::ConnAck:
|
||||||
mqtt_connected = true;
|
setFlag(CltFlagConnected);
|
||||||
bclose = false;
|
bclose = false;
|
||||||
resubscribe();
|
resubscribe();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::SubAck:
|
case MqttMessage::Type::SubAck:
|
||||||
case MqttMessage::Type::PubAck:
|
case MqttMessage::Type::PubAck:
|
||||||
if (!mqtt_connected) break;
|
if (not mqtt_connected()) break;
|
||||||
// Ignore acks
|
// Ignore acks
|
||||||
bclose = false;
|
bclose = false;
|
||||||
break;
|
break;
|
||||||
@@ -518,7 +528,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::PingReq:
|
case MqttMessage::Type::PingReq:
|
||||||
if (!mqtt_connected) break;
|
if (not mqtt_connected()) break;
|
||||||
if (tcp_client)
|
if (tcp_client)
|
||||||
{
|
{
|
||||||
uint16_t pingreq = MqttMessage::Type::PingResp;
|
uint16_t pingreq = MqttMessage::Type::PingResp;
|
||||||
@@ -535,7 +545,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
case MqttMessage::Type::Subscribe:
|
case MqttMessage::Type::Subscribe:
|
||||||
case MqttMessage::Type::UnSubscribe:
|
case MqttMessage::Type::UnSubscribe:
|
||||||
{
|
{
|
||||||
if (!mqtt_connected) break;
|
if (not mqtt_connected()) break;
|
||||||
payload = header+2;
|
payload = header+2;
|
||||||
|
|
||||||
debug("un/subscribe loop");
|
debug("un/subscribe loop");
|
||||||
@@ -579,15 +589,15 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::UnSuback:
|
case MqttMessage::Type::UnSuback:
|
||||||
if (!mqtt_connected) break;
|
if (not mqtt_connected()) break;
|
||||||
bclose = false;
|
bclose = false;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::Publish:
|
case MqttMessage::Type::Publish:
|
||||||
#if TINY_MQTT_DEBUG
|
#if TINY_MQTT_DEBUG
|
||||||
Console << "publish " << mqtt_connected << '/' << (long) tcp_client << endl;
|
Console << "publish " << mqtt_connected() << '/' << (long) tcp_client << endl;
|
||||||
#endif
|
#endif
|
||||||
if (mqtt_connected or tcp_client == nullptr)
|
if (mqtt_connected() or tcp_client == nullptr)
|
||||||
{
|
{
|
||||||
uint8_t qos = mesg->flags();
|
uint8_t qos = mesg->flags();
|
||||||
payload = header;
|
payload = header;
|
||||||
@@ -628,8 +638,8 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
|
|
||||||
case MqttMessage::Type::Disconnect:
|
case MqttMessage::Type::Disconnect:
|
||||||
// TODO should discard any will msg
|
// TODO should discard any will msg
|
||||||
if (!mqtt_connected) break;
|
if (not mqtt_connected()) break;
|
||||||
mqtt_connected = false;
|
resetFlag(CltFlagConnected);
|
||||||
close(false);
|
close(false);
|
||||||
bclose=false;
|
bclose=false;
|
||||||
break;
|
break;
|
||||||
|
|||||||
@@ -178,6 +178,13 @@ class MqttClient
|
|||||||
FlagCleanSession = 2, // unsupported
|
FlagCleanSession = 2, // unsupported
|
||||||
FlagReserved = 1
|
FlagReserved = 1
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum __attribute__((packed)) CltFlags
|
||||||
|
{
|
||||||
|
CltFlagNone = 0,
|
||||||
|
CltFlagConnected = 1,
|
||||||
|
CltFlagToDelete = 2
|
||||||
|
};
|
||||||
public:
|
public:
|
||||||
|
|
||||||
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
|
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
|
||||||
@@ -272,6 +279,9 @@ class MqttClient
|
|||||||
uint32_t keepAlive() const { return keep_alive; }
|
uint32_t keepAlive() const { return keep_alive; }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
bool mqtt_connected() const { return cltFlags & CltFlagConnected; }
|
||||||
|
void setFlag(CltFlags f) { cltFlags |= f; }
|
||||||
|
void resetFlag(CltFlags f) { cltFlags &= ~f; }
|
||||||
|
|
||||||
// event when tcp/ip link established (real or fake)
|
// event when tcp/ip link established (real or fake)
|
||||||
static void onConnect(void * client_ptr, TcpClient*);
|
static void onConnect(void * client_ptr, TcpClient*);
|
||||||
@@ -289,7 +299,7 @@ class MqttClient
|
|||||||
void clientAlive(uint32_t more_seconds);
|
void clientAlive(uint32_t more_seconds);
|
||||||
void processMessage(MqttMessage* message);
|
void processMessage(MqttMessage* message);
|
||||||
|
|
||||||
bool mqtt_connected = false;
|
uint8_t cltFlags = CltFlagNone;
|
||||||
char mqtt_flags;
|
char mqtt_flags;
|
||||||
uint32_t keep_alive = 30;
|
uint32_t keep_alive = 30;
|
||||||
uint32_t alive;
|
uint32_t alive;
|
||||||
|
|||||||
@@ -406,6 +406,20 @@ test(network_hudge_payload)
|
|||||||
assertEqual(strcmp(payload, lastPayload), 0);
|
assertEqual(strcmp(payload, lastPayload), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test(disconnected_when_broker_is_deleted)
|
||||||
|
{
|
||||||
|
MqttBroker* broker = new MqttBroker(1883);
|
||||||
|
broker->begin();
|
||||||
|
|
||||||
|
MqttClient client;
|
||||||
|
client.connect(broker);
|
||||||
|
assertEqual(client.connected(), true);
|
||||||
|
client.publish("a", "b");
|
||||||
|
|
||||||
|
delete broker;
|
||||||
|
assertEqual(client.connected(), false);
|
||||||
|
}
|
||||||
|
|
||||||
test(connack)
|
test(connack)
|
||||||
{
|
{
|
||||||
const bool view = false;
|
const bool view = false;
|
||||||
|
|||||||
Reference in New Issue
Block a user