Compare commits

...

38 Commits

Author SHA1 Message Date
hsaturn
2249ddef7f Release 0.7.5 2021-04-28 18:57:24 +02:00
hsaturn
e193929f8f changed every command 2021-04-28 18:56:17 +02:00
hsaturn
e00e31de33 Fix build in Esp8266 mode
Modify dump() functions
2021-04-28 18:55:57 +02:00
hsaturn
20292b7b7b Fix example syntax 2021-04-28 08:24:44 +02:00
hsaturn
26de3befa8 TinyTest Esp32 port
Update library definition
2021-04-28 08:19:55 +02:00
hsaturn
1098466055 Update README.md 2021-04-28 08:14:46 +02:00
hsaturn
2d3663e78c Update README.md 2021-04-28 07:32:56 +02:00
hsaturn
5e16282ad0 Disable ci.yml 2021-04-21 01:10:35 +02:00
hsaturn
e35a43c4a4 Trying to remove unsupported platform that break ci.yml 2021-04-21 01:05:31 +02:00
hsaturn
087a203ba0 Create ci.yml 2021-04-21 00:56:49 +02:00
hsaturn
5d313bbf5e Rewrite examples 2021-04-12 00:27:25 +02:00
hsaturn
ce896f02c4 Merge pull request #6 from hsaturn/AsyncAndWifi
AsyncTcp can be activated by removing the command on TCP_ASYNC in TinyMqtt.h
But the code is not bug free yet.
2021-04-11 23:33:29 +02:00
hsaturn
d3210c3c93 Merge branch 'main' into AsyncAndWifi 2021-04-11 23:30:13 +02:00
hsaturn
23f1207718 Lot of new functions for tinytest
- command every allowing to add peridic evaluations
  very usefull for benchmarks and load tests
- on/off command
2021-04-11 23:27:15 +02:00
hsaturn
122ab88960 Rewrite client-with-wifi.ino 2021-04-11 23:26:49 +02:00
hsaturn
28b0ac1611 Fix missing receive loop for mqttclient 2021-04-11 21:21:48 +02:00
hsaturn
1cfb5cfab1 Allow multiple command per line separated by ; 2021-04-11 19:19:06 +02:00
hsaturn
b023cd67a9 Fix AUnit in debug mode / Not async 2021-04-11 17:02:24 +02:00
hsaturn
24ee6b5201 Fixes in WiFiClient mode 2021-04-11 16:33:12 +02:00
hsaturn
2e92a98db2 Trying to fuse togeter Async and not async version 2021-04-11 15:51:33 +02:00
hsaturn
c59bddfd39 Implementation of Unsuback (unless MqttClient disconnects) 2021-04-11 01:58:44 +02:00
hsaturn
7bdb9cc0cd Tinytest, allow to blink output 0 2021-04-11 01:57:58 +02:00
hsaturn
be62699702 Fix connect problem with MqttClient 2021-04-11 00:48:04 +02:00
hsaturn
77da47e1da Update README.md 2021-04-10 18:02:48 +02:00
hsaturn
88797bfafd Added AUnit 2021-04-10 18:02:28 +02:00
hsaturn
1e3b37623d Fix AUnit build 2021-04-10 17:54:46 +02:00
hsaturn
ba6a96976a ESP32 version that could work 2021-04-10 17:47:21 +02:00
hsaturn
6afd3939b3 Merge remote-tracking branch 'origin/AsyncTcp' into main 2021-04-10 17:23:53 +02:00
hsaturn
2ffe0c13fa README.md update 2021-04-10 17:23:46 +02:00
hsaturn
48eb0daf9a Fix bug in unsubscription list 2021-04-10 17:16:25 +02:00
hsaturn
34c05bc37a Fix compilation in DEBUG mode 2021-04-10 17:16:25 +02:00
hsaturn
7c96c4a5cc Fix warning 2021-04-10 17:16:25 +02:00
hsaturn
b280196395 Added mqDns to tinytest 2021-04-10 17:16:25 +02:00
hsaturn
c75f4893e8 AsyncTcp
AsyncTcp
2021-04-10 17:16:25 +02:00
hsaturn
d666f6a53b AsyncTCP (to be continued) 2021-04-10 17:16:25 +02:00
hsaturn
7ef18de755 Very promising async commit 2021-04-10 17:16:25 +02:00
hsaturn
838df3a34a Added unsubscribe to tinytest 2021-04-10 17:15:21 +02:00
hsaturn
8a25155fd8 Bad merge fix 2021-04-10 16:06:45 +02:00
11 changed files with 710 additions and 394 deletions

