Compare commits

..

15 Commits

Author SHA1 Message Date
hsaturn
a0435b2cfb Release 0.9.12 unit test build upgrade 2023-01-03 04:33:47 +01:00
hsaturn
bda041417d Release 0.9.12 2023-01-03 04:32:14 +01:00
hsaturn
baffda8a6d MqttClient - fix local disconnect after pulish + ka 2023-01-03 04:25:26 +01:00
Francois BIOT
09e3a3e45f Rename MqttBroker to remote_broker 2022-12-29 13:39:34 +01:00
Francois BIOT
f17ece3376 MqttClient::client renamed to tcp_client 2022-12-29 12:58:08 +01:00
hsaturn
0db07df27b Remove useless comment 2022-12-29 12:54:58 +01:00
hsaturn
292592c3dd Added missing Makefile for unit test of MqttClassBinder 2022-12-29 02:17:54 +01:00
Francois BIOT
1f267c135b fix erroneous sizeof multimap comment 2022-12-29 02:15:18 +01:00
Francois BIOT
2b92833ea5 Remove spaces to end of lines 2022-12-28 21:22:19 +01:00
Francois BIOT
42fc054c94 release 0.9.11 2022-12-28 20:30:41 +01:00
hsaturn
7f12ecfd6d Update mqtt_class_binder.ino 2022-12-28 20:29:33 +01:00
Francois BIOT
3ae1afec27 Release 0.9.10 2022-12-28 20:23:58 +01:00
Francois BIOT
9608ed9fdf Added example for MqttClassBinder 2022-12-28 20:22:31 +01:00
hsaturn
220e904ae9 Add MqttClassBinder 2022-12-28 19:34:29 +01:00
hsaturn
49b696315c Fix TINY_MQTT_DEBUG compilation 2022-12-28 19:30:16 +01:00
15 changed files with 689 additions and 86 deletions

View File

@@ -9,7 +9,7 @@ on: [push]
jobs:
build:
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
@@ -18,7 +18,7 @@ jobs:
run: |
cd ..
git clone https://github.com/hsaturn/TinyConsole
git clone https://github.com/bxparks/EpoxyDuino
git clone https://github.com/hsaturn/EpoxyDuino
git clone https://github.com/bxparks/AceRoutine
git clone https://github.com/bxparks/AUnit
git clone https://github.com/bxparks/AceCommon

View File

@@ -0,0 +1,134 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttClassBinder.h>
/**
* Example on how to bind a class:onPublish function
*
* Local broker that accept connections and two local clients
*
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | | | | -----
* | v v | ---
* | +----------+ +----------+ | -
* | | internal | | internal | +-------* Wifi
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
* - MqttClassBinder allows to mix together many mqtt sources
*
* cons - Takes more memory (24 more bytes for the one MqttClassBinder<Class>
* - a bit hard to understand
*
*/
const char *ssid = "";
const char *password = "";
std::string topic_b="sensor/btemp";
std::string topic_sender= "sensor/counter";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
MqttClient mqtt_sender(&broker);
class MqttReceiver: public MqttClassBinder<MqttReceiver>
{
public:
void onPublish(const MqttClient* source, const Topic& topic, const char* payload, size_t /* length */)
{
Serial
<< " * MqttReceiver received topic (" << topic.c_str() << ")"
<< " from (" << source->id() << "), "
<< " payload: (" << payload << ')' << endl;
}
};
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{
Serial << '-'; delay(500);
if (strlen(ssid)==0)
Serial << "****** PLEASE EDIT THE EXAMPLE AND MODIFY ssid/password *************" << endl;
}
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
MqttReceiver* receiver = new MqttReceiver;
// receiver will receive both publication from two MqttClient
// (that could be connected to two different brokers)
MqttClassBinder<MqttReceiver>::onPublish(&mqtt_a, receiver);
MqttClassBinder<MqttReceiver>::onPublish(&mqtt_b, receiver);
mqtt_a.id("mqtt_a");
mqtt_b.id("mqtt_b");
mqtt_sender.id("sender");
mqtt_a.subscribe(topic_b);
mqtt_b.subscribe(topic_sender);
}
void loop()
{
broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop();
mqtt_b.loop();
mqtt_sender.loop();
// ============= client A publish ================
{
static const int interval = 5000; // publishes every 5s (please avoid usage of delay())
static uint32_t timer = millis() + interval;
if (millis() > timer)
{
static int counter = 0;
Serial << "Sender is publishing " << topic_sender.c_str() << endl;
timer += interval;
mqtt_sender.publish(topic_sender, "sent by Sender, message #"+std::string(String(counter++).c_str()));
}
}
// ============= client B publish ================
{
static const int interval = 7000; // will send topic each 7s
static uint32_t timer = millis() + interval;
static int temperature;
if (millis() > timer)
{
Serial << "B is publishing " << topic_b.c_str() << endl;
timer += interval;
mqtt_b.publish(topic_b, "sent by B: temp="+std::string(String(16+temperature++%6).c_str()));
}
}
}

