Compare commits

..

10 Commits

Author SHA1 Message Date
hsaturn
a0435b2cfb Release 0.9.12 unit test build upgrade 2023-01-03 04:33:47 +01:00
hsaturn
bda041417d Release 0.9.12 2023-01-03 04:32:14 +01:00
hsaturn
baffda8a6d MqttClient - fix local disconnect after pulish + ka 2023-01-03 04:25:26 +01:00
Francois BIOT
09e3a3e45f Rename MqttBroker to remote_broker 2022-12-29 13:39:34 +01:00
Francois BIOT
f17ece3376 MqttClient::client renamed to tcp_client 2022-12-29 12:58:08 +01:00
hsaturn
0db07df27b Remove useless comment 2022-12-29 12:54:58 +01:00
hsaturn
292592c3dd Added missing Makefile for unit test of MqttClassBinder 2022-12-29 02:17:54 +01:00
Francois BIOT
1f267c135b fix erroneous sizeof multimap comment 2022-12-29 02:15:18 +01:00
Francois BIOT
2b92833ea5 Remove spaces to end of lines 2022-12-28 21:22:19 +01:00
Francois BIOT
42fc054c94 release 0.9.11 2022-12-28 20:30:41 +01:00
15 changed files with 139 additions and 98 deletions

View File

@@ -9,7 +9,7 @@ on: [push]
jobs:
build:
runs-on: ubuntu-18.04
runs-on: ubuntu-20.04
steps:
- uses: actions/checkout@v2
@@ -18,7 +18,7 @@ jobs:
run: |
cd ..
git clone https://github.com/hsaturn/TinyConsole
git clone https://github.com/bxparks/EpoxyDuino
git clone https://github.com/hsaturn/EpoxyDuino
git clone https://github.com/bxparks/AceRoutine
git clone https://github.com/bxparks/AUnit
git clone https://github.com/bxparks/AceCommon

View File

@@ -1,6 +1,6 @@
name: "CI"
on:
jobs:
ci:
runs-on: ubuntu-20.04

View File