18
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,18 @@
name: "CI"
on:
jobs:
ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout this repository
uses: actions/checkout@v2.3.4
- name: Cache for arduino-ci
uses: actions/cache@v2.1.3
with:
path: |
~/.arduino15
key: ${{ runner.os }}-arduino
- name: Install nix
uses: cachix/install-nix-action@v12
- run: nix-shell -I nixpkgs=channel:nixpkgs-unstable -p arduino-ci --run "arduino-ci"

View File

@@ -1,17 +1,19 @@
# TinyMqtt # TinyMqtt
![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt) ![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
[![AUnit Tests](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml/badge.svg)](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml)
![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt) ![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![Esp8266](https://img.shields.io/badge/platform-ESP8266-green) ![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)
![Esp32](https://img.shields.io/badge/platform-ESP32-green)
![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) ![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small, fast and capable Mqtt Broker and Client TinyMqtt is a small, fast and capable Mqtt Broker and Client for Esp8266 / Esp32 / Esp WROOM
## Features ## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two - Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost. clients that had subscribed (payload ~15 bytes ESP8266). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription) The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client - Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported - Mqtt 3.1.1 / Qos 0 supported
@@ -48,7 +50,7 @@ After a while one ESP naturally becomes a 'master' and all ESP are connected tog
No problem if the master dies, a new master will be choosen soon. No problem if the master dies, a new master will be choosen soon.
## TODO List ## TODO List
* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP) * ~~Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)~~
* Implement zeroconf mode (needs async) * Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)

View File

@@ -3,20 +3,32 @@
/** /**
* Local broker that accept connections and two local clients * Local broker that accept connections and two local clients
* *
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | | | | -----
* | v v | ---
* | +----------+ +----------+ | -
* | | internal | | internal | +-------* Wifi
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
* - No need to have an external broker * - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written) * - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients * - accepts external clients
* *
* cons - Takes more memory * cons - Takes more memory
* - a bit hard to understand * - a bit hard to understand
* *
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/ */
#include <my_credentials.h> #include <my_credentials.h>
@@ -37,8 +49,8 @@ void onPublishB(const MqttClient* source, const Topic& topic, const char* payloa
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
delay(500); delay(500);
Serial << "Clients with wifi " << endl; Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
@@ -47,8 +59,8 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin(); broker.begin();
mqtt_a.setCallback(onPublishA); mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic); mqtt_a.subscribe(topic);
@@ -58,30 +70,30 @@ void setup()
void loop() void loop()
{ {
broker.loop(); broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop(); mqtt_a.loop();
mqtt_b.loop(); mqtt_b.loop();
// ============= client A publish ================ // ============= client A publish ================
static const int intervalA = 5000; // publishes every 5s static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA; static uint32_t timerA = millis() + intervalA;
if (millis() > timerA) if (millis() > timerA)
{ {
Serial << "A is publishing " << topic.c_str() << endl; Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA; timerA += intervalA;
mqtt_a.publish(topic); mqtt_a.publish(topic);
} }
// ============= client B publish ================ // ============= client B publish ================
static const int intervalB = 7000; // will send topic each 7s static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB; static uint32_t timerB = millis() + intervalB;
if (millis() > timerB) if (millis() > timerB)
{ {
Serial << "B is publishing " << topic.c_str() << endl; Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB; timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
} }
} }

View File

@@ -1,12 +1,30 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/** TinyMQTT allows a disconnected mode: /** TinyMQTT allows a disconnected mode:
* *
* In this example, local clients A and B are talking together, no need to be connected. * +-----------------------------+
* A single ESP can use this to be able to comunicate with itself with the power * | ESP |
* of MQTT, and once connected still continue to work with others. * | +--------+ |
* * | +-------->| broker | |
*/ * | | +--------+ |
* | | ^ |
* | | | |
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* In this example, local clients A and B are talking together, no need to be connected.
*
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
* The broker may still be conected if wifi is on.
*
*/
std::string topic="sensor/temperature"; std::string topic="sensor/temperature";

View File

@@ -5,6 +5,16 @@
#define PORT 1883 #define PORT 1883
MqttBroker broker(PORT); MqttBroker broker(PORT);
/** Basic Mqtt Broker
*
* +-----------------------------+
* | ESP |
* | +--------+ |
* | | broker | | 1883 <--- External client/s
* | +--------+ |
* | |
* +-----------------------------+
*/
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);

View File

@@ -1,9 +1,23 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
/** Simple Client /** Simple Client (The simplest configuration)
* *
* This is the simplest Mqtt client configuration *
* +--------+
* +------>| broker |<--- < Other client
* | +--------+
* |
* +-----------------+
* | ESP | |
* | +----------+ |
* | | internal | |
* | | client | |
* | +----------+ |
* | |
* +-----------------+
*
* 1 - edit my_credentials.h to setup wifi essid/password
* 2 - change BROKER values (or keep emqx.io test broker)
* *
* pro - small memory footprint (both ram and flash) * pro - small memory footprint (both ram and flash)
* - very simple to setup and use * - very simple to setup and use
@@ -13,6 +27,9 @@
* - local publishes takes more time (because they go outside) * - local publishes takes more time (because they go outside)
*/ */
const char* BROKER = "broker.emqx.io";
const uint16_t BROKER_PORT = 1883;
#include <my_credentials.h> #include <my_credentials.h>
static float temp=19; static float temp=19;
@@ -32,8 +49,7 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port client.connect(BROKER, BROKER_PORT); // Put here your broker ip / port
} }
void loop() void loop()

