Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a0435b2cfb | ||
|
|
bda041417d | ||
|
|
baffda8a6d |
@@ -1,5 +1,5 @@
|
|||||||
name=TinyMqtt
|
name=TinyMqtt
|
||||||
version=0.9.11
|
version=0.9.12
|
||||||
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
sentence=A tiny broker and client library for MQTT messaging.
|
sentence=A tiny broker and client library for MQTT messaging.
|
||||||
|
|||||||
166
src/TinyMqtt.cpp
166
src/TinyMqtt.cpp
@@ -17,7 +17,7 @@ int TinyMqtt::debug=2;
|
|||||||
|
|
||||||
MqttBroker::MqttBroker(uint16_t port)
|
MqttBroker::MqttBroker(uint16_t port)
|
||||||
{
|
{
|
||||||
server.reset(new TcpServer(port));
|
server = new TcpServer(port);
|
||||||
#ifdef TINY_MQTT_ASYNC
|
#ifdef TINY_MQTT_ASYNC
|
||||||
server->onClient(onClient, this);
|
server->onClient(onClient, this);
|
||||||
#endif
|
#endif
|
||||||
@@ -29,29 +29,32 @@ MqttBroker::~MqttBroker()
|
|||||||
{
|
{
|
||||||
delete clients[0];
|
delete clients[0];
|
||||||
}
|
}
|
||||||
|
delete server;
|
||||||
}
|
}
|
||||||
|
|
||||||
// private constructor used by broker only
|
// private constructor used by broker only
|
||||||
MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
|
MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
|
||||||
|
: local_broker(local_broker)
|
||||||
{
|
{
|
||||||
dclass;
|
|
||||||
connect(local_broker);
|
|
||||||
debug("MqttClient private with broker");
|
debug("MqttClient private with broker");
|
||||||
#ifdef TINY_MQTT_ASYNC
|
#ifdef TINY_MQTT_ASYNC
|
||||||
tcp_client.reset(new_client);
|
tcp_client = new_client;
|
||||||
tcp_client->onData(onData, this);
|
tcp_client->onData(onData, this);
|
||||||
// client->onConnect() TODO
|
// client->onConnect() TODO
|
||||||
// client->onDisconnect() TODO
|
// client->onDisconnect() TODO
|
||||||
#else
|
#else
|
||||||
tcp_client.reset(new WiFiClient(*new_client));
|
tcp_client = new WiFiClient(*new_client);
|
||||||
|
#endif
|
||||||
|
#ifdef EPOXY_DUINO
|
||||||
|
alive = millis()+500000;
|
||||||
|
#else
|
||||||
|
alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg
|
||||||
#endif
|
#endif
|
||||||
alive = millis()+5000;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
|
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
|
||||||
: local_broker(local_broker), clientId(id)
|
: local_broker(local_broker), clientId(id)
|
||||||
{
|
{
|
||||||
dclass;
|
|
||||||
alive = 0;
|
alive = 0;
|
||||||
keep_alive = 0;
|
keep_alive = 0;
|
||||||
|
|
||||||
@@ -60,15 +63,15 @@ MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
|
|||||||
|
|
||||||
MqttClient::~MqttClient()
|
MqttClient::~MqttClient()
|
||||||
{
|
{
|
||||||
dtor;
|
|
||||||
close();
|
close();
|
||||||
|
delete tcp_client;
|
||||||
debug("*** MqttClient delete()");
|
debug("*** MqttClient delete()");
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttClient::close(bool bSendDisconnect)
|
void MqttClient::close(bool bSendDisconnect)
|
||||||
{
|
{
|
||||||
debug("close " << id().c_str());
|
debug("close " << id().c_str());
|
||||||
mqtt_flags &= ~FlagConnected;
|
mqtt_connected = false;
|
||||||
if (tcp_client) // connected to a remote broker
|
if (tcp_client) // connected to a remote broker
|
||||||
{
|
{
|
||||||
if (bSendDisconnect and tcp_client->connected())
|
if (bSendDisconnect and tcp_client->connected())
|
||||||
@@ -90,10 +93,8 @@ void MqttClient::close(bool bSendDisconnect)
|
|||||||
void MqttClient::connect(MqttBroker* local)
|
void MqttClient::connect(MqttBroker* local)
|
||||||
{
|
{
|
||||||
debug("MqttClient::connect_local");
|
debug("MqttClient::connect_local");
|
||||||
alive = 0;
|
|
||||||
close();
|
close();
|
||||||
local_broker = local;
|
local_broker = local;
|
||||||
clientAlive();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
||||||
@@ -101,7 +102,8 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
|||||||
debug("MqttClient::connect_to_host " << broker << ':' << port);
|
debug("MqttClient::connect_to_host " << broker << ':' << port);
|
||||||
keep_alive = ka;
|
keep_alive = ka;
|
||||||
close();
|
close();
|
||||||
tcp_client.reset(new TcpClient);
|
if (tcp_client) delete tcp_client;
|
||||||
|
tcp_client = new TcpClient;
|
||||||
|
|
||||||
#ifdef TINY_MQTT_ASYNC
|
#ifdef TINY_MQTT_ASYNC
|
||||||
tcp_client->onData(onData, this);
|
tcp_client->onData(onData, this);
|
||||||
@@ -111,7 +113,7 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
|||||||
if (tcp_client->connect(broker.c_str(), port))
|
if (tcp_client->connect(broker.c_str(), port))
|
||||||
{
|
{
|
||||||
debug("link established");
|
debug("link established");
|
||||||
onConnect(this, tcp_client.get());
|
onConnect(this, tcp_client);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -120,6 +122,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MqttBroker::addClient(MqttClient* client)
|
||||||
|
{
|
||||||
|
debug("MqttBroker::addClient");
|
||||||
|
clients.push_back(client);
|
||||||
|
}
|
||||||
|
|
||||||
void MqttBroker::connect(const std::string& host, uint16_t port)
|
void MqttBroker::connect(const std::string& host, uint16_t port)
|
||||||
{
|
{
|
||||||
debug("MqttBroker::connect");
|
debug("MqttBroker::connect");
|
||||||
@@ -130,13 +138,24 @@ void MqttBroker::connect(const std::string& host, uint16_t port)
|
|||||||
|
|
||||||
void MqttBroker::removeClient(MqttClient* remove)
|
void MqttBroker::removeClient(MqttClient* remove)
|
||||||
{
|
{
|
||||||
local_clients.erase(remove);
|
debug("removeClient");
|
||||||
for(auto it = clients.begin(); it!=clients.end(); it++)
|
for(auto it=clients.begin(); it!=clients.end(); it++)
|
||||||
if (*it == remove)
|
{
|
||||||
|
auto client=*it;
|
||||||
|
if (client==remove)
|
||||||
{
|
{
|
||||||
|
// TODO if this broker is connected to an external broker
|
||||||
|
// we have to unsubscribe remove's topics.
|
||||||
|
// (but doing this, check that other clients are not subscribed...)
|
||||||
|
// Unless -> we could receive useless messages
|
||||||
|
// -> we are using (memory) one IndexedString plus its string for nothing.
|
||||||
|
debug("Remove " << clients.size());
|
||||||
clients.erase(it);
|
clients.erase(it);
|
||||||
break;
|
debug("Client removed " << clients.size());
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
debug(red << "Error cannot remove client"); // TODO should not occur
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
||||||
@@ -144,14 +163,13 @@ void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
|||||||
debug("MqttBroker::onClient");
|
debug("MqttBroker::onClient");
|
||||||
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
||||||
|
|
||||||
broker->clients.push_back(new MqttClient(broker, client));
|
broker->addClient(new MqttClient(broker, client));
|
||||||
debug("New client");
|
debug("New client");
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttBroker::loop()
|
void MqttBroker::loop()
|
||||||
{
|
{
|
||||||
#ifndef TINY_MQTT_ASYNC
|
#ifndef TINY_MQTT_ASYNC
|
||||||
if (not server) return;
|
|
||||||
WiFiClient client = server->available();
|
WiFiClient client = server->available();
|
||||||
|
|
||||||
if (client)
|
if (client)
|
||||||
@@ -166,12 +184,9 @@ void MqttBroker::loop()
|
|||||||
remote_broker->loop();
|
remote_broker->loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
// keep track on size because loop can remove a client from containers
|
for(size_t i=0; i<clients.size(); i++)
|
||||||
// loop on remote clients (connected through network)
|
|
||||||
auto size = clients.size();
|
|
||||||
for(auto it = clients.begin(); it!=clients.end(); it++)
|
|
||||||
{
|
{
|
||||||
MqttClient* client = *it;
|
MqttClient* client = clients[i];
|
||||||
if (client->connected())
|
if (client->connected())
|
||||||
{
|
{
|
||||||
client->loop();
|
client->loop();
|
||||||
@@ -180,16 +195,9 @@ void MqttBroker::loop()
|
|||||||
{
|
{
|
||||||
debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker);
|
debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker);
|
||||||
// Note: deleting a client not added by the broker itself will probably crash later.
|
// Note: deleting a client not added by the broker itself will probably crash later.
|
||||||
|
delete client;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
if (size != clients.size()) break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// loop on local clients (on same device as the broker's)
|
|
||||||
size = local_clients.size();
|
|
||||||
for(auto& client: local_clients)
|
|
||||||
{
|
|
||||||
client->loop();
|
|
||||||
if (local_clients.size() != size) break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -205,28 +213,40 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
|
|||||||
|
|
||||||
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const
|
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const
|
||||||
{
|
{
|
||||||
MqttError retval = MqttOk; // TODO here retval is badly computed
|
MqttError retval = MqttOk;
|
||||||
|
|
||||||
debug("MqttBroker::publish");
|
debug("MqttBroker::publish");
|
||||||
|
int i=0;
|
||||||
|
for(auto client: clients)
|
||||||
|
{
|
||||||
|
i++;
|
||||||
|
#if TINY_MQTT_DEBUG
|
||||||
|
Console << __LINE__ << " broker:" << (remote_broker && remote_broker->connected() ? "linked" : "alone") <<
|
||||||
|
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
||||||
|
#endif
|
||||||
|
bool doit = false;
|
||||||
|
if (remote_broker && remote_broker->connected()) // this (MqttBroker) is connected (to a external broker)
|
||||||
|
{
|
||||||
|
// ext_broker -> clients or clients -> ext_broker
|
||||||
|
if (source == remote_broker) // external broker -> internal clients
|
||||||
|
doit = true;
|
||||||
|
else // external clients -> this broker
|
||||||
|
{
|
||||||
|
// As this broker is connected to another broker, simply forward the msg
|
||||||
|
MqttError ret = remote_broker->publishIfSubscribed(topic, msg);
|
||||||
|
if (ret != MqttOk) retval = ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else // Disconnected
|
||||||
|
{
|
||||||
|
doit = true;
|
||||||
|
}
|
||||||
|
#if TINY_MQTT_DEBUG
|
||||||
|
Console << ", doit=" << doit << ' ';
|
||||||
|
#endif
|
||||||
|
|
||||||
if (remote_broker == nullptr or source == remote_broker) // external broker -> internal clients
|
if (doit) retval = client->publishIfSubscribed(topic, msg);
|
||||||
{
|
debug("");
|
||||||
for(auto& client: clients)
|
|
||||||
{
|
|
||||||
retval = client->publishIfSubscribed(topic, msg);
|
|
||||||
}
|
|
||||||
for(auto& client: local_clients)
|
|
||||||
{
|
|
||||||
retval = client->publishIfSubscribed(topic, msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
if (remote_broker && remote_broker->connected())
|
|
||||||
{
|
|
||||||
MqttError ret = remote_broker->publishIfSubscribed(topic, msg);
|
|
||||||
if (ret != MqttOk) retval = ret;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
@@ -247,12 +267,16 @@ void MqttMessage::getString(const char* &buff, uint16_t& len)
|
|||||||
buff+=2;
|
buff+=2;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttClient::clientAlive()
|
void MqttClient::clientAlive(uint32_t more_seconds)
|
||||||
{
|
{
|
||||||
debug("MqttClient::clientAlive");
|
debug("MqttClient::clientAlive");
|
||||||
if (keep_alive)
|
if (keep_alive)
|
||||||
{
|
{
|
||||||
alive=millis()+1000*(keep_alive+(local_broker ? TINY_MQTT_CLIENT_ALIVE_TOLERANCE : 0));
|
#ifdef EPOXY_DUINO
|
||||||
|
alive=millis()+500000+0*more_seconds;
|
||||||
|
#else
|
||||||
|
alive=millis()+1000*(keep_alive+more_seconds);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
alive=0;
|
alive=0;
|
||||||
@@ -264,7 +288,7 @@ void MqttClient::loop()
|
|||||||
{
|
{
|
||||||
if (local_broker)
|
if (local_broker)
|
||||||
{
|
{
|
||||||
Serial << "timeout client " << clientId << endl;
|
debug(red << "timeout client");
|
||||||
close();
|
close();
|
||||||
debug(red << "closed");
|
debug(red << "closed");
|
||||||
}
|
}
|
||||||
@@ -272,9 +296,8 @@ void MqttClient::loop()
|
|||||||
{
|
{
|
||||||
debug("pingreq");
|
debug("pingreq");
|
||||||
uint16_t pingreq = MqttMessage::Type::PingReq;
|
uint16_t pingreq = MqttMessage::Type::PingReq;
|
||||||
|
|
||||||
tcp_client->write((const char*)(&pingreq), 2);
|
tcp_client->write((const char*)(&pingreq), 2);
|
||||||
clientAlive();
|
clientAlive(0);
|
||||||
|
|
||||||
// TODO when many MqttClient passes through a local broker
|
// TODO when many MqttClient passes through a local broker
|
||||||
// there is no need to send one PingReq per instance.
|
// there is no need to send one PingReq per instance.
|
||||||
@@ -310,7 +333,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
|
|||||||
msg.reset();
|
msg.reset();
|
||||||
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker);
|
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker);
|
||||||
|
|
||||||
mqtt->clientAlive();
|
mqtt->clientAlive(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef TINY_MQTT_ASYNC
|
#ifdef TINY_MQTT_ASYNC
|
||||||
@@ -417,14 +440,13 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
switch(mesg->type())
|
switch(mesg->type())
|
||||||
{
|
{
|
||||||
case MqttMessage::Type::Connect:
|
case MqttMessage::Type::Connect:
|
||||||
if (mqtt_flags & FlagConnected)
|
if (mqtt_connected)
|
||||||
{
|
{
|
||||||
debug("already connected");
|
debug("already connected");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
payload = header+10;
|
payload = header+10;
|
||||||
// Todo should check that reserved == 0 (spec)
|
mqtt_flags = header[7];
|
||||||
mqtt_flags = header[7] & ~FlagConnected;
|
|
||||||
keep_alive = MqttMessage::getSize(header+8);
|
keep_alive = MqttMessage::getSize(header+8);
|
||||||
if (strncmp("MQTT", header+2,4))
|
if (strncmp("MQTT", header+2,4))
|
||||||
{
|
{
|
||||||
@@ -468,7 +490,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl;
|
Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl;
|
||||||
#endif
|
#endif
|
||||||
bclose = false;
|
bclose = false;
|
||||||
mqtt_flags |= FlagConnected;
|
mqtt_connected=true;
|
||||||
{
|
{
|
||||||
MqttMessage msg(MqttMessage::Type::ConnAck);
|
MqttMessage msg(MqttMessage::Type::ConnAck);
|
||||||
msg.add(0); // Session present (not implemented)
|
msg.add(0); // Session present (not implemented)
|
||||||
@@ -478,14 +500,14 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::ConnAck:
|
case MqttMessage::Type::ConnAck:
|
||||||
mqtt_flags |= FlagConnected;
|
mqtt_connected = true;
|
||||||
bclose = false;
|
bclose = false;
|
||||||
resubscribe();
|
resubscribe();
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::SubAck:
|
case MqttMessage::Type::SubAck:
|
||||||
case MqttMessage::Type::PubAck:
|
case MqttMessage::Type::PubAck:
|
||||||
if (not (mqtt_flags & FlagConnected)) break;
|
if (!mqtt_connected) break;
|
||||||
// Ignore acks
|
// Ignore acks
|
||||||
bclose = false;
|
bclose = false;
|
||||||
break;
|
break;
|
||||||
@@ -496,7 +518,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::PingReq:
|
case MqttMessage::Type::PingReq:
|
||||||
if (not (mqtt_flags & FlagConnected)) break;
|
if (!mqtt_connected) break;
|
||||||
if (tcp_client)
|
if (tcp_client)
|
||||||
{
|
{
|
||||||
uint16_t pingreq = MqttMessage::Type::PingResp;
|
uint16_t pingreq = MqttMessage::Type::PingResp;
|
||||||
@@ -513,7 +535,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
case MqttMessage::Type::Subscribe:
|
case MqttMessage::Type::Subscribe:
|
||||||
case MqttMessage::Type::UnSubscribe:
|
case MqttMessage::Type::UnSubscribe:
|
||||||
{
|
{
|
||||||
if (not (mqtt_flags & FlagConnected)) break;
|
if (!mqtt_connected) break;
|
||||||
payload = header+2;
|
payload = header+2;
|
||||||
|
|
||||||
debug("un/subscribe loop");
|
debug("un/subscribe loop");
|
||||||
@@ -557,15 +579,15 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::UnSuback:
|
case MqttMessage::Type::UnSuback:
|
||||||
if (not (mqtt_flags & FlagConnected)) break;
|
if (!mqtt_connected) break;
|
||||||
bclose = false;
|
bclose = false;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::Publish:
|
case MqttMessage::Type::Publish:
|
||||||
#if TINY_MQTT_DEBUG
|
#if TINY_MQTT_DEBUG
|
||||||
Console << "publish " << (mqtt_flags & FlagConnected) << '/' << (long) tcp_client.get() << endl;
|
Console << "publish " << mqtt_connected << '/' << (long) tcp_client << endl;
|
||||||
#endif
|
#endif
|
||||||
if ((mqtt_flags & FlagConnected) or tcp_client == nullptr)
|
if (mqtt_connected or tcp_client == nullptr)
|
||||||
{
|
{
|
||||||
uint8_t qos = mesg->flags();
|
uint8_t qos = mesg->flags();
|
||||||
payload = header;
|
payload = header;
|
||||||
@@ -606,8 +628,8 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
|
|
||||||
case MqttMessage::Type::Disconnect:
|
case MqttMessage::Type::Disconnect:
|
||||||
// TODO should discard any will msg
|
// TODO should discard any will msg
|
||||||
if (not (mqtt_flags & FlagConnected)) break;
|
if (!mqtt_connected) break;
|
||||||
mqtt_flags &= ~FlagConnected;
|
mqtt_connected = false;
|
||||||
close(false);
|
close(false);
|
||||||
bclose=false;
|
bclose=false;
|
||||||
break;
|
break;
|
||||||
@@ -628,7 +650,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
clientAlive();
|
clientAlive(local_broker ? 5 : 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,10 +4,6 @@
|
|||||||
#ifndef TINY_MQTT_DEBUG
|
#ifndef TINY_MQTT_DEBUG
|
||||||
#define TINY_MQTT_DEBUG 0
|
#define TINY_MQTT_DEBUG 0
|
||||||
#endif
|
#endif
|
||||||
#ifndef TINY_MQTT_DEFAULT_ALIVE
|
|
||||||
#define TINY_MQTT_DEFAULT_ALIVE 10
|
|
||||||
#endif
|
|
||||||
#define TINY_MQTT_CLIENT_ALIVE_TOLERANCE 5
|
|
||||||
|
|
||||||
// TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC
|
// TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC
|
||||||
// #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
|
// #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
|
||||||
@@ -57,15 +53,6 @@
|
|||||||
#define debug(what) {}
|
#define debug(what) {}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <TinyConsole.h>
|
|
||||||
#if 0
|
|
||||||
#define dclass { Console << __LINE__ << ':' << __PRETTY_FUNCTION__ << ", this=" << (long)this << endl; }
|
|
||||||
#define dtor { Console << __LINE__ << ": ~" << __PRETTY_FUNCTION__ << ", this=" << (long)this << endl; }
|
|
||||||
#else
|
|
||||||
#define dclass
|
|
||||||
#define dtor
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef TINY_MQTT_ASYNC
|
#ifdef TINY_MQTT_ASYNC
|
||||||
using TcpClient = AsyncClient;
|
using TcpClient = AsyncClient;
|
||||||
using TcpServer = AsyncServer;
|
using TcpServer = AsyncServer;
|
||||||
@@ -186,9 +173,7 @@ class MqttClient
|
|||||||
FlagWillQos = 16 | 8, // unsupported
|
FlagWillQos = 16 | 8, // unsupported
|
||||||
FlagWill = 4, // unsupported
|
FlagWill = 4, // unsupported
|
||||||
FlagCleanSession = 2, // unsupported
|
FlagCleanSession = 2, // unsupported
|
||||||
|
FlagReserved = 1
|
||||||
FlagReserved = 1, // use reserved as connected (save 1 byte)
|
|
||||||
FlagConnected = 1
|
|
||||||
};
|
};
|
||||||
public:
|
public:
|
||||||
|
|
||||||
@@ -197,12 +182,12 @@ class MqttClient
|
|||||||
/** Constructor. Broker is the adress of a local broker if not null
|
/** Constructor. Broker is the adress of a local broker if not null
|
||||||
If you want to connect elsewhere, leave broker null and use connect() **/
|
If you want to connect elsewhere, leave broker null and use connect() **/
|
||||||
MqttClient(MqttBroker* broker = nullptr, const std::string& id = TINY_MQTT_DEFAULT_CLIENT_ID);
|
MqttClient(MqttBroker* broker = nullptr, const std::string& id = TINY_MQTT_DEFAULT_CLIENT_ID);
|
||||||
MqttClient(const std::string& id) : MqttClient(nullptr, id){ dclass; }
|
MqttClient(const std::string& id) : MqttClient(nullptr, id){}
|
||||||
|
|
||||||
~MqttClient();
|
~MqttClient();
|
||||||
|
|
||||||
void connect(MqttBroker* local_broker);
|
void connect(MqttBroker* local_broker);
|
||||||
void connect(std::string broker, uint16_t port, uint16_t keep_alive = TINY_MQTT_DEFAULT_ALIVE);
|
void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10);
|
||||||
|
|
||||||
// TODO it seems that connected returns true in tcp mode even if
|
// TODO it seems that connected returns true in tcp mode even if
|
||||||
// no negociation occurred
|
// no negociation occurred
|
||||||
@@ -298,22 +283,21 @@ class MqttClient
|
|||||||
// republish a received publish if topic matches any in subscriptions
|
// republish a received publish if topic matches any in subscriptions
|
||||||
MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg);
|
MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg);
|
||||||
|
|
||||||
void clientAlive();
|
void clientAlive(uint32_t more_seconds);
|
||||||
void processMessage(MqttMessage* message);
|
void processMessage(MqttMessage* message);
|
||||||
|
|
||||||
char mqtt_flags = 0;
|
bool mqtt_connected = false;
|
||||||
uint16_t keep_alive = 30;
|
char mqtt_flags;
|
||||||
// for client connected to remote broker, PingReq is sent when millis() >= alive
|
uint32_t keep_alive = 30;
|
||||||
// for a client managed by a broker, disconnect it if millis() >= alive
|
uint32_t alive;
|
||||||
uint32_t alive; // PingReq if millis() > alive,
|
|
||||||
MqttMessage message;
|
MqttMessage message;
|
||||||
|
|
||||||
// connection to local broker, or link to the parent
|
// connection to local broker, or link to the parent
|
||||||
// when MqttBroker uses MqttClient for each external connexion
|
// when MqttBroker uses MqttClient for each external connexion
|
||||||
MqttBroker* local_broker=nullptr;
|
MqttBroker* local_broker=nullptr;
|
||||||
|
|
||||||
std::unique_ptr<TcpClient> tcp_client; // connection to remote broker
|
TcpClient* tcp_client=nullptr; // connection to remote broker
|
||||||
std::set<Topic> subscriptions;
|
std::set<Topic> subscriptions;
|
||||||
std::string clientId;
|
std::string clientId;
|
||||||
CallBack callback = nullptr;
|
CallBack callback = nullptr;
|
||||||
};
|
};
|
||||||
@@ -331,7 +315,7 @@ class MqttBroker
|
|||||||
MqttBroker(uint16_t port);
|
MqttBroker(uint16_t port);
|
||||||
~MqttBroker();
|
~MqttBroker();
|
||||||
|
|
||||||
void begin() { if (server) server->begin(); }
|
void begin() { server->begin(); }
|
||||||
void loop();
|
void loop();
|
||||||
|
|
||||||
void connect(const std::string& host, uint16_t port=1883);
|
void connect(const std::string& host, uint16_t port=1883);
|
||||||
@@ -341,13 +325,11 @@ class MqttBroker
|
|||||||
|
|
||||||
void dump(std::string indent="")
|
void dump(std::string indent="")
|
||||||
{
|
{
|
||||||
for(auto& client: clients)
|
for(auto client: clients)
|
||||||
client->dump(indent);
|
client->dump(indent);
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t localClientsCount() const { return local_clients.size(); }
|
const std::vector<MqttClient*> getClients() const { return clients; }
|
||||||
using Clients = std::vector<MqttClient*>;
|
|
||||||
const Clients& getClients() const { return clients; }
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class MqttClient;
|
friend class MqttClient;
|
||||||
@@ -365,15 +347,14 @@ class MqttBroker
|
|||||||
MqttError subscribe(const Topic& topic, uint8_t qos);
|
MqttError subscribe(const Topic& topic, uint8_t qos);
|
||||||
|
|
||||||
// For clients that are added not by the broker itself (local clients)
|
// For clients that are added not by the broker itself (local clients)
|
||||||
void addClient(MqttClient* local) { local_clients.insert(local); }
|
void addClient(MqttClient* client);
|
||||||
void removeClient(MqttClient* client);
|
void removeClient(MqttClient* client);
|
||||||
|
|
||||||
bool compareString(const char* good, const char* str, uint8_t str_len) const;
|
bool compareString(const char* good, const char* str, uint8_t str_len) const;
|
||||||
Clients clients;
|
std::vector<MqttClient*> clients;
|
||||||
std::set<MqttClient*> local_clients;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::unique_ptr<TcpServer> server;
|
TcpServer* server = nullptr;
|
||||||
|
|
||||||
const char* auth_user = "guest";
|
const char* auth_user = "guest";
|
||||||
const char* auth_password = "guest";
|
const char* auth_password = "guest";
|
||||||
|
|||||||
@@ -1,30 +1,28 @@
|
|||||||
SUB=
|
|
||||||
|
|
||||||
tests:
|
tests:
|
||||||
@set -e; \
|
set -e; \
|
||||||
for i in ${SUB}*-tests/Makefile; do \
|
for i in *-tests/Makefile; do \
|
||||||
echo '==== Making:' $$(dirname $$i); \
|
echo '==== Making:' $$(dirname $$i); \
|
||||||
$(MAKE) -C $$(dirname $$i) -j; \
|
$(MAKE) -C $$(dirname $$i) -j; \
|
||||||
done
|
done
|
||||||
|
|
||||||
debugtest:
|
debugtest:
|
||||||
@set -e; \
|
set -e; \
|
||||||
$(MAKE) clean; \
|
$(MAKE) clean; \
|
||||||
$(MAKE) -C debug-mode -j; \
|
$(MAKE) -C debug-mode -j; \
|
||||||
debug-mode/debug-tests.out
|
debug-mode/debug-tests.out
|
||||||
|
|
||||||
runtests: debugtest
|
runtests: debugtest
|
||||||
@$(MAKE) clean
|
$(MAKE) clean
|
||||||
@$(MAKE) tests
|
$(MAKE) tests
|
||||||
@set -e; \
|
set -e; \
|
||||||
for i in ${SUB}*-tests/Makefile; do \
|
for i in *-tests/Makefile; do \
|
||||||
echo '==== Running:' $$(dirname $$i); \
|
echo '==== Running:' $$(dirname $$i); \
|
||||||
$$(dirname $$i)/$$(dirname $$i).out; \
|
$$(dirname $$i)/$$(dirname $$i).out; \
|
||||||
done
|
done
|
||||||
|
|
||||||
clean:
|
clean:
|
||||||
@set -e; \
|
set -e; \
|
||||||
for i in ${SUB}*-tests/Makefile; do \
|
for i in *-tests/Makefile; do \
|
||||||
echo '==== Cleaning:' $$(dirname $$i); \
|
echo '==== Cleaning:' $$(dirname $$i); \
|
||||||
$(MAKE) -C $$(dirname $$i) clean; \
|
$(MAKE) -C $$(dirname $$i) clean; \
|
||||||
done
|
done
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS
|
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS
|
||||||
|
|
||||||
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
||||||
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics -DEPOXY_TEST
|
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
||||||
|
|
||||||
APP_NAME := classbind-tests
|
APP_NAME := classbind-tests
|
||||||
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
|
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
|
||||||
|
|||||||
@@ -127,52 +127,8 @@ void reset_and_start_servers(int n, bool early_accept = true)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
test(classbind_two_subscribers_binded_one_sender_wildcard)
|
|
||||||
{
|
|
||||||
EpoxyTest::set_millis(0);
|
|
||||||
reset_and_start_servers(2, true);
|
|
||||||
assertEqual(WiFi.status(), WL_CONNECTED);
|
|
||||||
|
|
||||||
MqttBroker broker(1883);
|
|
||||||
broker.begin();
|
|
||||||
IPAddress ip_broker = WiFi.localIP();
|
|
||||||
|
|
||||||
// We have a 2nd ESP in order to test through wifi (opposed to local)
|
|
||||||
ESP8266WiFiClass::selectInstance(2);
|
|
||||||
MqttClient mqtt_a(&broker, "mqtt_a");
|
|
||||||
MqttClient mqtt_b(&broker, "mqtt_a");
|
|
||||||
MqttClient mqtt_sender(&broker, "sender");
|
|
||||||
|
|
||||||
broker.loop();
|
|
||||||
|
|
||||||
assertTrue(mqtt_a.connected());
|
|
||||||
assertTrue(mqtt_b.connected());
|
|
||||||
|
|
||||||
TestReceiver receiver("receiver");
|
|
||||||
MqttClassBinder<TestReceiver>::onPublish(&mqtt_a, &receiver);
|
|
||||||
MqttClassBinder<TestReceiver>::onPublish(&mqtt_b, &receiver);
|
|
||||||
|
|
||||||
mqtt_a.subscribe("#");
|
|
||||||
mqtt_b.subscribe("#");
|
|
||||||
mqtt_sender.publish("a/b", "ab");
|
|
||||||
|
|
||||||
for (int i =0; i<10; i++)
|
|
||||||
{
|
|
||||||
EpoxyTest::add_millis(100);
|
|
||||||
mqtt_a.loop();
|
|
||||||
mqtt_b.loop();
|
|
||||||
mqtt_sender.loop();
|
|
||||||
broker.loop();
|
|
||||||
}
|
|
||||||
|
|
||||||
assertEqual(TestReceiver::messages["receiver"], 2);
|
|
||||||
assertEqual(unrouted, 0);
|
|
||||||
EpoxyTest::set_real_time();
|
|
||||||
}
|
|
||||||
|
|
||||||
test(classbind_one_client_receives_the_message)
|
test(classbind_one_client_receives_the_message)
|
||||||
{
|
{
|
||||||
EpoxyTest::set_millis(0);
|
|
||||||
reset_and_start_servers(2, true);
|
reset_and_start_servers(2, true);
|
||||||
assertEqual(WiFi.status(), WL_CONNECTED);
|
assertEqual(WiFi.status(), WL_CONNECTED);
|
||||||
|
|
||||||
@@ -182,7 +138,7 @@ test(classbind_one_client_receives_the_message)
|
|||||||
|
|
||||||
// We have a 2nd ESP in order to test through wifi (opposed to local)
|
// We have a 2nd ESP in order to test through wifi (opposed to local)
|
||||||
ESP8266WiFiClass::selectInstance(2);
|
ESP8266WiFiClass::selectInstance(2);
|
||||||
MqttClient client("sender");
|
MqttClient client;
|
||||||
client.connect(ip_broker.toString().c_str(), 1883);
|
client.connect(ip_broker.toString().c_str(), 1883);
|
||||||
broker.loop();
|
broker.loop();
|
||||||
assertTrue(client.connected());
|
assertTrue(client.connected());
|
||||||
@@ -195,14 +151,12 @@ test(classbind_one_client_receives_the_message)
|
|||||||
|
|
||||||
for (int i =0; i<10; i++)
|
for (int i =0; i<10; i++)
|
||||||
{
|
{
|
||||||
EpoxyTest::add_millis(100);
|
|
||||||
client.loop();
|
client.loop();
|
||||||
broker.loop();
|
broker.loop();
|
||||||
}
|
}
|
||||||
|
|
||||||
assertEqual(TestReceiver::messages["receiver"], 1);
|
assertEqual(TestReceiver::messages["receiver"], 1);
|
||||||
assertEqual(unrouted, 0);
|
assertEqual(unrouted, 0);
|
||||||
EpoxyTest::set_real_time();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope)
|
test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope)
|
||||||
@@ -384,7 +338,7 @@ void setup() {
|
|||||||
while(!Serial);
|
while(!Serial);
|
||||||
*/
|
*/
|
||||||
|
|
||||||
Serial.println("=============[ CLASS BINDER TinyMqtt TESTS ]========================");
|
Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================");
|
||||||
|
|
||||||
WiFi.mode(WIFI_STA);
|
WiFi.mode(WIFI_STA);
|
||||||
WiFi.begin("network", "password");
|
WiFi.begin("network", "password");
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# See https://github.com/bxparks/EpoxyDuino for documentation about this
|
# See https://github.com/bxparks/EpoxyDuino for documentation about this
|
||||||
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
|
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
|
||||||
|
|
||||||
EXTRA_CXXFLAGS=-g3 -O0
|
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEBUG
|
||||||
|
|
||||||
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
||||||
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
# See https://github.com/bxparks/EpoxyDuino for documentation about this
|
# See https://github.com/bxparks/EpoxyDuino for documentation about this
|
||||||
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
|
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
|
||||||
|
|
||||||
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEFAULT_ALIVE=1 -DEPOXY_TEST
|
EXTRA_CXXFLAGS=-g3 -O0
|
||||||
|
|
||||||
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
||||||
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
||||||
|
|||||||
@@ -14,6 +14,7 @@
|
|||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
MqttBroker broker(1883);
|
||||||
|
|
||||||
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
||||||
|
|
||||||
@@ -30,55 +31,18 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
|
|||||||
|
|
||||||
test(local_client_should_unregister_when_destroyed)
|
test(local_client_should_unregister_when_destroyed)
|
||||||
{
|
{
|
||||||
MqttBroker broker(1883);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
|
||||||
{
|
{
|
||||||
|
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
|
||||||
MqttClient client(&broker);
|
MqttClient client(&broker);
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is now connected
|
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected
|
||||||
}
|
}
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
}
|
|
||||||
|
|
||||||
test(local_client_alive)
|
|
||||||
{
|
|
||||||
EpoxyTest::set_millis(0);
|
|
||||||
MqttBroker broker(1883);
|
|
||||||
MqttClient client(&broker);
|
|
||||||
|
|
||||||
broker.loop();
|
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is now connected
|
|
||||||
|
|
||||||
EpoxyTest::add_millis(TINY_MQTT_DEFAULT_ALIVE*1000/2);
|
|
||||||
broker.loop();
|
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is still connected
|
|
||||||
|
|
||||||
EpoxyTest::add_seconds(TINY_MQTT_DEFAULT_ALIVE*5);
|
|
||||||
broker.loop();
|
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is still connected
|
|
||||||
}
|
|
||||||
|
|
||||||
test(local_wildcard_subscribe)
|
|
||||||
{
|
|
||||||
EpoxyTest::set_millis(0);
|
|
||||||
MqttBroker broker(1883);
|
|
||||||
MqttClient client(&broker, "client");
|
|
||||||
MqttClient sender(&broker, "sender");
|
|
||||||
broker.loop();
|
|
||||||
|
|
||||||
client.subscribe("#");
|
|
||||||
client.subscribe("test");
|
|
||||||
client.setCallback(onPublish);
|
|
||||||
assertEqual(broker.localClientsCount(), (size_t)2);
|
|
||||||
|
|
||||||
sender.publish("test", "value");
|
|
||||||
broker.loop();
|
|
||||||
|
|
||||||
assertEqual(published.size(), (size_t)1); // client has received something
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test(local_client_do_not_disconnect_after_publishing)
|
test(local_client_do_not_disconnect_after_publishing)
|
||||||
{
|
{
|
||||||
EpoxyTest::set_millis(0);
|
set_millis(0);
|
||||||
MqttBroker broker(1883);
|
MqttBroker broker(1883);
|
||||||
MqttClient client(&broker, "client");
|
MqttClient client(&broker, "client");
|
||||||
MqttClient sender(&broker, "sender");
|
MqttClient sender(&broker, "sender");
|
||||||
@@ -87,17 +51,17 @@ test(local_client_do_not_disconnect_after_publishing)
|
|||||||
client.subscribe("#");
|
client.subscribe("#");
|
||||||
client.subscribe("test");
|
client.subscribe("test");
|
||||||
client.setCallback(onPublish);
|
client.setCallback(onPublish);
|
||||||
assertEqual(broker.localClientsCount(), (size_t)2);
|
assertEqual(broker.clientsCount(), (size_t)2);
|
||||||
|
|
||||||
sender.publish("test", "value");
|
sender.publish("test", "value");
|
||||||
broker.loop();
|
broker.loop();
|
||||||
|
|
||||||
EpoxyTest::add_seconds(60);
|
add_seconds(60);
|
||||||
client.loop();
|
client.loop();
|
||||||
sender.loop();
|
sender.loop();
|
||||||
broker.loop();
|
broker.loop();
|
||||||
|
|
||||||
assertEqual(broker.localClientsCount(), (size_t)2);
|
assertEqual(broker.clientsCount(), (size_t)2);
|
||||||
assertEqual(sender.connected(), true);
|
assertEqual(sender.connected(), true);
|
||||||
assertEqual(client.connected(), true);
|
assertEqual(client.connected(), true);
|
||||||
|
|
||||||
@@ -105,10 +69,39 @@ test(local_client_do_not_disconnect_after_publishing)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
test(local_connect)
|
||||||
|
{
|
||||||
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
|
MqttClient client;
|
||||||
|
assertTrue(client.connected());
|
||||||
|
assertEqual(broker.clientsCount(), (size_t)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(local_publish_should_be_dispatched)
|
||||||
|
{
|
||||||
|
published.clear();
|
||||||
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
|
MqttClient subscriber;
|
||||||
|
subscriber.subscribe("a/b");
|
||||||
|
subscriber.subscribe("a/c");
|
||||||
|
subscriber.setCallback(onPublish);
|
||||||
|
|
||||||
|
MqttClient publisher;
|
||||||
|
publisher.publish("a/b");
|
||||||
|
publisher.publish("a/c");
|
||||||
|
publisher.publish("a/c");
|
||||||
|
|
||||||
|
assertEqual(published.size(), (size_t)1); // 1 client has received something
|
||||||
|
assertEqual(published[""]["a/b"], 1);
|
||||||
|
assertEqual(published[""]["a/c"], 2);
|
||||||
|
}
|
||||||
|
|
||||||
test(local_publish_should_be_dispatched_to_local_clients)
|
test(local_publish_should_be_dispatched_to_local_clients)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber_a("A");
|
MqttClient subscriber_a("A");
|
||||||
subscriber_a.setCallback(onPublish);
|
subscriber_a.setCallback(onPublish);
|
||||||
@@ -133,7 +126,7 @@ test(local_publish_should_be_dispatched_to_local_clients)
|
|||||||
test(local_unsubscribe)
|
test(local_unsubscribe)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber;
|
MqttClient subscriber;
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -153,7 +146,7 @@ test(local_unsubscribe)
|
|||||||
test(local_nocallback_when_destroyed)
|
test(local_nocallback_when_destroyed)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient publisher;
|
MqttClient publisher;
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
EXTRA_CXXFLAGS=-g3 -O0
|
EXTRA_CXXFLAGS=-g3 -O0
|
||||||
|
|
||||||
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
# Remove flto flag from EpoxyDuino (too many <optimized out>)
|
||||||
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics -DEPOXY_TEST
|
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
|
||||||
|
|
||||||
APP_NAME := network-tests
|
APP_NAME := network-tests
|
||||||
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
|
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
|
||||||
|
|||||||
@@ -9,16 +9,6 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
|
|
||||||
uint32_t getClientKeepAlive(MqttBroker& broker)
|
|
||||||
{
|
|
||||||
if (broker.getClients().size() == 1)
|
|
||||||
for (auto& it : broker.getClients())
|
|
||||||
return it->keepAlive();
|
|
||||||
|
|
||||||
return 9999;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TinyMqtt network unit tests.
|
* TinyMqtt network unit tests.
|
||||||
*
|
*
|
||||||
@@ -152,46 +142,6 @@ test(suback)
|
|||||||
assertEqual(MqttClient::counters[MqttMessage::Type::SubAck], 1);
|
assertEqual(MqttClient::counters[MqttMessage::Type::SubAck], 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(network_client_alive)
|
|
||||||
{
|
|
||||||
const uint32_t keep_alive=1;
|
|
||||||
start_servers(2, true);
|
|
||||||
assertEqual(WiFi.status(), WL_CONNECTED);
|
|
||||||
EpoxyTest::set_millis(0); // Enter simulated time
|
|
||||||
|
|
||||||
MqttBroker broker(1883);
|
|
||||||
broker.begin();
|
|
||||||
IPAddress broker_ip = WiFi.localIP();
|
|
||||||
|
|
||||||
ESP8266WiFiClass::selectInstance(2);
|
|
||||||
MqttClient client;
|
|
||||||
client.connect(broker_ip.toString().c_str(), 1883, keep_alive);
|
|
||||||
broker.loop();
|
|
||||||
client.loop();
|
|
||||||
|
|
||||||
assertTrue(broker.clientsCount() == 1);
|
|
||||||
assertTrue(client.connected());
|
|
||||||
|
|
||||||
uint32_t ka = getClientKeepAlive(broker);
|
|
||||||
assertEqual(ka, keep_alive);
|
|
||||||
assertEqual(broker.clientsCount(), (size_t)1);
|
|
||||||
|
|
||||||
// All is going well if we call client.loop()
|
|
||||||
// The client is able to send PingReq to the broker
|
|
||||||
EpoxyTest::add_seconds(keep_alive);
|
|
||||||
client.loop();
|
|
||||||
broker.loop();
|
|
||||||
assertEqual(broker.clientsCount(), (size_t)1);
|
|
||||||
|
|
||||||
// Now simulate that the client is frozen for
|
|
||||||
// a too long time
|
|
||||||
EpoxyTest::add_seconds(TINY_MQTT_CLIENT_ALIVE_TOLERANCE*2);
|
|
||||||
broker.loop();
|
|
||||||
assertEqual(broker.clientsCount(), (size_t)0);
|
|
||||||
|
|
||||||
EpoxyTest::set_real_time();
|
|
||||||
}
|
|
||||||
|
|
||||||
test(network_client_keep_alive_high)
|
test(network_client_keep_alive_high)
|
||||||
{
|
{
|
||||||
const uint32_t keep_alive=1000;
|
const uint32_t keep_alive=1000;
|
||||||
@@ -222,7 +172,7 @@ test(network_client_keep_alive_high)
|
|||||||
uint32_t sz = broker.getClients().size();
|
uint32_t sz = broker.getClients().size();
|
||||||
assertEqual(sz , (uint32_t)1);
|
assertEqual(sz , (uint32_t)1);
|
||||||
|
|
||||||
uint32_t ka = getClientKeepAlive(broker);
|
uint32_t ka = broker.getClients()[0]->keepAlive();
|
||||||
assertEqual(ka, keep_alive);
|
assertEqual(ka, keep_alive);
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -312,14 +262,14 @@ test(network_one_client_one_broker_hudge_publish_and_subscribe_through_network)
|
|||||||
assertEqual((unsigned int)lastLength, (unsigned int)sent.size());
|
assertEqual((unsigned int)lastLength, (unsigned int)sent.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
test(network_local_client_should_unregister_when_destroyed)
|
test(network_client_should_unregister_when_destroyed)
|
||||||
{
|
{
|
||||||
assertEqual(broker.clientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
{
|
{
|
||||||
MqttClient client(&broker);
|
MqttClient client(&broker);
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1);
|
assertEqual(broker.clientsCount(), (size_t)1);
|
||||||
}
|
}
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@@ -332,13 +282,13 @@ test(network_connect)
|
|||||||
|
|
||||||
MqttClient client(&broker);
|
MqttClient client(&broker);
|
||||||
assertTrue(client.connected());
|
assertTrue(client.connected());
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1);
|
assertEqual(broker.clientsCount(), (size_t)1);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(network_publish_should_be_dispatched)
|
test(network_publish_should_be_dispatched)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker);
|
MqttClient subscriber(&broker);
|
||||||
subscriber.subscribe("a/b");
|
subscriber.subscribe("a/b");
|
||||||
@@ -439,12 +389,11 @@ test(network_small_payload)
|
|||||||
|
|
||||||
test(network_hudge_payload)
|
test(network_hudge_payload)
|
||||||
{
|
{
|
||||||
// const char* payload="This payload is hudge, just because its length exceeds 127. Thus when encoding length, we have to encode it on two bytes at min. This should not prevent the message from being encoded and decoded successfully !";
|
const char* payload="This payload is hudge, just because its length exceeds 127. Thus when encoding length, we have to encode it on two bytes at min. This should not prevent the message from being encoded and decoded successfully !";
|
||||||
const char* payload="This was decoded successfully !";
|
|
||||||
|
|
||||||
MqttClient subscriber(&broker);
|
MqttClient subscriber(&broker);
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
subscriber.subscribe("a/b"); // Note -> this does not send any byte .... (nowhere to send) TODO
|
subscriber.subscribe("a/b"); // Note -> this does not send any byte .... (nowhere to send)
|
||||||
|
|
||||||
MqttClient publisher(&broker);
|
MqttClient publisher(&broker);
|
||||||
publisher.publish("a/b", payload); // This publish is received
|
publisher.publish("a/b", payload); // This publish is received
|
||||||
@@ -453,13 +402,11 @@ test(network_hudge_payload)
|
|||||||
assertEqual(payload, lastPayload);
|
assertEqual(payload, lastPayload);
|
||||||
assertEqual(lastLength, strlen(payload));
|
assertEqual(lastLength, strlen(payload));
|
||||||
assertEqual(strcmp(payload, lastPayload), 0);
|
assertEqual(strcmp(payload, lastPayload), 0);
|
||||||
std::cout << "payload : " << payload << std::endl;
|
|
||||||
std::cout << "received: " << lastPayload << std::endl;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test(connack)
|
test(connack)
|
||||||
{
|
{
|
||||||
const bool view = true;
|
const bool view = false;
|
||||||
|
|
||||||
NetworkObserver check(
|
NetworkObserver check(
|
||||||
[this](const WiFiClient*, const uint8_t* buffer, size_t length)
|
[this](const WiFiClient*, const uint8_t* buffer, size_t length)
|
||||||
|
|||||||
@@ -32,27 +32,27 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
|
|||||||
|
|
||||||
test(nowifi_client_should_unregister_when_destroyed)
|
test(nowifi_client_should_unregister_when_destroyed)
|
||||||
{
|
{
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
{
|
{
|
||||||
MqttClient client(&broker);
|
MqttClient client(&broker);
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1);
|
assertEqual(broker.clientsCount(), (size_t)1);
|
||||||
}
|
}
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_connect)
|
test(nowifi_connect)
|
||||||
{
|
{
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient client(&broker);
|
MqttClient client(&broker);
|
||||||
assertTrue(client.connected());
|
assertTrue(client.connected());
|
||||||
assertEqual(broker.localClientsCount(), (size_t)1);
|
assertEqual(broker.clientsCount(), (size_t)1);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_publish_should_be_dispatched)
|
test(nowifi_publish_should_be_dispatched)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker);
|
MqttClient subscriber(&broker);
|
||||||
subscriber.subscribe("a/b");
|
subscriber.subscribe("a/b");
|
||||||
@@ -72,7 +72,7 @@ test(nowifi_publish_should_be_dispatched)
|
|||||||
test(nowifi_publish_should_be_dispatched_to_clients)
|
test(nowifi_publish_should_be_dispatched_to_clients)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber_a(&broker, "A");
|
MqttClient subscriber_a(&broker, "A");
|
||||||
subscriber_a.setCallback(onPublish);
|
subscriber_a.setCallback(onPublish);
|
||||||
@@ -97,7 +97,7 @@ test(nowifi_publish_should_be_dispatched_to_clients)
|
|||||||
test(nowifi_subscribe_with_star_wildcard)
|
test(nowifi_subscribe_with_star_wildcard)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker, "A");
|
MqttClient subscriber(&broker, "A");
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -118,7 +118,7 @@ test(nowifi_subscribe_with_star_wildcard)
|
|||||||
test(nowifi_subscribe_with_plus_wildcard)
|
test(nowifi_subscribe_with_plus_wildcard)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker, "A");
|
MqttClient subscriber(&broker, "A");
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -139,7 +139,7 @@ test(nowifi_subscribe_with_plus_wildcard)
|
|||||||
test(nowifi_should_not_receive_sys_msg)
|
test(nowifi_should_not_receive_sys_msg)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker, "A");
|
MqttClient subscriber(&broker, "A");
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -154,7 +154,7 @@ test(nowifi_should_not_receive_sys_msg)
|
|||||||
test(nowifi_subscribe_with_mixed_wildcards)
|
test(nowifi_subscribe_with_mixed_wildcards)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker, "A");
|
MqttClient subscriber(&broker, "A");
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -173,7 +173,7 @@ test(nowifi_subscribe_with_mixed_wildcards)
|
|||||||
test(nowifi_unsubscribe_with_wildcards)
|
test(nowifi_unsubscribe_with_wildcards)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker, "A");
|
MqttClient subscriber(&broker, "A");
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -195,7 +195,7 @@ test(nowifi_unsubscribe_with_wildcards)
|
|||||||
test(nowifi_unsubscribe)
|
test(nowifi_unsubscribe)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient subscriber(&broker);
|
MqttClient subscriber(&broker);
|
||||||
subscriber.setCallback(onPublish);
|
subscriber.setCallback(onPublish);
|
||||||
@@ -215,7 +215,7 @@ test(nowifi_unsubscribe)
|
|||||||
test(nowifi_nocallback_when_destroyed)
|
test(nowifi_nocallback_when_destroyed)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.localClientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
|
|
||||||
MqttClient publisher(&broker);
|
MqttClient publisher(&broker);
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user