@@ -1,12 +1,12 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttClassBinder.h>
/**
/**
* Example on how to bind a class:onPublish function
*
* Local broker that accept connections and two local clients
*
*
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
@@ -15,14 +15,14 @@
* | | ^ |
* | | | |
* | | | | -----
* | v v | ---
* | +----------+ +----------+ | -
* | v v | ---
* | +----------+ +----------+ | -
* | | internal | | internal | +-------* Wifi
* | | client | | client | |
* | +----------+ +----------+ |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
@@ -30,7 +30,7 @@
* - accepts external clients
* - MqttClassBinder allows to mix together many mqtt sources
*
* cons - Takes more memory (48 more bytes for the one MqttClassBinder<Class>
* cons - Takes more memory (24 more bytes for the one MqttClassBinder<Class>
* - a bit hard to understand
*
*/
@@ -50,7 +50,7 @@ MqttClient mqtt_sender(&broker);
class MqttReceiver: public MqttClassBinder<MqttReceiver>
{
public:
void onPublish(const MqttClient* source, const Topic& topic, const char* payload, size_t /* length */)
{
Serial

View File

@@ -1,9 +1,9 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/**
/**
* Local broker that accept connections and two local clients
*
*
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
@@ -12,14 +12,14 @@
* | | ^ |
* | | | |
* | | | | -----
* | v v | ---
* | +----------+ +----------+ | -
* | v v | ---
* | +----------+ +----------+ | -
* | | internal | | internal | +-------* Wifi
* | | client | | client | |
* | +----------+ +----------+ |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker

View File

@@ -1,7 +1,7 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/** TinyMQTT allows a disconnected mode:
*
*
* +-----------------------------+
* | ESP |
* | +--------+ |
@@ -12,8 +12,8 @@
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*

View File

@@ -3,20 +3,20 @@
/** Simple Client (The simplest configuration)
*
*
* +--------+
*
* +--------+
* +------>| broker |<--- < Other client
* | +--------+
* |
* | +--------+
* |
* +-----------------+
* | ESP | |
* | +----------+ |
* | | internal | |
* | | client | |
* | +----------+ |
* | ESP | |
* | +----------+ |
* | | internal | |
* | | client | |
* | +----------+ |
* | |
* +-----------------+
*
*
* 1 - change the ssid/password
* 2 - change BROKER values (or keep emqx.io test broker)
* 3 - you can use mqtt-spy to connect to the same broker and
@@ -39,7 +39,7 @@ const char* password = "";
static float temp=19;
static MqttClient client;
void setup()
void setup()
{
Serial.begin(115200);
delay(500);
@@ -50,11 +50,11 @@ void setup()
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect(BROKER, BROKER_PORT);
}

View File

@@ -1,5 +1,5 @@
name=TinyMqtt
version=0.9.10
version=0.9.12
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging.

View File

@@ -8,7 +8,7 @@ class MqttClassBinder
unregister(this);
}
~MqttClassBinder() { unregister(this); }
static void onUnpublished(MqttClient::CallBack handler)
{
unrouted_handler = handler;

View File

@@ -38,12 +38,12 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
{
debug("MqttClient private with broker");
#ifdef TINY_MQTT_ASYNC
client = new_client;
client->onData(onData, this);
tcp_client = new_client;
tcp_client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
tcp_client = new WiFiClient(*new_client);
#endif
#ifdef EPOXY_DUINO
alive = millis()+500000;
@@ -55,30 +55,32 @@ MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
: local_broker(local_broker), clientId(id)
{
client = nullptr;
alive = 0;
keep_alive = 0;
if (local_broker) local_broker->addClient(this);
if (local_broker) local_broker->addClient(this);
}
MqttClient::~MqttClient()
{
close();
delete client;
delete tcp_client;
debug("*** MqttClient delete()");
}
void MqttClient::close(bool bSendDisconnect)
{
debug("close " << id().c_str());
mqtt_connected = false;
if (client) // connected to a remote broker
if (tcp_client) // connected to a remote broker
{
if (bSendDisconnect and client->connected())
if (bSendDisconnect and tcp_client->connected())
{
message.create(MqttMessage::Type::Disconnect);
message.hexdump("close");
message.sendTo(this);
}
client->stop();
tcp_client->stop();
}
if (local_broker)
@@ -100,18 +102,18 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
debug("MqttClient::connect_to_host " << broker << ':' << port);
keep_alive = ka;
close();
if (client) delete client;
client = new TcpClient;
if (tcp_client) delete tcp_client;
tcp_client = new TcpClient;
#ifdef TINY_MQTT_ASYNC
client->onData(onData, this);
client->onConnect(onConnect, this);
client->connect(broker.c_str(), port, ka);
tcp_client->onData(onData, this);
tcp_client->onConnect(onConnect, this);
tcp_client->connect(broker.c_str(), port, ka);
#else
if (client->connect(broker.c_str(), port))
if (tcp_client->connect(broker.c_str(), port))
{
debug("link established");
onConnect(this, client);
onConnect(this, tcp_client);
}
else
{
@@ -129,9 +131,9 @@ void MqttBroker::addClient(MqttClient* client)
void MqttBroker::connect(const std::string& host, uint16_t port)
{
debug("MqttBroker::connect");
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->local_broker = this; // Because connect removed the link
if (remote_broker == nullptr) remote_broker = new MqttClient;
remote_broker->connect(host, port);
remote_broker->local_broker = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove)
@@ -175,19 +177,16 @@ void MqttBroker::loop()
onClient(this, &client);
}
#endif
if (broker)
if (remote_broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
remote_broker->loop();
}
// for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
for(size_t i=0; i<clients.size(); i++)
{
auto client = clients[i];
MqttClient* client = clients[i];
if (client->connected())
{
client->loop();
@@ -205,9 +204,9 @@ void MqttBroker::loop()
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
debug("MqttBroker::subscribe");
if (broker && broker->connected())
if (remote_broker && remote_broker->connected())
{
return broker->subscribe(topic, qos);
return remote_broker->subscribe(topic, qos);
}
return MqttNowhereToSend;
}
@@ -222,19 +221,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
{
i++;
#if TINY_MQTT_DEBUG
Console << __LINE__ << " broker:" << (broker && broker->connected() ? "linked" : "alone") <<
Console << __LINE__ << " broker:" << (remote_broker && remote_broker->connected() ? "linked" : "alone") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false;
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
if (remote_broker && remote_broker->connected()) // this (MqttBroker) is connected (to a external broker)
{
// ext_broker -> clients or clients -> ext_broker
if (source == broker) // external broker -> internal clients
if (source == remote_broker) // external broker -> internal clients
doit = true;
else // external clients -> this broker
{
// As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
MqttError ret = remote_broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret;
}
}
@@ -285,7 +284,7 @@ void MqttClient::clientAlive(uint32_t more_seconds)
void MqttClient::loop()
{
if (alive && (millis() > alive))
if (keep_alive && (millis() >= alive))
{
if (local_broker)
{
@@ -293,11 +292,11 @@ void MqttClient::loop()
close();
debug(red << "closed");
}
else if (client && client->connected())
else if (tcp_client && tcp_client->connected())
{
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((const char*)(&pingreq), 2);
tcp_client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local broker
@@ -305,9 +304,9 @@ void MqttClient::loop()
}
}
#ifndef TINY_MQTT_ASYNC
while(client && client->available()>0)
while(tcp_client && tcp_client->available()>0)
{
message.incoming(client->read());
message.incoming(tcp_client->read());
if (message.type())
{
processMessage(&message);
@@ -520,11 +519,11 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::PingReq:
if (!mqtt_connected) break;
if (client)
if (tcp_client)
{
uint16_t pingreq = MqttMessage::Type::PingResp;
debug(cyan << "Ping response to client ");
client->write((const char*)(&pingreq), 2);
tcp_client->write((const char*)(&pingreq), 2);
bclose = false;
}
else
@@ -538,7 +537,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
{
if (!mqtt_connected) break;
payload = header+2;
debug("un/subscribe loop");
std::string qoss;
while(payload < mesg->end())
@@ -586,9 +585,9 @@ void MqttClient::processMessage(MqttMessage* mesg)
case MqttMessage::Type::Publish:
#if TINY_MQTT_DEBUG
Console << "publish " << mqtt_connected << '/' << (long) client << endl;
Console << "publish " << mqtt_connected << '/' << (long) tcp_client << endl;
#endif
if (mqtt_connected or client == nullptr)
if (mqtt_connected or tcp_client == nullptr)
{
uint8_t qos = mesg->flags();
payload = header;
@@ -604,7 +603,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
// TODO reset DUP
// TODO reset RETAIN
if (local_broker==nullptr or client==nullptr) // internal MqttClient receives publish
if (local_broker==nullptr or tcp_client==nullptr) // internal MqttClient receives publish
{
#if TINY_MQTT_DEBUG
if (TinyMqtt::debug >= 2)
@@ -730,7 +729,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
{
return local_broker->publish(this, topic, msg);
}
else if (client)
else if (tcp_client)
return msg.sendTo(this);
else
return MqttNowhereToSend;
@@ -744,7 +743,7 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
debug("mqttclient publishIfSubscribed " << topic.c_str() << ' ' << subscriptions.size());
if (isSubscribedTo(topic))
{
if (client)
if (tcp_client)
retval = msg.sendTo(this);
else
{
@@ -862,7 +861,7 @@ void MqttMessage::encodeLength()
buffer[2] = (length >> 7);
vheader = 3;
}
// We could check that buffer[2] < 128 (end of length encoding)
state = Complete;
}

View File

@@ -193,12 +193,13 @@ class MqttClient
// no negociation occurred
bool connected()
{
return (local_broker!=nullptr and client==nullptr) or (client and client->connected());
return (local_broker!=nullptr and tcp_client==nullptr)
or (tcp_client and tcp_client->connected());
}
void write(const char* buf, size_t length)
{
if (client) client->write(buf, length);
if (tcp_client) tcp_client->write(buf, length);
}
const std::string& id() const { return clientId; }
@@ -229,7 +230,7 @@ class MqttClient
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
bool isLocal() const { return tcp_client == nullptr; }
void dump(std::string indent="")
{
@@ -238,9 +239,9 @@ class MqttClient
uint32_t ms=millis();
Console << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
Console << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
if (client)
if (tcp_client)
{
if (client->connected())
if (tcp_client->connected())
Console << TinyConsole::green << "connected";
else
Console << TinyConsole::red << "disconnected";
@@ -295,7 +296,7 @@ class MqttClient
// when MqttBroker uses MqttClient for each external connexion
MqttBroker* local_broker=nullptr;
TcpClient* client=nullptr; // connection to remote broker
TcpClient* tcp_client=nullptr; // connection to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
@@ -357,7 +358,7 @@ class MqttBroker
const char* auth_user = "guest";
const char* auth_password = "guest";
MqttClient* broker = nullptr;
MqttClient* remote_broker = nullptr;
State state = Disconnected;
};

View File

@@ -0,0 +1,13 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
EXTRA_CXXFLAGS=-g3 -O0 -DTINY_MQTT_TESTS
# Remove flto flag from EpoxyDuino (too many <optimized out>)
CXXFLAGS = -Wextra -Wall -std=gnu++11 -fno-exceptions -fno-threadsafe-statics
APP_NAME := classbind-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP TinyConsole
ARDUINO_LIB_DIRS := ../../../EspMock/libraries
EPOXY_CORE := EPOXY_CORE_ESP8266
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -154,7 +154,7 @@ test(classbind_one_client_receives_the_message)
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 1);
assertEqual(unrouted, 0);
}
@@ -190,7 +190,7 @@ test(classbind_routes_should_be_empty_when_receiver_goes_out_of_scope)
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver"], 0);
assertEqual(MqttClassBinder<TestReceiver>::size(), (size_t)0);
}
@@ -224,7 +224,7 @@ test(classbind_publish_should_be_dispatched_to_many_receivers)
client.loop();
broker.loop();
}
assertEqual(TestReceiver::messages["receiver_1"], 1);
assertEqual(TestReceiver::messages["receiver_2"], 1);
}
@@ -274,7 +274,7 @@ test(classbind_register_to_many_clients)
// Ensure publishes are processed
for (int i =0; i<5; i++) loop();
assertEqual(TestReceiver::messages["receiver"], 4);
}

View File

@@ -40,6 +40,34 @@ test(local_client_should_unregister_when_destroyed)
assertEqual(broker.clientsCount(), (size_t)0);
}
test(local_client_do_not_disconnect_after_publishing)
{
set_millis(0);
MqttBroker broker(1883);
MqttClient client(&broker, "client");
MqttClient sender(&broker, "sender");
broker.loop();
client.subscribe("#");
client.subscribe("test");
client.setCallback(onPublish);
assertEqual(broker.clientsCount(), (size_t)2);
sender.publish("test", "value");
broker.loop();
add_seconds(60);
client.loop();
sender.loop();
broker.loop();
assertEqual(broker.clientsCount(), (size_t)2);
assertEqual(sender.connected(), true);
assertEqual(client.connected(), true);
assertEqual(published.size(), (size_t)1); // client has received something
}
#if 0
test(local_connect)
{

View File

@@ -99,7 +99,7 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
{
if (srce)
published[srce->id()][topic]++;
if (lastPayload) free(lastPayload);
lastPayload = strdup(payload);
lastLength = length;
@@ -221,7 +221,7 @@ test(network_one_client_one_broker_publish_and_subscribe_through_network)
client.loop();
broker.loop();
}
assertEqual(published.size(), (size_t)1);
assertEqual((int)lastLength, (int)2); // sizeof(ab)
}
@@ -321,7 +321,7 @@ test(network_publish_should_be_dispatched_to_clients)
MqttClient publisher(&broker);
publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); // A should receive this
publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertEqual(published["A"]["a/b"], 1);

View File

@@ -24,7 +24,7 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
{
if (srce)
published[srce->id()][topic]++;
if (lastPayload) free(lastPayload);
lastPayload = strdup(payload);
lastLength = length;
@@ -85,7 +85,7 @@ test(nowifi_publish_should_be_dispatched_to_clients)
MqttClient publisher(&broker);
publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); // A should receive this
publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertEqual(published["A"]["a/b"], 1);