View File

@@ -1,22 +1,24 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h> #include <MqttStreaming.h>
#include <ESP8266mDNS.h> #if defined(ESP8266)
#include <ESP8266mDNS.h>
#elif defined(ESP32)
#include <ESPmDNS.h>
#else
#error Unsupported platform
#endif
#include <sstream> #include <sstream>
#include <map> #include <map>
/** /** Very complex example
* Console allowing to make any kind of test. * Console allowing to make any kind of test.
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * Upload the sketch, the use the terminal.
* - Reduces wifi traffic * Press H for mini help.
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
* *
* tested with mqtt-spy-0.5.4
* TODO examples of scripts
*/ */
#include <my_credentials.h> #include <my_credentials.h>
@@ -117,7 +119,7 @@ std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
return addr; return addr;
} }
IPAddress local=WiFi.localIP(); IPAddress local=WiFi.localIP();
addr=""; addr.clear();
while(build.size()!=4) while(build.size()!=4)
{ {
std::stringstream b; std::stringstream b;
@@ -138,10 +140,10 @@ std::set<std::string> commands = {
"auto", "broker", "blink", "client", "connect", "auto", "broker", "blink", "client", "connect",
"create", "delete", "help", "interval", "create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set", "ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view" "publish", "reset", "subscribe", "unsubscribe", "view", "every"
}; };
void getCommand(std::string& search) void convertToCommand(std::string& search)
{ {
while(search[0]==' ') search.erase(0,1); while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return; if (search.length()==0) return;
@@ -161,7 +163,7 @@ void getCommand(std::string& search)
else if (count>1) else if (count>1)
{ {
Serial << "Ambiguous command: " << matches << endl; Serial << "Ambiguous command: " << matches << endl;
search=""; search.clear();
} }
} }
@@ -316,22 +318,457 @@ std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd) bool compare(std::string s, const char* cmd)
{ {
if (s.length()==0 or s.length()>strlen(cmd)) return false; uint8_t p=0;
return strncmp(cmd, s.c_str(), s.length())==0; while(s[p++]==*cmd++)
{
if (*cmd==0 or s[p]==0) return true;
if (s[p]==' ') return true;
}
return false;
} }
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
struct Every
{
std::string cmd;
uint32_t ms;
uint32_t next;
uint32_t underrun=0;
bool active=true;
void dump()
{
Serial << (active ? "enabled " : "disabled ");
auto mill=millis();
Serial << ms << "ms [" << cmd << "] next in ";
if (mill > next)
Serial << "now";
else
Serial << next-mill << "ms";
}
};
uint32_t blink_ms_on[16]; uint32_t blink_ms_on[16];
uint32_t blink_ms_off[16]; uint32_t blink_ms_off[16];
uint32_t blink_next[16]; uint32_t blink_next[16];
bool blink_state[16]; bool blink_state[16];
int16_t blink; int16_t blink;
std::vector<Every> everies;
void eval(std::string& cmd)
{
while(cmd.length())
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd.clear();
}
}
}
s = getword(cmd);
if (s.length()) convertToCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
else
{
Serial << "Unknown broker command (" << s << ")" << endl;
s.clear();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd.clear(); // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
else
{
Serial << "Unknown client command (" << s << ")" << endl;
s.clear();
}
}
else if (compare(s, "on"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, HIGH);
}
else if (compare(s, "off"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
}
else if (compare(s, "every"))
{
uint32_t ms = getint(cmd, 0);
if (ms)
{
if (cmd.length())
{
Every every;
every.ms=ms;
every.cmd=cmd;
every.next=millis()+ms;
everies.push_back(every);
every.dump();
Serial << endl;
cmd.clear();
}
}
else if (compare(cmd, "off") or compare(cmd, "on"))
{
bool active=getword(cmd)=="on";
uint8_t ever;
if (compare(cmd, "all"))
ever=100;
else
ever=getint(cmd, 99);
uint8_t count=0;
if (ever == 99)
{
Serial << "Missing every number" << endl;
}
else
{
for(auto& every: everies)
{
if (count==ever or (ever==100))
{
if (every.active != active)
{
every.active = active;
every.underrun = 0;
}
ever = 99;
break;
}
count++;
}
if (ever != 99)
{
Serial << "Every not found" << endl;
}
}
}
else if (compare(cmd, "list") or cmd.length()==0)
{
getword(cmd);
Serial << "List of everies (ms=" << millis() << ")" << endl;
uint8_t count=0;
for(auto& every: everies)
{
Serial << count << ": ";
every.dump();
Serial << endl;
count++;
}
}
else if (compare(cmd, "remove"))
{
Serial << "Removing..." << endl;
getword(cmd);
int8_t every=getint(cmd, -1);
if (every==-1 and compare(cmd, "last") and everies.size())
{
getword(cmd);
everies.erase(everies.begin()+everies.size()-1);
}
else if (every==-1 and compare(cmd, "all"))
{
getword(cmd);
everies.clear();
}
else if (everies.size() > (uint8_t)every)
{
everies.erase(everies.begin()+every);
}
else
Serial << "Bad colmmand" << endl;
}
else
Serial << "Bad command" << endl;
}
else if (compare(s, "blink"))
{
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< blink_nr;
else
{
blink &= ~(1<< blink_nr);
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
it.second->dump(" ");
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " +-- '" << it.first.c_str() << "' " << it.second->clientsCount() << " client/s."<< endl;
it.second->dump(" ");
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " every ms [command]; every list; every remove [nr|all], every [on|off] #" << endl;
Serial << " on {output}; off {output}" << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "# MQTT ERROR " << retval << endl;
}
}
}
void loop() void loop()
{ {
auto ms=millis(); auto ms=millis();
int8_t out=1; int8_t out=0;
int16_t blink_bits = blink; int16_t blink_bits = blink;
uint8_t e=0;
for(auto& every: everies)
{
if (not every.active) continue;
if (every.ms && every.cmd.length() && ms > every.next)
{
std::string cmd(every.cmd);
eval(cmd);
every.next += every.ms;
if (ms > every.next and ms > every.underrun)
{
Serial << "Underrun every #" << e << ", " << (ms - every.next) << "ms late" << endl;
every.underrun = ms+5000;
}
}
e++;
}
while(blink_bits) while(blink_bits)
{ {
if (blink_ms_on[out] and ms > blink_next[out]) if (blink_ms_on[out] and ms > blink_next[out])
@@ -353,7 +790,9 @@ void loop()
} }
static long count; static long count;
#if defined(ESP9266)
MDNS.update(); MDNS.update();
#endif
if (MqttClient::counter != count) if (MqttClient::counter != count)
{ {
@@ -375,7 +814,6 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd; static std::string last_cmd;
if (cmd=="!") if (cmd=="!")
@@ -384,277 +822,7 @@ void loop()
last_cmd=cmd; last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd); if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length()) eval(cmd);
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "blink"))
{
uint8_t blink_nr = getint(cmd, 0);
if (blink_nr)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< (blink_nr-1);
else
{
blink &= ~(1<<(blink_nr-1));
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
}
} }
else else
{ {

View File

@@ -1,12 +1,12 @@
{ {
"name": "TinyMqtt", "name": "TinyMqtt",
"keywords": "ethernet, mqtt, m2m, iot", "keywords": "ethernet, mqtt, m2m, iot",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages. It does support MQTT 3.1.1 with QOS=0.", "description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive and host a broker for MQTT. It does support MQTT 3.1.1 with QOS=0 on ESP8266 and ESP32 WROOM platfrms.",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.7.3", "version": "0.7.5",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt name=TinyMqtt
version=0.7.3 version=0.7.5
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP. It does support MQTT 3.1.1 with QoS=0. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP 8266 and 32 WROOM. It does support MQTT 3.1.1 with QoS=0.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*

View File

@@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port)
{ {
server = new AsyncServer(port); server = new TcpServer(port);
#ifdef TCP_ASYNC
server->onClient(onClient, this); server->onClient(onClient, this);
#endif
} }
MqttBroker::~MqttBroker() MqttBroker::~MqttBroker()
@@ -25,12 +27,17 @@ MqttBroker::~MqttBroker()
} }
// private constructor used by broker only // private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
: parent(parent), client(new_client) : parent(parent)
{ {
#ifdef TCP_ASYNC
client = new_client;
client->onData(onData, this); client->onData(onData, this);
// client->onConnect() TODO // client->onConnect() TODO
// client->onDisconnect() TODO // client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -72,33 +79,22 @@ void MqttClient::close(bool bSendDisconnect)
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{ {
debug("cnx: closing"); debug("cnx: closing");
keep_alive = ka;
close(); close();
if (client) delete client; if (client) delete client;
client = new AsyncClient; client = new TcpClient;
debug("Trying to connect to " << broker.c_str() << ':' << port); debug("Trying to connect to " << broker.c_str() << ':' << port);
// TODO This may return immediately !!! #ifdef TCP_ASYNC
// TODO so I have to add onConnect and move this code to onConnect client->onData(onData, this);
// TODO also, as this is async now, I must take care of client->onConnect(onConnect, this);
// TODO the broker that may disconnect and delete the client immediately client->connect(broker.c_str(), port);
#else
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); onConnect(this, client);
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
keep_alive = ka;
msg.add(0x00); // keep_alive
msg.add((char)keep_alive);
msg.add(clientId);
debug("cnx: mqtt connecting");
msg.sendTo(this);
msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
} }
#endif
} }
void MqttBroker::addClient(MqttClient* client) void MqttBroker::addClient(MqttClient* client)
@@ -134,16 +130,24 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur debug("Error cannot remove client"); // TODO should not occur
} }
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
{ {
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr); MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client)); broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients.size()); debug("New client");
} }
void MqttBroker::loop() void MqttBroker::loop()
{ {
#ifndef TCP_ASYNC
WiFiClient client = server->available();
if (client)
{
onClient(this, &client);
}
#endif
if (broker) if (broker)
{ {
// TODO should monitor broker's activity. // TODO should monitor broker's activity.
@@ -163,7 +167,7 @@ void MqttBroker::loop()
} }
else else
{ {
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent); debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later. // Note: deleting a client not added by the broker itself will probably crash later.
delete client; delete client;
break; break;
@@ -263,13 +267,45 @@ void MqttClient::loop()
client->write((const char*)(&pingreq), 2); client->write((const char*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
// TODO when many MqttClient passes through a local browser // TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance. // there is no need to send one PingReq per instance.
} }
} }
#ifndef TCP_ASYNC
while(client && client->available()>0)
{
message.incoming(client->read());
if (message.type())
{
processMessage(&message);
message.reset();
}
}
#endif
} }
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
{
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
msg.add(0x00); // keep_alive
msg.add((char)mqtt->keep_alive);
msg.add(mqtt->clientId);
debug("cnx: mqtt connecting");
msg.sendTo(mqtt);
msg.reset();
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent);
mqtt->clientAlive(0);
}
#ifdef TCP_ASYNC
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
{ {
char* char_ptr = static_cast<char*>(data); char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr); MqttClient* client=static_cast<MqttClient*>(client_ptr);
@@ -284,6 +320,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
len--; len--;
} }
} }
#endif
void MqttClient::resubscribe() void MqttClient::resubscribe()
{ {
@@ -360,7 +397,7 @@ void MqttClient::processMessage(const MqttMessage* mesg)
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming"); // mesg->hexdump("Incoming");
} }
#endif #endif
@@ -497,6 +534,11 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
} }
break; break;
case MqttMessage::Type::UnSuback:
if (!mqtt_connected) break;
bclose = false;
break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
if (mqtt_connected or client == nullptr) if (mqtt_connected or client == nullptr)
{ {

View File

@@ -1,6 +1,26 @@
#pragma once #pragma once
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h> // TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC
// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
#if defined(ESP8266) || defined(EPOXY_DUINO)
#ifdef TCP_ASYNC
#include <ESPAsyncTCP.h>
#else
#include <ESP8266WiFi.h>
#endif
#elif defined(ESP32)
#ifdef TCP_ASYNC
#include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP
#else
#include <WiFi.h>
#endif
#endif
#ifdef EPOXY_DUINO
#define dbg_ptr uint64_t
#else
#define dbg_ptr uint32_t
#endif
#include <vector> #include <vector>
#include <set> #include <set>
#include <string> #include <string>
@@ -15,6 +35,14 @@
#define debug(what) {} #define debug(what) {}
#endif #endif
#ifdef TCP_ASYNC
using TcpClient = AsyncClient;
using TcpServer = AsyncServer;
#else
using TcpClient = WiFiClient;
using TcpServer = WiFiServer;
#endif
enum MqttError enum MqttError
{ {
MqttOk = 0, MqttOk = 0,
@@ -49,6 +77,7 @@ class MqttMessage
Subscribe = 0x80, Subscribe = 0x80,
SubAck = 0x90, SubAck = 0x90,
UnSubscribe = 0xA0, UnSubscribe = 0xA0,
UnSuback = 0xB0,
PingReq = 0xC0, PingReq = 0xC0,
PingResp = 0xD0, PingResp = 0xD0,
Disconnect = 0xE0 Disconnect = 0xE0
@@ -160,35 +189,40 @@ class MqttClient
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
void dump() void dump(std::string indent="")
{ {
uint32_t ms=millis(); uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF"); Serial << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive; Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
Serial << (client && client->connected() ? "" : "dis") << "connected"; Serial << (client && client->connected() ? "" : "dis") << "connected";
message.hexdump("entrant msg"); if (subscriptions.size())
bool c=false;
Serial << " [";
for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); bool c = false;
c=true; Serial << " [";
for(auto s: subscriptions)
{
if (c) Serial << ", ";
Serial << s.str().c_str();
c=true;
}
Serial << ']';
} }
Serial << endl;
Serial << "]" << endl;
} }
/** Count the number of messages that have been sent **/ /** Count the number of messages that have been sent **/
static long counter; static long counter;
private: private:
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); static void onConnect(void * client_ptr, TcpClient*);
#ifdef TCP_ASYNC
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
#endif
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe(); void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, AsyncClient* client); MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -206,7 +240,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here) // (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker TcpClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
@@ -233,20 +267,16 @@ class MqttBroker
size_t clientsCount() const { return clients.size(); } size_t clientsCount() const { return clients.size(); }
void dump() void dump(std::string indent="")
{ {
Serial << clients.size() << " client/s" << endl;
for(auto client: clients) for(auto client: clients)
{ client->dump(indent);
Serial << " ";
client->dump();
}
} }
private: private:
friend class MqttClient; friend class MqttClient;
static void onClient(void*, AsyncClient*); static void onClient(void*, TcpClient*);
bool checkUser(const char* user, uint8_t len) const bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); } { return compareString(auth_user, user, len); }
@@ -264,7 +294,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients; std::vector<MqttClient*> clients;
AsyncServer* server; TcpServer* server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";