Compare commits

...

15 Commits

Author SHA1 Message Date
hsaturn
d8105ee224 EpoxyDuino upgrade 2023-01-04 22:02:00 +01:00
hsaturn
670d67c024 MqttClient - fix local disconnect after pulish + ka 2023-01-03 04:01:17 +01:00
hsaturn
3c3b19882f Merge branch 'main' into alive2 2023-01-02 19:37:39 +01:00
hsaturn
8c55356bd9 Use my fork of EpoxyDuino 2023-01-02 03:09:34 +01:00
hsaturn
82d3b913bb classbind-tests added a test that looks likes the advanced example 2023-01-02 02:58:27 +01:00
hsaturn
8b62b5a3b7 local-tests add topic # local test 2023-01-02 02:40:35 +01:00
hsaturn
3a2db664a8 Split clients in two collections 2023-01-02 02:18:16 +01:00
hsaturn
0c454bfe3a Class Binder typo 2023-01-02 00:39:23 +01:00
hsaturn
d517cf2627 MqttBroker::server is now a unique_ptr 2023-01-02 00:15:14 +01:00
hsaturn
b8022f58a4 tcp_client is now a unique_ptr 2023-01-02 00:10:40 +01:00
Francois BIOT
09e3a3e45f Rename MqttBroker to remote_broker 2022-12-29 13:39:34 +01:00
Francois BIOT
f17ece3376 MqttClient::client renamed to tcp_client 2022-12-29 12:58:08 +01:00
hsaturn
0db07df27b Remove useless comment 2022-12-29 12:54:58 +01:00
hsaturn
292592c3dd Added missing Makefile for unit test of MqttClassBinder 2022-12-29 02:17:54 +01:00
Francois BIOT
1f267c135b fix erroneous sizeof multimap comment 2022-12-29 02:15:18 +01:00
12 changed files with 230 additions and 159 deletions

View File

@@ -18,7 +18,7 @@ jobs:
run: | run: |
cd .. cd ..
git clone https://github.com/hsaturn/TinyConsole git clone https://github.com/hsaturn/TinyConsole
git clone https://github.com/bxparks/EpoxyDuino git clone https://github.com/hsaturn/EpoxyDuino
git clone https://github.com/bxparks/AceRoutine git clone https://github.com/bxparks/AceRoutine
git clone https://github.com/bxparks/AUnit git clone https://github.com/bxparks/AUnit
git clone https://github.com/bxparks/AceCommon git clone https://github.com/bxparks/AceCommon

View File