View File

@@ -1,5 +1,5 @@
name=TinyMqtt
version=0.9.9
version=0.9.12
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging.

72
src/MqttClassBinder.h Normal file
View File

@@ -0,0 +1,72 @@
// MqttReceiver must implement onPublish(...)
template <class MqttReceiver>
class MqttClassBinder
{
public:
MqttClassBinder()
{
unregister(this);
}
~MqttClassBinder() { unregister(this); }
static void onUnpublished(MqttClient::CallBack handler)
{
unrouted_handler = handler;
}
static void onPublish(MqttClient* client, MqttReceiver* dest)
{
routes.insert(std::pair<MqttClient*, MqttReceiver*>(client, dest));
client->setCallback(onRoutePublish);
}
void onPublish(const MqttClient* client, const Topic& topic, const char* payload, size_t length)
{
static_cast<MqttReceiver*>(this)->MqttReceiver::onPublish(client, topic, payload, length);
}
static size_t size() { return routes.size(); }
static void reset() { routes.clear(); }
private:
static void onRoutePublish(const MqttClient* client, const Topic& topic, const char* payload, size_t length)
{
bool unrouted = true;
auto receivers = routes.equal_range(client);
for(auto it = receivers.first; it != receivers.second; ++it)
{
it->second->onPublish(client, topic, payload, length);
unrouted = false;
}
if (unrouted and unrouted_handler)
{
unrouted_handler(client, topic, payload, length);
}
}
private:
void unregister(MqttClassBinder<MqttReceiver>* which)
{
if (routes.size()==0) return; // bug in map stl
for(auto it=routes.begin(); it!=routes.end(); it++)
if (it->second == which)
{
routes.erase(it);
return;
}
}
static std::multimap<const MqttClient*, MqttClassBinder<MqttReceiver>*> routes;
static MqttClient::CallBack unrouted_handler;
};
template<class MqttReceiver>
std::multimap<const MqttClient*, MqttClassBinder<MqttReceiver>*> MqttClassBinder<MqttReceiver>::routes;
template<class MqttReceiver>
MqttClient::CallBack MqttClassBinder<MqttReceiver>::unrouted_handler = nullptr;

View File