@@ -17,7 +17,7 @@ int TinyMqtt::debug=2;
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port)
{ {
server = new TcpServer(port); server.reset(new TcpServer(port));
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
server->onClient(onClient, this); server->onClient(onClient, this);
#endif #endif
@@ -29,21 +29,21 @@ MqttBroker::~MqttBroker()
{ {
delete clients[0]; delete clients[0];
} }
delete server;
} }
// private constructor used by broker only // private constructor used by broker only
MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client) MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
{ {
dclass;
connect(local_broker); connect(local_broker);
debug("MqttClient private with broker"); debug("MqttClient private with broker");
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
tcp_client = new_client; tcp_client.reset(new_client);
tcp_client->onData(onData, this); tcp_client->onData(onData, this);
// client->onConnect() TODO // client->onConnect() TODO
// client->onDisconnect() TODO // client->onDisconnect() TODO
#else #else
tcp_client = new WiFiClient(*new_client); tcp_client.reset(new WiFiClient(*new_client));
#endif #endif
alive = millis()+5000; alive = millis()+5000;
} }
@@ -51,15 +51,17 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id) MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
: local_broker(local_broker), clientId(id) : local_broker(local_broker), clientId(id)
{ {
dclass;
alive = 0; alive = 0;
keep_alive = 0;
if (local_broker) local_broker->addClient(this); if (local_broker) local_broker->addClient(this);
} }
MqttClient::~MqttClient() MqttClient::~MqttClient()
{ {
dtor;
close(); close();
delete tcp_client;
debug("*** MqttClient delete()"); debug("*** MqttClient delete()");
} }
@@ -99,8 +101,7 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
debug("MqttClient::connect_to_host " << broker << ':' << port); debug("MqttClient::connect_to_host " << broker << ':' << port);
keep_alive = ka; keep_alive = ka;
close(); close();
if (tcp_client) delete tcp_client; tcp_client.reset(new TcpClient);
tcp_client = new TcpClient;
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
tcp_client->onData(onData, this); tcp_client->onData(onData, this);
@@ -110,7 +111,7 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
if (tcp_client->connect(broker.c_str(), port)) if (tcp_client->connect(broker.c_str(), port))
{ {
debug("link established"); debug("link established");
onConnect(this, tcp_client); onConnect(this, tcp_client.get());
} }
else else
{ {
@@ -119,12 +120,6 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
#endif #endif
} }
void MqttBroker::addClient(MqttClient* client)
{
debug("MqttBroker::addClient");
clients.push_back(client);
}
void MqttBroker::connect(const std::string& host, uint16_t port) void MqttBroker::connect(const std::string& host, uint16_t port)
{ {
debug("MqttBroker::connect"); debug("MqttBroker::connect");
@@ -135,24 +130,13 @@ void MqttBroker::connect(const std::string& host, uint16_t port)
void MqttBroker::removeClient(MqttClient* remove) void MqttBroker::removeClient(MqttClient* remove)
{ {
debug("removeClient"); local_clients.erase(remove);
for(auto it=clients.begin(); it!=clients.end(); it++) for(auto it = clients.begin(); it!=clients.end(); it++)
if (*it == remove)
{ {
auto client=*it;
if (client==remove)
{
// TODO if this broker is connected to an external broker
// we have to unsubscribe remove's topics.
// (but doing this, check that other clients are not subscribed...)
// Unless -> we could receive useless messages
// -> we are using (memory) one IndexedString plus its string for nothing.
debug("Remove " << clients.size());
clients.erase(it); clients.erase(it);
debug("Client removed " << clients.size()); break;
return;
} }
}
debug(red << "Error cannot remove client"); // TODO should not occur
} }
void MqttBroker::onClient(void* broker_ptr, TcpClient* client) void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
@@ -160,13 +144,14 @@ void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
debug("MqttBroker::onClient"); debug("MqttBroker::onClient");
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr); MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client)); broker->clients.push_back(new MqttClient(broker, client));
debug("New client"); debug("New client");
} }
void MqttBroker::loop() void MqttBroker::loop()
{ {
#ifndef TINY_MQTT_ASYNC #ifndef TINY_MQTT_ASYNC
if (not server) return;
WiFiClient client = server->available(); WiFiClient client = server->available();
if (client) if (client)
@@ -181,9 +166,12 @@ void MqttBroker::loop()
remote_broker->loop(); remote_broker->loop();
} }
for(size_t i=0; i<clients.size(); i++) // keep track on size because loop can remove a client from containers
// loop on remote clients (connected through network)
auto size = clients.size();
for(auto it = clients.begin(); it!=clients.end(); it++)
{ {
MqttClient* client = clients[i]; MqttClient* client = *it;
if (client->connected()) if (client->connected())
{ {
client->loop(); client->loop();
@@ -192,9 +180,16 @@ void MqttBroker::loop()
{ {
debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker); debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker);
// Note: deleting a client not added by the broker itself will probably crash later. // Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
} }
if (size != clients.size()) break;
}
// loop on local clients (on same device as the broker's)
size = local_clients.size();
for(auto& client: local_clients)
{
client->loop();
if (local_clients.size() != size) break;
} }
} }
@@ -210,41 +205,29 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk; // TODO here retval is badly computed
debug("MqttBroker::publish"); debug("MqttBroker::publish");
int i=0;
for(auto client: clients) if (remote_broker == nullptr or source == remote_broker) // external broker -> internal clients
{ {
i++; for(auto& client: clients)
#if TINY_MQTT_DEBUG
Console << __LINE__ << " broker:" << (remote_broker && remote_broker->connected() ? "linked" : "alone") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false;
if (remote_broker && remote_broker->connected()) // this (MqttBroker) is connected (to a external broker)
{ {
// ext_broker -> clients or clients -> ext_broker retval = client->publishIfSubscribed(topic, msg);
if (source == remote_broker) // external broker -> internal clients }
doit = true; for(auto& client: local_clients)
else // external clients -> this broker {
retval = client->publishIfSubscribed(topic, msg);
}
}
else
{
if (remote_broker && remote_broker->connected())
{ {
// As this broker is connected to another broker, simply forward the msg
MqttError ret = remote_broker->publishIfSubscribed(topic, msg); MqttError ret = remote_broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret; if (ret != MqttOk) retval = ret;
} }
} }
else // Disconnected
{
doit = true;
}
#if TINY_MQTT_DEBUG
Console << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publishIfSubscribed(topic, msg);
debug("");
}
return retval; return retval;
} }
@@ -277,7 +260,7 @@ void MqttClient::clientAlive()
void MqttClient::loop() void MqttClient::loop()
{ {
if (alive && (millis() >= alive)) if (keep_alive && (millis() >= alive))
{ {
if (local_broker) if (local_broker)
{ {
@@ -580,7 +563,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
Console << "publish " << (mqtt_flags & FlagConnected) << '/' << (long) tcp_client << endl; Console << "publish " << (mqtt_flags & FlagConnected) << '/' << (long) tcp_client.get() << endl;
#endif #endif
if ((mqtt_flags & FlagConnected) or tcp_client == nullptr) if ((mqtt_flags & FlagConnected) or tcp_client == nullptr)
{ {

View File

@@ -57,6 +57,15 @@
#define debug(what) {} #define debug(what) {}
#endif #endif
#include <TinyConsole.h>
#if 0
#define dclass { Console << __LINE__ << ':' << __PRETTY_FUNCTION__ << ", this=" << (long)this << endl; }
#define dtor { Console << __LINE__ << ": ~" << __PRETTY_FUNCTION__ << ", this=" << (long)this << endl; }
#else
#define dclass
#define dtor
#endif
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
using TcpClient = AsyncClient; using TcpClient = AsyncClient;
using TcpServer = AsyncServer; using TcpServer = AsyncServer;
@@ -188,7 +197,7 @@ class MqttClient
/** Constructor. Broker is the adress of a local broker if not null /** Constructor. Broker is the adress of a local broker if not null
If you want to connect elsewhere, leave broker null and use connect() **/ If you want to connect elsewhere, leave broker null and use connect() **/
MqttClient(MqttBroker* broker = nullptr, const std::string& id = TINY_MQTT_DEFAULT_CLIENT_ID); MqttClient(MqttBroker* broker = nullptr, const std::string& id = TINY_MQTT_DEFAULT_CLIENT_ID);
MqttClient(const std::string& id) : MqttClient(nullptr, id){} MqttClient(const std::string& id) : MqttClient(nullptr, id){ dclass; }
~MqttClient(); ~MqttClient();
@@ -303,7 +312,7 @@ class MqttClient
// when MqttBroker uses MqttClient for each external connexion // when MqttBroker uses MqttClient for each external connexion
MqttBroker* local_broker=nullptr; MqttBroker* local_broker=nullptr;
TcpClient* tcp_client=nullptr; // connection to remote broker std::unique_ptr<TcpClient> tcp_client; // connection to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
@@ -322,7 +331,7 @@ class MqttBroker
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
~MqttBroker(); ~MqttBroker();
void begin() { server->begin(); } void begin() { if (server) server->begin(); }
void loop(); void loop();
void connect(const std::string& host, uint16_t port=1883); void connect(const std::string& host, uint16_t port=1883);
@@ -332,11 +341,13 @@ class MqttBroker
void dump(std::string indent="") void dump(std::string indent="")
{ {
for(auto client: clients) for(auto& client: clients)
client->dump(indent); client->dump(indent);
} }
const std::vector<MqttClient*> getClients() const { return clients; } size_t localClientsCount() const { return local_clients.size(); }
using Clients = std::vector<MqttClient*>;
const Clients& getClients() const { return clients; }
private: private:
friend class MqttClient; friend class MqttClient;
@@ -354,14 +365,15 @@ class MqttBroker
MqttError subscribe(const Topic& topic, uint8_t qos); MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself (local clients) // For clients that are added not by the broker itself (local clients)
void addClient(MqttClient* client); void addClient(MqttClient* local) { local_clients.insert(local); }
void removeClient(MqttClient* client); void removeClient(MqttClient* client);
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients; Clients clients;
std::set<MqttClient*> local_clients;
private: private:
TcpServer* server = nullptr; std::unique_ptr<TcpServer> server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";

View File

@@ -1,29 +1,29 @@
SUB= SUB=
tests: tests:
set -e; \ @set -e; \
for i in ${SUB}*-tests/Makefile; do \ for i in ${SUB}*-tests/Makefile; do \
echo '==== Making:' $$(dirname $$i); \ echo '==== Making:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) -j; \ $(MAKE) -C $$(dirname $$i) -j; \
done done
debugtest: debugtest:
set -e; \ @set -e; \
$(MAKE) clean; \ $(MAKE) clean; \
$(MAKE) -C debug-mode -j; \ $(MAKE) -C debug-mode -j; \
debug-mode/debug-tests.out debug-mode/debug-tests.out
runtests: debugtest runtests: debugtest
$(MAKE) clean @$(MAKE) clean
$(MAKE) tests @$(MAKE) tests
set -e; \ @set -e; \
for i in ${SUB}*-tests/Makefile; do \ for i in ${SUB}*-tests/Makefile; do \
echo '==== Running:' $$(dirname $$i); \ echo '==== Running:' $$(dirname $$i); \
$$(dirname $$i)/$$(dirname $$i).out; \ $$(dirname $$i)/$$(dirname $$i).out; \
done done
clean: clean:
set -e; \ @set -e; \
for i in ${SUB}*-tests/Makefile; do \ for i in ${SUB}*-tests/Makefile; do \
echo '==== Cleaning:' $$(dirname $$i); \ echo '==== Cleaning:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) clean; \ $(MAKE) -C $$(dirname $$i) clean; \

View File

@@ -4,7 +4,7 @@
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS
# Remove flto flag from EpoxyDuino (too many <optimized out>) # Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics -DEPOXY_TEST
APP_NAME := classbind-tests APP_NAME := classbind-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole

View File

@@ -127,8 +127,9 @@ void reset_and_start_servers(int n, bool early_accept = true)
} }
} }
test(classbind_one_client_receives_the_message) test(classbind_two_subscribers_binded_one_sender_wildcard)
{ {
EpoxyTest::set_millis(0);
reset_and_start_servers(2, true); reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED); assertEqual(WiFi.status(), WL_CONNECTED);
@@ -138,7 +139,50 @@ test(classbind_one_client_receives_the_message)
// We have a 2nd ESP in order to test through wifi (opposed to local) // We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2); ESP8266WiFiClass::selectInstance(2);
MqttClient client; MqttClient mqtt_a(&broker, "mqtt_a");
MqttClient mqtt_b(&broker, "mqtt_a");
MqttClient mqtt_sender(&broker, "sender");
broker.loop();
assertTrue(mqtt_a.connected());
assertTrue(mqtt_b.connected());
TestReceiver receiver("receiver");
MqttClassBinder<TestReceiver>::onPublish(&mqtt_a, &receiver);
MqttClassBinder<TestReceiver>::onPublish(&mqtt_b, &receiver);
mqtt_a.subscribe("#");
mqtt_b.subscribe("#");
mqtt_sender.publish("a/b", "ab");
for (int i =0; i<10; i++)
{
EpoxyTest::add_millis(100);
mqtt_a.loop();
mqtt_b.loop();
mqtt_sender.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 2);
assertEqual(unrouted, 0);
EpoxyTest::set_real_time();
}
test(classbind_one_client_receives_the_message)
{
EpoxyTest::set_millis(0);
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client("sender");
client.connect(ip_broker.toString().c_str(), 1883); client.connect(ip_broker.toString().c_str(), 1883);
broker.loop(); broker.loop();
assertTrue(client.connected()); assertTrue(client.connected());
@@ -151,12 +195,14 @@ test(classbind_one_client_receives_the_message)
for (int i =0; i<10; i++) for (int i =0; i<10; i++)
{ {
EpoxyTest::add_millis(100);
client.loop(); client.loop();
broker.loop(); broker.loop();
} }
assertEqual(TestReceiver::messages["receiver"], 1); assertEqual(TestReceiver::messages["receiver"], 1);
assertEqual(unrouted, 0); assertEqual(unrouted, 0);
EpoxyTest::set_real_time();
} }
test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope) test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope)
@@ -338,7 +384,7 @@ void setup() {
while(!Serial); while(!Serial);
*/ */
Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================"); Serial.println("=============[ CLASS BINDER TinyMqtt TESTS ]========================");
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
WiFi.begin("network", "password"); WiFi.begin("network", "password");

View File

@@ -1,7 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this # See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS. # Makefile to compile and run Arduino programs natively on Linux or MacOS.
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEBUG EXTRA_CXXFLAGS=-g3 -O0
# Remove flto flag from EpoxyDuino (too many <optimized out>) # Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics

View File

@@ -1,7 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this # See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS. # Makefile to compile and run Arduino programs natively on Linux or MacOS.
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEFAULT_ALIVE=1 EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_DEFAULT_ALIVE=1 -DEPOXY_TEST
# Remove flto flag from EpoxyDuino (too many <optimized out>) # Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics

View File

@@ -31,67 +31,84 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
test(local_client_should_unregister_when_destroyed) test(local_client_should_unregister_when_destroyed)
{ {
MqttBroker broker(1883); MqttBroker broker(1883);
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
{ {
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
MqttClient client(&broker); MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is now connected
} }
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
} }
test(local_client_alive) test(local_client_alive)
{ {
set_millis(0); EpoxyTest::set_millis(0);
MqttBroker broker(1883); MqttBroker broker(1883);
MqttClient client(&broker); MqttClient client(&broker);
broker.loop(); broker.loop();
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is now connected
add_millis(TINY_MQTT_DEFAULT_ALIVE*1000/2); EpoxyTest::add_millis(TINY_MQTT_DEFAULT_ALIVE*1000/2);
broker.loop(); broker.loop();
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is still connected assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is still connected
add_seconds(TINY_MQTT_DEFAULT_ALIVE*5); EpoxyTest::add_seconds(TINY_MQTT_DEFAULT_ALIVE*5);
broker.loop(); broker.loop();
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is still connected assertEqual(broker.localClientsCount(), (size_t)1); // Ensure client is still connected
}
test(local_wildcard_subscribe)
{
EpoxyTest::set_millis(0);
MqttBroker broker(1883);
MqttClient client(&broker, "client");
MqttClient sender(&broker, "sender");
broker.loop();
client.subscribe("#");
client.subscribe("test");
client.setCallback(onPublish);
assertEqual(broker.localClientsCount(), (size_t)2);
sender.publish("test", "value");
broker.loop();
assertEqual(published.size(), (size_t)1); // client has received something
}
test(local_client_do_not_disconnect_after_publishing)
{
EpoxyTest::set_millis(0);
MqttBroker broker(1883);
MqttClient client(&broker, "client");
MqttClient sender(&broker, "sender");
broker.loop();
client.subscribe("#");
client.subscribe("test");
client.setCallback(onPublish);
assertEqual(broker.localClientsCount(), (size_t)2);
sender.publish("test", "value");
broker.loop();
EpoxyTest::add_seconds(60);
client.loop();
sender.loop();
broker.loop();
assertEqual(broker.localClientsCount(), (size_t)2);
assertEqual(sender.connected(), true);
assertEqual(client.connected(), true);
assertEqual(published.size(), (size_t)1); // client has received something
} }
#if 0 #if 0
test(local_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client;
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(local_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber;
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher;
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertEqual(published[""]["a/b"], 1);
assertEqual(published[""]["a/c"], 2);
}
test(local_publish_should_be_dispatched_to_local_clients) test(local_publish_should_be_dispatched_to_local_clients)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber_a("A"); MqttClient subscriber_a("A");
subscriber_a.setCallback(onPublish); subscriber_a.setCallback(onPublish);
@@ -116,7 +133,7 @@ test(local_publish_should_be_dispatched_to_local_clients)
test(local_unsubscribe) test(local_unsubscribe)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber; MqttClient subscriber;
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -136,7 +153,7 @@ test(local_unsubscribe)
test(local_nocallback_when_destroyed) test(local_nocallback_when_destroyed)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient publisher; MqttClient publisher;
{ {

View File

@@ -4,7 +4,7 @@
EXTRA_CXXFLAGS=-g3 -O0 EXTRA_CXXFLAGS=-g3 -O0
# Remove flto flag from EpoxyDuino (too many <optimized out>) # Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics -DEPOXY_TEST
APP_NAME := network-tests APP_NAME := network-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole

View File

@@ -9,6 +9,16 @@
#include <string> #include <string>
#include <iostream> #include <iostream>
uint32_t getClientKeepAlive(MqttBroker& broker)
{
if (broker.getClients().size() == 1)
for (auto& it : broker.getClients())
return it->keepAlive();
return 9999;
}
/** /**
* TinyMqtt network unit tests. * TinyMqtt network unit tests.
* *
@@ -147,7 +157,7 @@ test(network_client_alive)
const uint32_t keep_alive=1; const uint32_t keep_alive=1;
start_servers(2, true); start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED); assertEqual(WiFi.status(), WL_CONNECTED);
set_millis(0); // Enter simulated time EpoxyTest::set_millis(0); // Enter simulated time
MqttBroker broker(1883); MqttBroker broker(1883);
broker.begin(); broker.begin();
@@ -162,24 +172,24 @@ test(network_client_alive)
assertTrue(broker.clientsCount() == 1); assertTrue(broker.clientsCount() == 1);
assertTrue(client.connected()); assertTrue(client.connected());
uint32_t ka = broker.getClients()[0]->keepAlive(); uint32_t ka = getClientKeepAlive(broker);
assertEqual(ka, keep_alive); assertEqual(ka, keep_alive);
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.clientsCount(), (size_t)1);
// All is going well if we call client.loop() // All is going well if we call client.loop()
// The client is able to send PingReq to the broker // The client is able to send PingReq to the broker
add_seconds(keep_alive); EpoxyTest::add_seconds(keep_alive);
client.loop(); client.loop();
broker.loop(); broker.loop();
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.clientsCount(), (size_t)1);
// Now simulate that the client is frozen for // Now simulate that the client is frozen for
// a too long time // a too long time
add_seconds(TINY_MQTT_CLIENT_ALIVE_TOLERANCE*2); EpoxyTest::add_seconds(TINY_MQTT_CLIENT_ALIVE_TOLERANCE*2);
broker.loop(); broker.loop();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
set_real_time(); EpoxyTest::set_real_time();
} }
test(network_client_keep_alive_high) test(network_client_keep_alive_high)
@@ -212,7 +222,7 @@ test(network_client_keep_alive_high)
uint32_t sz = broker.getClients().size(); uint32_t sz = broker.getClients().size();
assertEqual(sz , (uint32_t)1); assertEqual(sz , (uint32_t)1);
uint32_t ka = broker.getClients()[0]->keepAlive(); uint32_t ka = getClientKeepAlive(broker);
assertEqual(ka, keep_alive); assertEqual(ka, keep_alive);
} }
@@ -302,14 +312,14 @@ test(network_one_client_one_broker_hudge_publish_and_subscribe_through_network)
assertEqual((unsigned int)lastLength, (unsigned int)sent.size()); assertEqual((unsigned int)lastLength, (unsigned int)sent.size());
} }
test(network_client_should_unregister_when_destroyed) test(network_local_client_should_unregister_when_destroyed)
{ {
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
{ {
MqttClient client(&broker); MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.localClientsCount(), (size_t)1);
} }
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
} }
@@ -322,13 +332,13 @@ test(network_connect)
MqttClient client(&broker); MqttClient client(&broker);
assertTrue(client.connected()); assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.localClientsCount(), (size_t)1);
} }
test(network_publish_should_be_dispatched) test(network_publish_should_be_dispatched)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker); MqttClient subscriber(&broker);
subscriber.subscribe("a/b"); subscriber.subscribe("a/b");
@@ -429,11 +439,12 @@ test(network_small_payload)
test(network_hudge_payload) test(network_hudge_payload)
{ {
const char* payload="This payload is hudge, just because its length exceeds 127. Thus when encoding length, we have to encode it on two bytes at min. This should not prevent the message from being encoded and decoded successfully !"; // const char* payload="This payload is hudge, just because its length exceeds 127. Thus when encoding length, we have to encode it on two bytes at min. This should not prevent the message from being encoded and decoded successfully !";
const char* payload="This was decoded successfully !";
MqttClient subscriber(&broker); MqttClient subscriber(&broker);
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
subscriber.subscribe("a/b"); // Note -> this does not send any byte .... (nowhere to send) subscriber.subscribe("a/b"); // Note -> this does not send any byte .... (nowhere to send) TODO
MqttClient publisher(&broker); MqttClient publisher(&broker);
publisher.publish("a/b", payload); // This publish is received publisher.publish("a/b", payload); // This publish is received
@@ -442,11 +453,13 @@ test(network_hudge_payload)
assertEqual(payload, lastPayload); assertEqual(payload, lastPayload);
assertEqual(lastLength, strlen(payload)); assertEqual(lastLength, strlen(payload));
assertEqual(strcmp(payload, lastPayload), 0); assertEqual(strcmp(payload, lastPayload), 0);
std::cout << "payload : " << payload << std::endl;
std::cout << "received: " << lastPayload << std::endl;
} }
test(connack) test(connack)
{ {
const bool view = false; const bool view = true;
NetworkObserver check( NetworkObserver check(
[this](const WiFiClient*, const uint8_t* buffer, size_t length) [this](const WiFiClient*, const uint8_t* buffer, size_t length)

View File

@@ -32,27 +32,27 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
test(nowifi_client_should_unregister_when_destroyed) test(nowifi_client_should_unregister_when_destroyed)
{ {
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
{ {
MqttClient client(&broker); MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.localClientsCount(), (size_t)1);
} }
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
} }
test(nowifi_connect) test(nowifi_connect)
{ {
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient client(&broker); MqttClient client(&broker);
assertTrue(client.connected()); assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1); assertEqual(broker.localClientsCount(), (size_t)1);
} }
test(nowifi_publish_should_be_dispatched) test(nowifi_publish_should_be_dispatched)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker); MqttClient subscriber(&broker);
subscriber.subscribe("a/b"); subscriber.subscribe("a/b");
@@ -72,7 +72,7 @@ test(nowifi_publish_should_be_dispatched)
test(nowifi_publish_should_be_dispatched_to_clients) test(nowifi_publish_should_be_dispatched_to_clients)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber_a(&broker, "A"); MqttClient subscriber_a(&broker, "A");
subscriber_a.setCallback(onPublish); subscriber_a.setCallback(onPublish);
@@ -97,7 +97,7 @@ test(nowifi_publish_should_be_dispatched_to_clients)
test(nowifi_subscribe_with_star_wildcard) test(nowifi_subscribe_with_star_wildcard)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker, "A"); MqttClient subscriber(&broker, "A");
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -118,7 +118,7 @@ test(nowifi_subscribe_with_star_wildcard)
test(nowifi_subscribe_with_plus_wildcard) test(nowifi_subscribe_with_plus_wildcard)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker, "A"); MqttClient subscriber(&broker, "A");
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -139,7 +139,7 @@ test(nowifi_subscribe_with_plus_wildcard)
test(nowifi_should_not_receive_sys_msg) test(nowifi_should_not_receive_sys_msg)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker, "A"); MqttClient subscriber(&broker, "A");
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -154,7 +154,7 @@ test(nowifi_should_not_receive_sys_msg)
test(nowifi_subscribe_with_mixed_wildcards) test(nowifi_subscribe_with_mixed_wildcards)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker, "A"); MqttClient subscriber(&broker, "A");
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -173,7 +173,7 @@ test(nowifi_subscribe_with_mixed_wildcards)
test(nowifi_unsubscribe_with_wildcards) test(nowifi_unsubscribe_with_wildcards)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker, "A"); MqttClient subscriber(&broker, "A");
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -195,7 +195,7 @@ test(nowifi_unsubscribe_with_wildcards)
test(nowifi_unsubscribe) test(nowifi_unsubscribe)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient subscriber(&broker); MqttClient subscriber(&broker);
subscriber.setCallback(onPublish); subscriber.setCallback(onPublish);
@@ -215,7 +215,7 @@ test(nowifi_unsubscribe)
test(nowifi_nocallback_when_destroyed) test(nowifi_nocallback_when_destroyed)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.localClientsCount(), (size_t)0);
MqttClient publisher(&broker); MqttClient publisher(&broker);