@@ -38,12 +38,12 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
{
debug("MqttClient private with broker");
#ifdef TINY_MQTT_ASYNC
client = new_client;
client->onData(onData, this);
tcp_client = new_client;
tcp_client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
tcp_client = new WiFiClient(*new_client);
#endif
#ifdef EPOXY_DUINO
alive = millis()+500000;
@@ -55,7 +55,8 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
: local_broker(local_broker), clientId(id)
{
client = nullptr;
alive = 0;
keep_alive = 0;
if (local_broker) local_broker->addClient(this);
}
@@ -63,22 +64,23 @@ MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
MqttClient::~MqttClient()
{
close();
delete client;
delete tcp_client;
debug("*** MqttClient delete()");
}
void MqttClient::close(bool bSendDisconnect)
{
debug("close " << id().c_str());
mqtt_connected = false;
if (client) // connected to a remote broker
if (tcp_client) // connected to a remote broker
{
if (bSendDisconnect and client->connected())
if (bSendDisconnect and tcp_client->connected())
{
message.create(MqttMessage::Type::Disconnect);
message.hexdump("close");
message.sendTo(this);
}
client->stop();
tcp_client->stop();
}
if (local_broker)
@@ -100,18 +102,18 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
debug("MqttClient::connect_to_host " << broker << ':' << port);
keep_alive = ka;
close();
if (client) delete client;
client = new TcpClient;
if (tcp_client) delete tcp_client;
tcp_client = new TcpClient;
#ifdef TINY_MQTT_ASYNC
client->onData(onData, this);
client->onConnect(onConnect, this);
client->connect(broker.c_str(), port, ka);
tcp_client->onData(onData, this);
tcp_client->onConnect(onConnect, this);
tcp_client->connect(broker.c_str(), port, ka);
#else
if (client->connect(broker.c_str(), port))
if (tcp_client->connect(broker.c_str(), port))
{
debug("link established");
onConnect(this, client);
onConnect(this, tcp_client);
}
else
{
@@ -129,9 +131,9 @@ void MqttBroker::addClient(MqttClient* client)
void MqttBroker::connect(const std::string& host, uint16_t port)
{
debug("MqttBroker::connect");
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->local_broker = this; // Because connect removed the link
if (remote_broker == nullptr) remote_broker = new MqttClient;
remote_broker->connect(host, port);
remote_broker->local_broker = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove)
@@ -175,19 +177,16 @@ void MqttBroker::loop()
onClient(this, &client);
}
#endif
if (broker)
if (remote_broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
remote_broker->loop();
}
// for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
for(size_t i=0; i<clients.size(); i++)
{
auto client = clients[i];
MqttClient* client = clients[i];
if (client->connected())
{
client->loop();
@@ -205,9 +204,9 @@ void MqttBroker::loop()
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
debug("MqttBroker::subscribe");
if (broker && broker->connected())
if (remote_broker && remote_broker->connected())
{
return broker->subscribe(topic, qos);
return remote_broker->subscribe(topic, qos);
}
return MqttNowhereToSend;
}
@@ -222,19 +221,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
{
i++;
#if TINY_MQTT_DEBUG
Console << __LINE__ << " broker:" << (broker && broker->connected() ? "linked" : "alone") <<
Console << __LINE__ << " broker:" << (remote_broker && remote_broker->connected() ? "linked" : "alone") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false;
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
if (remote_broker && remote_broker->connected()) // this (MqttBroker) is connected (to a external broker)
{
// ext_broker -> clients or clients -> ext_broker
if (source == broker) // external broker -> internal clients
if (source == remote_broker) // external broker -> internal clients
doit = true;
else // external clients -> this broker
{
// As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
MqttError ret = remote_broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret;
}
}
@@ -285,7 +284,7 @@ void MqttClient::clientAlive(uint32_t more_seconds)
void MqttClient::loop()
{
if (alive && (millis() > alive))
if (keep_alive && (millis() >= alive))
{
if (local_broker)
{
@@ -293,11 +292,11 @@ void MqttClient::loop()
close();
debug(red << "closed");
}
else if (client && client->connected())
else if (tcp_client && tcp_client->connected())
{
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((const char*)(&pingreq), 2);
tcp_client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local broker
@@ -305,9 +304,9 @@ void MqttClient::loop()
}
}
#ifndef TINY_MQTT_ASYNC
while(client && client->available()>0)
while(tcp_client && tcp_client->available()>0)
{
message.incoming(client->read());
message.incoming(tcp_client->read());
if (message.type())
{
processMessage(&message);
@@ -520,11 +519,11 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::PingReq:
if (!mqtt_connected) break;
if (client)
if (tcp_client)
{
uint16_t pingreq = MqttMessage::Type::PingResp;
debug(cyan << "Ping response to client ");
client->write((const char*)(&pingreq), 2);
tcp_client->write((const char*)(&pingreq), 2);
bclose = false;
}
else
@@ -586,9 +585,9 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::Publish:
#if TINY_MQTT_DEBUG
Console << "publish " << mqtt_connected << '/' << (long) client << endl;
Console << "publish " << mqtt_connected << '/' << (long) tcp_client << endl;
#endif
if (mqtt_connected or client == nullptr)
if (mqtt_connected or tcp_client == nullptr)
{
uint8_t qos = mesg->flags();
payload = header;
@@ -604,7 +603,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
// TODO reset DUP
// TODO reset RETAIN
if (local_broker==nullptr or client==nullptr) // internal MqttClient receives publish
if (local_broker==nullptr or tcp_client==nullptr) // internal MqttClient receives publish
{
#if TINY_MQTT_DEBUG
if (TinyMqtt::debug >= 2)
@@ -730,7 +729,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
{
return local_broker->publish(this, topic, msg);
}
else if (client)
else if (tcp_client)
return msg.sendTo(this);
else
return MqttNowhereToSend;
@@ -744,7 +743,7 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
debug("mqttclient publishIfSubscribed " << topic.c_str() << ' ' << subscriptions.size());
if (isSubscribedTo(topic))
{
if (client)
if (tcp_client)
retval = msg.sendTo(this);
else
{

View File

@@ -1,6 +1,9 @@
// vim: ts=2 sw=2 expandtab
#pragma once
#ifndef TINY_MQTT_DEBUG
#define TINY_MQTT_DEBUG 0
#endif
// TODO Should add a AUnit with both TINY_MQTT_ASYNC and not TINY_MQTT_ASYNC
// #define TINY_MQTT_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
@@ -39,7 +42,7 @@
#include <TinyStreaming.h>
#if TINY_MQTT_DEBUG
include <TinyConsole.h> // https://github.com/hsaturn/TinyConsole
#include <TinyConsole.h> // https://github.com/hsaturn/TinyConsole
struct TinyMqtt
{
static int debug;
@@ -162,7 +165,6 @@ class MqttMessage
class MqttBroker;
class MqttClient
{
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
enum __attribute__((packed)) Flags
{
FlagUserName = 128,
@@ -174,6 +176,9 @@ class MqttClient
FlagReserved = 1
};
public:
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
/** Constructor. Broker is the adress of a local broker if not null
If you want to connect elsewhere, leave broker null and use connect() **/
MqttClient(MqttBroker* broker = nullptr, const std::string& id = TINY_MQTT_DEFAULT_CLIENT_ID);
@@ -188,12 +193,13 @@ class MqttClient
// no negociation occurred
bool connected()
{
return (local_broker!=nullptr and client==nullptr) or (client and client->connected());
return (local_broker!=nullptr and tcp_client==nullptr)
or (tcp_client and tcp_client->connected());
}
void write(const char* buf, size_t length)
{
if (client) client->write(buf, length);
if (tcp_client) tcp_client->write(buf, length);
}
const std::string& id() const { return clientId; }
@@ -224,7 +230,7 @@ class MqttClient
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
bool isLocal() const { return tcp_client == nullptr; }
void dump(std::string indent="")
{
@@ -233,9 +239,9 @@ class MqttClient
uint32_t ms=millis();
Console << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
Console << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
if (client)
if (tcp_client)
{
if (client->connected())
if (tcp_client->connected())
Console << TinyConsole::green << "connected";
else
Console << TinyConsole::red << "disconnected";
@@ -290,7 +296,7 @@ class MqttClient
// when MqttBroker uses MqttClient for each external connexion
MqttBroker* local_broker=nullptr;
TcpClient* client=nullptr; // connection to remote broker
TcpClient* tcp_client=nullptr; // connection to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
@@ -352,7 +358,7 @@ class MqttBroker
const char* auth_user = "guest";
const char* auth_password = "guest";
MqttClient* broker = nullptr;
MqttClient* remote_broker = nullptr;
State state = Disconnected;
};

View File

@@ -0,0 +1,13 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS
# Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
APP_NAME := classbind-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
ARDUINO_LIB_DIRS := ../../../EspMock/libraries
EPOXY_CORE := EPOXY_CORE_ESP8266
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -0,0 +1,351 @@
// vim: ts=2 sw=2 expandtab
#include <Arduino.h>
#include <AUnit.h>
#include <TinyMqtt.h>
#include <MqttClassBinder.h>
#include <map>
#include <iostream>
#include <iomanip>
#include <sstream>
#include <string>
#include <iostream>
// --------------------- CUT HERE - MQTT MESSAGE ROUTER FILE ----------------------------
class TestReceiver : public MqttClassBinder<TestReceiver>
{
public:
TestReceiver(const char* name) : MqttClassBinder(), name_(name) {}
void onPublish(const MqttClient* /* source */, const Topic& topic, const char* payload, size_t /* length */)
{
Serial << "--> routed message received by " << name_ << ':' << topic.c_str() << " = " << payload << endl;
messages[name_]++;
}
private:
const std::string name_;
public:
static std::map<std::string, int> messages;
};
std::map<std::string, int> TestReceiver::messages;
static int unrouted = 0;
void onUnrouted(const MqttClient*, const Topic& topic, const char*, size_t)
{
Serial << "--> unrouted: " << topic.c_str() << endl;
unrouted++;
}
static std::string topic="sensor/temperature";
/**
* TinyMqtt network unit tests.
*
* No wifi connection unit tests.
* Checks with a local broker. Clients must connect to the local broker
**/
// if ascii_pos = 0, no ascii dump, else ascii dump starts after column ascii_pos
std::string bufferToHexa(const uint8_t* buffer, size_t length, char sep = 0, size_t ascii_pos = 0)
{
std::stringstream out;
std::string ascii;
std::string h("0123456789ABCDEF");
for(size_t i=0; i<length; i++)
{
uint8_t c = buffer[i];
out << h[ c >> 4] << h[ c & 0x0F ];
if (sep) out << sep;
if (ascii_pos)
{
if (c>=32)
ascii += c;
else
ascii +='.';
}
}
std::string ret(out.str());
if (ascii_pos)
{
while(ret.length() < ascii_pos)
ret += ' ';
ret +='[' + ascii + ']';
}
return ret;
}
void dumpMqttMessage(const uint8_t* buffer, size_t length)
{
std::map<int, std::string> pkt =
{ { MqttMessage::Unknown , "Unknown " },
{ MqttMessage::Connect , "Connect " },
{ MqttMessage::ConnAck , "ConnAck " },
{ MqttMessage::Publish , "Publish " },
{ MqttMessage::PubAck , "PubAck " },
{ MqttMessage::Subscribe , "Subscribe " },
{ MqttMessage::SubAck , "SubAck " },
{ MqttMessage::UnSubscribe , "Unsubscribe " },
{ MqttMessage::UnSuback , "UnSubAck " },
{ MqttMessage::PingReq , "PingReq " },
{ MqttMessage::PingResp , "PingResp " },
{ MqttMessage::Disconnect , "Disconnect " } };
std::cout << " | data sent " << std::setw(3) << length << " : ";
auto it = pkt.find(buffer[0] & 0xF0);
if (it == pkt.end())
std::cout << pkt[MqttMessage::Unknown];
else
std::cout << it->second;
std::cout << bufferToHexa(buffer, length, ' ', 60) << std::endl;
}
String toString(const IPAddress& ip)
{
return String(ip[0])+'.'+String(ip[1])+'.'+String(ip[2])+'.'+String(ip[3]);
}
MqttBroker broker(1883);
void reset_and_start_servers(int n, bool early_accept = true)
{
MqttClassBinder<TestReceiver>::reset();
TestReceiver::messages.clear();
unrouted = 0;
ESP8266WiFiClass::resetInstances();
ESP8266WiFiClass::earlyAccept = early_accept;
while(n)
{
ESP8266WiFiClass::selectInstance(n--);
WiFi.mode(WIFI_STA);
WiFi.begin("fake_ssid", "fake_pwd");
}
}
test(classbind_one_client_receives_the_message)
{
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client;
client.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
assertTrue(client.connected());
TestReceiver receiver("receiver");
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver);
client.subscribe("a/b");
client.publish("a/b", "ab");
for (int i =0; i<10; i++)
{
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 1);
assertEqual(unrouted, 0);
}
test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope)
{
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client;
client.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
assertTrue(client.connected());
// Make a receiver going out of scope
{
TestReceiver receiver("receiver");
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver);
assertEqual(MqttClassBinder<TestReceiver>::size(), (size_t)1);
}
client.subscribe("a/b");
client.publish("a/b", "ab");
for (int i =0; i<10; i++)
{
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 0);
assertEqual(MqttClassBinder<TestReceiver>::size(), (size_t)0);
}
test(classbind_publish_should_be_dispatched_to_many_receivers)
{
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client;
client.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
assertTrue(client.connected());
TestReceiver receiver_1("receiver_1");
TestReceiver receiver_2("receiver_2");
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver_1);
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver_2);
client.subscribe("a/b");
client.publish("a/b", "ab");
for (int i =0; i<10; i++)
{
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver_1"], 1);
assertEqual(TestReceiver::messages["receiver_2"], 1);
}
test(classbind_register_to_many_clients)
{
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client_1;
client_1.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
MqttClient client_2;
client_2.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
assertTrue(client_1.connected());
assertTrue(client_2.connected());
TestReceiver receiver("receiver");
MqttClassBinder<TestReceiver>::onPublish(&client_1, &receiver);
MqttClassBinder<TestReceiver>::onPublish(&client_2, &receiver);
auto loop = [&client_1, &client_2, &broker]()
{
client_1.loop();
client_2.loop();
broker.loop();
};
client_1.subscribe("a/b");
client_2.subscribe("a/b");
// Ensure subscribptions are passed
for (int i =0; i<5; i++) loop();
client_1.publish("a/b", "from 1");
client_2.publish("a/b", "from 2");
// Ensure publishes are processed
for (int i =0; i<5; i++) loop();
assertEqual(TestReceiver::messages["receiver"], 4);
}
test(classbind_unrouted_fallback)
{
reset_and_start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress ip_broker = WiFi.localIP();
// We have a 2nd ESP in order to test through wifi (opposed to local)
ESP8266WiFiClass::selectInstance(2);
MqttClient client;
client.connect(ip_broker.toString().c_str(), 1883);
broker.loop();
assertTrue(client.connected());
MqttClassBinder<TestReceiver>::onUnpublished(onUnrouted);
{
TestReceiver receiver("receiver");
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver);
}
client.subscribe("a/b");
client.publish("a/b", "from 2");
// Ensure subscribptions are passed
for (int i =0; i<5; i++)
{
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 0);
assertEqual(unrouted, 1);
}
test(classbind_should_cleanup_when_MqttClient_dies)
{
reset_and_start_servers(2, true);
TestReceiver receiver("receiver");
{
MqttClient client;
MqttClassBinder<TestReceiver>::onPublish(&client, &receiver);
assertEqual(MqttClassBinder<TestReceiver>::size(), (size_t)1);
}
assertEqual(MqttClassBinder<TestReceiver>::size(), (size_t)1);
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
/* delay(1000);
Serial.begin(115200);
while(!Serial);
*/
Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================");
WiFi.mode(WIFI_STA);
WiFi.begin("network", "password");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}

View File

@@ -40,6 +40,34 @@ test(local_client_should_unregister_when_destroyed)
assertEqual(broker.clientsCount(), (size_t)0);
}
test(local_client_do_not_disconnect_after_publishing)
{
set_millis(0);
MqttBroker broker(1883);
MqttClient client(&broker, "client");
MqttClient sender(&broker, "sender");
broker.loop();
client.subscribe("#");
client.subscribe("test");
client.setCallback(onPublish);
assertEqual(broker.clientsCount(), (size_t)2);
sender.publish("test", "value");
broker.loop();
add_seconds(60);
client.loop();
sender.loop();
broker.loop();
assertEqual(broker.clientsCount(), (size_t)2);
assertEqual(sender.connected(), true);
assertEqual(client.connected(), true);
assertEqual(published.size(), (size_t)1); // client has received something
}
#if 0
test(local_connect)
{