Compare commits

..

51 Commits
0.3.0 ... 0.7.0

Author SHA1 Message Date
hsaturn
82c5b971e9 Release 0.7.0 2021-03-28 23:34:54 +02:00
hsaturn
01998e74ec Test linter 2021-03-28 22:59:42 +02:00
hsaturn
5f46fd304c Release 0.7.0 2021-03-28 22:47:13 +02:00
hsaturn
213d637eaf Code cleaning 2021-03-28 21:31:10 +02:00
hsaturn
3bb2dd5a81 Code cleaning 2021-03-28 21:29:02 +02:00
hsaturn
7d9ab6381d Disconnect added (finally) 2021-03-28 21:28:06 +02:00
hsaturn
470cde62da Version 0.6.0 2021-03-27 10:26:41 +01:00
hsaturn
3fb9b6317d README.md modified (typo) 2021-03-27 02:00:49 +01:00
hsaturn
ee9ad93bfd README.md modified 2021-03-27 02:00:00 +01:00
hsaturn
0b1a932244 MqttClient resubscribe on reconnect 2021-03-27 01:55:03 +01:00
hsaturn
972759237c Speed and stability improved 2021-03-27 01:40:49 +01:00
hsaturn
cb00d7f82a MqttClient / UnSubscribe message implemented 2021-03-27 01:39:37 +01:00
hsaturn
0b735d22a5 Less debug as the code is more stable now 2021-03-27 01:38:32 +01:00
hsaturn
9178aac02c MqttClient client length augmented to 60 (was not passing MqttBox tests 2021-03-26 01:59:08 +01:00
hsaturn
c706fbcff2 Update readme 2021-03-26 01:52:31 +01:00
hsaturn
a0c41a0ccb Smart ip feature for connect 2021-03-26 01:51:30 +01:00
hsaturn
b780dcf99c test fix, was unable to use set when replacements occurs 2021-03-26 01:10:33 +01:00
hsaturn
f122d5e902 relase 0.5.1 2021-03-25 01:26:27 +01:00
hsaturn
d63793cf77 Avoid to use message member, minor changes 2021-03-25 01:26:03 +01:00
hsaturn
8386779e92 tinytest great enhancements 2021-03-25 01:24:46 +01:00
hsaturn
1b988a06a2 Relase 0.5.0 2021-03-24 21:21:46 +01:00
hsaturn
d92aa1fe3c Minor change to publish 2021-03-24 21:20:07 +01:00
hsaturn
0d6e194560 test client enhancements 2021-03-24 21:19:44 +01:00
hsaturn
7107da2cce Client supports (does not disconnect) Suback / Puback 2021-03-24 21:18:27 +01:00
hsaturn
28c8713415 Client keep_alive is now parameterized 2021-03-24 21:17:08 +01:00
hsaturn
70cf8137de Fixed build of client-without-wifi 2021-03-24 18:35:57 +01:00
hsaturn
5ab315e472 Removed dependency with Streaming.h 2021-03-24 18:35:11 +01:00
hsaturn
b96b36f10c README update 2021-03-24 01:33:45 +01:00
hsaturn
ba831ea366 README update 2021-03-24 01:32:47 +01:00
hsaturn
4020393f90 MqttClient can subscribe and receive publishes from distant broker 2021-03-24 01:30:56 +01:00
hsaturn
7b20e7deb5 Supports multiple subscriptions 2021-03-23 23:51:33 +01:00
hsaturn
efe6a05bbd MqttStreaming.h, streaming with fixes 2021-03-23 23:41:00 +01:00
hsaturn
84dbb80106 Release 0.4.0 2021-03-22 02:45:57 +01:00
hsaturn
47bc06f0ce removed need of Streaming.h if no debug 2021-03-22 02:44:30 +01:00
hsaturn
07c96c19a5 Better simple-client example 2021-03-22 02:35:34 +01:00
hsaturn
de8813f9f6 No more serial prints 2021-03-22 02:35:10 +01:00
hsaturn
fbc24c94e3 MqttClient::publish with String added 2021-03-22 02:34:45 +01:00
hsaturn
169abf8099 allow MqttClient to be constructed with nothing 2021-03-22 02:34:23 +01:00
hsaturn
5cee67095e Fix payload content 2021-03-22 02:33:54 +01:00
hsaturn
0cb2e99b4b Better debug defines 2021-03-22 02:32:45 +01:00
hsaturn
54c905a32f API Changed
Fix too long time
2021-03-22 02:10:54 +01:00
hsaturn
befab9dd6e More TODOs (happy betas) 2021-03-22 01:59:40 +01:00
hsaturn
bd2e7cc5f6 Fix crash on MqttClient timeout when not linked to a broker 2021-03-22 01:59:17 +01:00
hsaturn
18b5f0c27b Better client creation 2021-03-22 01:19:50 +01:00
hsaturn
e71a4d5e87 MqttClient was unable to publish in some cases 2021-03-22 01:19:26 +01:00
hsaturn
620dbf31af Rewrite interpreter, can handle brokers now 2021-03-22 00:28:05 +01:00
hsaturn
52690ec7e7 Fix some rare case crashes 2021-03-22 00:27:23 +01:00
hsaturn
9f28e7f92f Version 0.3.0 library files 2021-03-21 19:34:40 +01:00
hsaturn
ed9efbb5ce Removed buffer 256 thus less memory is needed for MqttClient instances 2021-03-21 17:21:36 +01:00
hsaturn
4fd34bfffa More commands, and dot notation added 2021-03-21 16:34:14 +01:00
hsaturn
7be4d86f46 TODO list changed 2021-03-21 16:33:53 +01:00
16 changed files with 1357 additions and 280 deletions

25
.github/workflows/superlinter.yml vendored Normal file
View File

@@ -0,0 +1,25 @@
name: Super-Linter
# Run this workflow every time a new commit pushed to your repository
on: push
jobs:
# Set the job key. The key is displayed as the job name
# when a job name is not provided
super-lint:
# Name the Job
name: Lint code base
# Set the type of machine to run on
runs-on: ubuntu-latest
steps:
# Checks out a copy of your repository on the ubuntu-latest machine
- name: Checkout code
uses: actions/checkout@v2
# Runs the Super-Linter action
- name: Run Super-Linter
uses: github/super-linter@v3
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*~ *~
src/my_credentials.h

View File

@@ -2,14 +2,17 @@
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt) ![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt) ![](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green) ![](https://img.shields.io/badge/platform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) ![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client ESP 8266 is a small, fast and capable Mqtt Broker and Client
## Features ## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client - Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported - Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode) - Standalone (can work without WiFi) (degraded/local mode)
@@ -23,12 +26,18 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Implement zeroconf mode (needs async) * Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed. * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~
* ~~MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io)~~
* MqttClient auto reconnection
* ~~MqttClient unsubscribe~~
* MqttClient does not sent payload to callback...
* MqttClient user/password
* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching)
## Quickstart ## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt) * install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup) * modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)

View File

@@ -1,10 +1,7 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/** /**
* Local broker that accept connections * Local broker that accept connections and two local clients
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -31,11 +28,11 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void setup() void setup()
{ {
@@ -67,7 +64,7 @@ void loop()
mqtt_b.loop(); mqtt_b.loop();
// ============= client A publish ================ // ============= client A publish ================
static const int intervalA = 50000; static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA; static uint32_t timerA = millis() + intervalA;
if (millis() > timerA) if (millis() > timerA)
@@ -78,7 +75,7 @@ void loop()
} }
// ============= client B publish ================ // ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB; static uint32_t timerB = millis() + intervalB;
if (millis() > timerB) if (millis() > timerB)

View File

@@ -1,5 +1,4 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode: /** TinyMQTT allows a disconnected mode:
* *
@@ -15,10 +14,10 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << "--> B Received " << topic.c_str() << endl; }
void setup() void setup()

View File

@@ -1,5 +1,4 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <my_credentials.h> #include <my_credentials.h>

View File

@@ -1,6 +1,5 @@
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** Simple Client /** Simple Client
* *
@@ -16,6 +15,9 @@
#include <my_credentials.h> #include <my_credentials.h>
static float temp=19;
static MqttClient client;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
@@ -29,8 +31,22 @@ void setup()
{ delay(500); Serial << '.'; } { delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
} }
void loop() void loop()
{ {
client.loop();
delay(1000);
auto rnd=random(100);
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
client.publish("sensor/temperature", String(temp));
} }

View File

@@ -0,0 +1,39 @@
// vim: ts=40
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish.
--------
set name value set variable name to value (later replaced)
set name if no value, then var is erased
set view all vars
reserved keywords are forbidden
client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883
a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic
delete a destroy the client
* note, if 'server' is a number, then it replaces the end of the local ip.
i.e. if local ip is 192.168.1.10, connect 2.35 becomes 192.168.2.35
--------
example:
--------
client c
c.connect broker.emqx.io
set topic sensor/temperature
c.subscribe topic
c.publish topic 15
c.publish topic 20
macro exansion example
----------------------
set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -1,9 +1,11 @@
#define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <MqttStreaming.h>
#include <sstream>
#include <map> #include <map>
/** /**
* Local broker that accept connections * Console allowing to make any kind of test.
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -14,28 +16,33 @@
* cons - Takes more memory * cons - Takes more memory
* - a bit hard to understand * - a bit hard to understand
* *
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/ */
#include <my_credentials.h> #include <my_credentials.h>
std::string topic="sensor/temperature"; std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; } {
Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str();
if (payload) Serial << ", payload[" << length << "]=[";
while(length--)
{
const char c=*payload++;
if (c!=10 and c!=13 and c <32) Serial << '?';
Serial << *payload++;
}
Serial<< endl;
}
std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
delay(500); delay(500);
Serial << endl << endl << endl Serial << endl << endl << endl
<< "Demo started. Type help for more..." << endl
<< "Connecting to '" << ssid << "' "; << "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
@@ -45,22 +52,142 @@ void setup()
{ Serial << '-'; delay(500); } { Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
Serial << "Type help for more..." << endl;
broker.begin(); MqttBroker* broker = new MqttBroker(1883);
broker->begin();
brokers["broker"] = broker;
} }
std::string getword(std::string& str, const char* if_empty=nullptr) int getint(std::string& str, const int if_empty=0)
{ {
std::string sword; std::string sword;
while(str.length() && str[0]!=' ') while(str.length() && str[0]>='0' && str[0]<='9')
{ {
sword += str[0]; str.erase(0,1); sword += str[0]; str.erase(0,1);
} }
while(str[0]==' ') str.erase(0,1); while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty; if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str());
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string sword;
while(str.length() && str[0]!=sep)
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword; return sword;
} }
bool isaddr(std::string s)
{
if (s.length()==0 or s.length()>3) return false;
for(char c: s)
if (c<'0' or c>'9') return false;
return true;
}
std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string addr=getword(str, if_empty, sep);
std::string ip=addr;
std::vector<std::string> build;
while(ip.length())
{
std::string b=getword(ip,nullptr,'.');
if (isaddr(b) && build.size()<4)
{
build.push_back(b);
}
else
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
while(build.size()!=4)
{
std::stringstream b;
b << (int)local[3-build.size()];
build.insert(build.begin(), b.str());
}
for(std::string s: build)
{
if (addr.length()) addr += '.';
addr += s;
}
return addr;
}
std::map<std::string, std::string> vars;
std::set<std::string> commands = {
"auto", "broker", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "view"
};
void getCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
std::string matches;
int count=0;
for(std::string cmd: commands)
{
if (cmd.substr(0, search.length()) == search)
{
if (count) matches +=", ";
count++;
matches += cmd;
}
}
if (count==1)
search = matches;
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
}
}
void replace(const char* d, std::string& str, std::string srch, std::string to)
{
if (d[0] && d[1])
{
srch=d[0]+srch+d[1];
to=d[0]+to+d[1];
size_t pos = 0;
while((pos=str.find(srch, pos)) != std::string::npos)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
}
}
}
void replaceVars(std::string& cmd)
{
cmd = ' '+cmd+' ';
for(auto it: vars)
{
replace("..", cmd, it.first, it.second);
replace(". ", cmd, it.first, it.second);
replace(" .", cmd, it.first, it.second);
replace(" ", cmd, it.first, it.second);
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval
class automatic class automatic
{ {
public: public:
@@ -128,7 +255,7 @@ class automatic
autop->bon=false; autop->bon=false;
else if (s=="interval") else if (s=="interval")
{ {
int32_t i=atol(getword(cmd).c_str()); int32_t i=getint(cmd);
if (i) if (i)
autop->interval(atol(s.c_str())); autop->interval(atol(s.c_str()));
else else
@@ -162,6 +289,7 @@ class automatic
{ {
Serial << " auto [$id] on/off" << endl; Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl; Serial << " auto [$id] view" << endl;
Serial << " auto [$id] interval [s]" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl; Serial << " auto [$id] create [millis] [topic]" << endl;
} }
@@ -172,6 +300,7 @@ class automatic
std::string topic_; std::string topic_;
bool bon=false; bool bon=false;
static std::map<MqttClient*, automatic*> autos; static std::map<MqttClient*, automatic*> autos;
float temp=19;
}; };
std::map<MqttClient*, automatic*> automatic::autos; std::map<MqttClient*, automatic*> automatic::autos;
@@ -181,35 +310,18 @@ bool compare(std::string s, const char* cmd)
return strncmp(cmd, s.c_str(), s.length())==0; return strncmp(cmd, s.c_str(), s.length())==0;
} }
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop() void loop()
{ {
broker.loop(); static long count;
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers)
it.second->loop();
for(auto it: clients) for(auto it: clients)
it.second->loop(); it.second->loop();
@@ -223,90 +335,246 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "------------------------------------------------------" << endl;
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length()) while(cmd.length())
{ {
std::string s = getword(cmd); MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
clients.erase(s);
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue;
Serial << "deleted" << endl;
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) Serial << "NYI" << endl;
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
}
else if (compare(s,"publish"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->publish(getword(cmd, topic.c_str())); });
}
else if (compare(s,"subscribe"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->subscribe(getword(cmd, topic.c_str())); });
} }
else if (compare(s, "view")) else if (compare(s, "view"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) broker->dump();
{ publish->dump(); }); }
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
} }
else if (compare(s, "auto")) else if (compare(s, "auto"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) automatic::command(client, cmd);
{ automatic::command(publish, cmd); if (client == nullptr)
if (publish == nullptr)
cmd.clear(); cmd.clear();
}, true);
} }
else if (compare(s, "new")) else if (compare(s, "broker"))
{ {
std::string id=getword(cmd); std::string id=getword(cmd);
if (id.length()) if (id.length() or brokers.find(id)!=brokers.end())
{ {
MqttClient* client = new MqttClient(&broker); int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id); client->id(id);
clients[id]=client; clients[id]=client;
client->setCallback(onPublish); client->setCallback(onPublish);
client->subscribe(topic); client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
} }
else else
Serial << "missing id" << endl; Serial << "Missing or existing client name" << endl;
cmd+=" ls"; cmd+=" ls";
} }
else if (compare(s, "delete")) else if (compare(s, "set"))
{ {
s = getword(cmd); std::string name(getword(cmd));
auto it=clients.find(s); if (name.length()==0)
if (it != clients.end())
{ {
delete it->second; for(auto it: vars)
clients.erase(it); {
cmd+=" ls"; Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
} }
else else
Serial << "Unknown client (" << s.c_str() << ")" << endl;
}
else if (compare(s, "ls"))
{ {
Serial << "main : " << clients.size() << " client/s." << endl; if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients) for(auto it: clients)
{ {
Serial << " "; it.second->dump(); Serial << " "; it.second->dump();
} }
broker.dump();
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
} }
else if (compare(s, "reset")) else if (compare(s, "reset"))
ESP.restart(); ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help")) else if (compare(s,"help"))
{ {
Serial << "syntax:" << endl; Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl; Serial << " MqttBroker:" << endl;
Serial << " connect $id [ip]" << endl; Serial << " broker {name} {port} : create a new broker" << endl;
Serial << " subscribe $id [topic]" << endl; Serial << endl;
Serial << " publish $id [topic]" << endl; Serial << " MqttClient:" << endl;
Serial << " view $id " << endl; Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help(); automatic::help();
Serial << endl; Serial << endl;
Serial << " help" << endl; Serial << " help" << endl;
Serial << " ls" << endl; Serial << " ls / ip / reset" << endl;
Serial << " reset" << endl; Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl; Serial << endl;
Serial << " $id : name of the client." << endl; Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl; Serial << " default topic is '" << topic.c_str() << "'" << endl;
@@ -314,8 +582,15 @@ void loop()
} }
else else
{ {
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl; Serial << "Unknown command (" << s.c_str() << ")" << endl;
} }
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
} }
} }
else else

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.2", "version": "0.7.0",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt name=TinyMqtt
version=0.2.0 version=0.7.0
author=HSaturn <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*

412
src/MqttStreaming.h Normal file
View File

@@ -0,0 +1,412 @@
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
* (I have to speek to the author in order to include my changes to his library if possible)
**/
/*
Streaming.h - Arduino library for supporting the << streaming operator
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Version 6 library changes
Copyright (c) 2019 Gazoodle. All rights reserved.
1. _BASED moved to template to remove type conversion to long and
sign changes which break int8_t and int16_t negative numbers.
The print implementation still upscales to long for it's internal
print routine.
2. _PAD added to allow padding & filling of characters to the stream
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
and zero padding for numerics
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
and no internal buffers for replaceable stream printing
*/
#ifndef ARDUINO_STREAMING
#define ARDUINO_STREAMING
#if defined(ARDUINO) && ARDUINO >= 100
#include "Arduino.h"
#else
#ifndef STREAMING_CONSOLE
#include "WProgram.h"
#endif
#endif
#include <string>
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
// No stl library, so need trivial version of std::is_signed ...
namespace std {
template<typename T>
struct is_signed { static const bool value = false; };
template<>
struct is_signed<int8_t> { static const bool value = true; };
template<>
struct is_signed<int16_t> { static const bool value = true; };
template<>
struct is_signed<int32_t> { static const bool value = true; };
};
#else
#include <type_traits>
#endif
#define STREAMING_LIBRARY_VERSION 6
#if !defined(typeof)
#define typeof(x) __typeof__(x)
#endif
// PrintBuffer implementation of Print, a small buffer to print in
// see its use with pad_float()
template <size_t N>
class PrintBuffer : public Print
{
size_t pos = 0;
char str[N] {};
public:
inline const char *operator() ()
{ return str; };
// inline void clear()
// { pos = 0; str[0] = '\0'; };
inline size_t write(uint8_t c)
{ return write(&c, 1); };
inline size_t write(const uint8_t *buffer, size_t size)
{
size_t s = std::min(size, N-1 - pos); // need a /0 left
if (s)
{
memcpy(&str[pos], buffer, s);
pos += s;
}
return s;
};
};
// Generic template
template<class T>
inline Print &operator <<(Print &stream, const T &arg)
{ stream.print(arg); return stream; }
// TODO sfinae maybe could do the trick ?
inline Print &operator <<(Print &stream, const std::string &str)
{ stream.print(str.c_str()); return stream; }
template<typename T>
struct _BASED
{
T val;
int base;
_BASED(T v, int b): val(v), base(b)
{}
};
#if ARDUINO >= 100
struct _BYTE_CODE
{
byte val;
_BYTE_CODE(byte v) : val(v)
{}
};
#define _BYTE(a) _BYTE_CODE(a)
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
{ obj.write(arg.val); return obj; }
#else
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
#endif
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
// Specialization for class _BASED
// Thanks to Arduino forum user Ben Combee who suggested this
// clever technique to allow for expressions like
// Serial << _HEX(a);
template<typename T>
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
{ obj.print(arg.val, arg.base); return obj; }
#if ARDUINO >= 18
// Specialization for class _FLOAT
// Thanks to Michael Margolis for suggesting a way
// to accommodate Arduino 0018's floating point precision
// feature like this:
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
struct _FLOAT
{
double val; // only Print::print(double)
int digits;
_FLOAT(double v, int d): val(v), digits(d)
{}
};
inline Print &operator <<(Print &obj, const _FLOAT &arg)
{ obj.print(arg.val, arg.digits); return obj; }
#endif
// Specialization for enum _EndLineCode
// Thanks to Arduino forum user Paul V. who suggested this
// clever technique to allow for expressions like
// Serial << "Hello!" << endl;
enum _EndLineCode { endl };
inline Print &operator <<(Print &obj, _EndLineCode)
{ obj.println(); return obj; }
// Specialization for padding & filling, mainly utilized
// by the width printers
//
// Use like
// Serial << _PAD(10,' '); // Will output 10 spaces
// Serial << _PAD(4, '0'); // Will output 4 zeros
struct _PAD
{
int8_t width;
char chr;
_PAD(int8_t w, char c) : width(w), chr(c) {}
};
inline Print &operator <<(Print& stm, const _PAD &arg)
{
for(int8_t i = 0; i < arg.width; i++)
stm.print(arg.chr);
return stm;
}
// Specialization for width printing
//
// Use like Result
// -------- ------
// Serial << _WIDTH(1,5) " 1"
// Serial << _WIDTH(10,5) " 10"
// Serial << _WIDTH(100,5) " 100"
// Serial << _WIDTHZ(1,5) "00001"
//
// Great for times & dates, or hex dumps
//
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
//
// for(int index=0; index<byte_array_size; index++)
// Serial << _WIDTHZ(_HEX(byte_array[index]))
template<typename T>
struct __WIDTH
{
const T val;
int8_t width;
char pad;
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
};
// Count digits in an integer of specific base
template<typename T>
inline uint8_t digits(T v, int8_t base = 10)
{
uint8_t digits = 0;
if ( std::is_signed<T>::value )
{
if ( v < 0 )
{
digits++;
v = -v; // v needs to be postive for the digits counter to work
}
}
do
{
v /= base;
digits++;
} while( v > 0 );
return digits;
}
// Generic get the width of a value in base 10
template<typename T>
inline uint8_t get_value_width(T val)
{ return digits(val); }
inline uint8_t get_value_width(const char * val)
{ return strlen(val); }
#ifdef ARDUINO
inline uint8_t get_value_width(const __FlashStringHelper * val)
{ return strlen_P(reinterpret_cast<const char *>(val)); }
#endif
// _BASED<T> get the width of a value
template<typename T>
inline uint8_t get_value_width(_BASED<T> b)
{ return digits(b.val, b.base); }
// Constructor wrapper to allow automatic template parameter deduction
template<typename T>
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
template<typename T>
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
// Operator overload to handle width printing.
template<typename T>
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
// explicit Operator overload to handle width printing of _FLOAT, double and float
template<typename T>
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
{
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
size_t size = buf.print(val, digits); // print in buf
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
}
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
// a less verbose _FLOATW for _WIDTH(_FLOAT)
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
// Specialization for replacement formatting
//
// Designed to be similar to printf that everyone knows and loves/hates. But without
// the internal buffers and type agnosticism. This version only has placeholders in
// the format string, the actual values are supplied using the stream safe operators
// defined in this library.
//
// Use like this:
//
// Serial << FMT(F("Replace % with %"), 1, 2 )
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
inline char get_next_format_char(const char *& format_string)
{
char format_char = *format_string;
if ( format_char > 0 ) format_string++;
return format_char;
}
#ifdef ARDUINO
inline char get_next_format_char(const __FlashStringHelper*& format_string)
{
char format_char = pgm_read_byte(format_string);
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
return format_char;
}
#endif
template<typename Ft>
inline bool check_backslash(char& format_char, Ft& format_string)
{
if ( format_char == '\\')
{
format_char = get_next_format_char(format_string);
return true;
}
return false;
}
// The template tail printer helper
template<typename Ft, typename... Ts>
struct __FMT
{
Ft format_string;
__FMT(Ft f, Ts ... args) : format_string(f) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
check_backslash(c, format);
if ( c )
stm.print(c);
}
}
};
// The variadic template helper
template<typename Ft, typename T, typename... Ts>
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
{
T val;
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
if (!check_backslash(c, format))
{
if ( c == '%')
{
stm << val;
// Variadic recursion ... compiler rolls this out during
// template argument pack expansion
__FMT<Ft, Ts...>::tstreamf(stm, format);
return;
}
}
if (c)
stm.print(c);
}
}
};
// The actual operator should you only instanciate the FMT
// helper with a format string and no parameters
template<typename Ft, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// The variadic stream helper
template<typename Ft, typename T, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// As we don't have C++17, we can't get a constructor to use
// automatic argument deduction, but ... this little trick gets
// around that ...
template<typename Ft, typename... Ts>
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
#endif

View File

@@ -2,7 +2,6 @@
#include <map> #include <map>
#include <string> #include <string>
#include <string.h> #include <string.h>
#include <Streaming.h>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
/*** /***
@@ -36,7 +35,7 @@ class StringIndexer
{ {
strings[index].str = std::string(str, len); strings[index].str = std::string(str, len);
strings[index].used++; strings[index].used++;
Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; // Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
return index; return index;
} }
} }
@@ -66,7 +65,7 @@ class StringIndexer
if (it->second.used == 0) if (it->second.used == 0)
{ {
strings.erase(it); strings.erase(it);
Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; // Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
} }
} }
} }
@@ -105,7 +104,7 @@ class IndexedString
const std::string& str() const { return StringIndexer::str(index); } const std::string& str() const { return StringIndexer::str(index); }
const StringIndexer::index_t getIndex() const { return index; } const StringIndexer::index_t& getIndex() const { return index; }
private: private:
StringIndexer::index_t index; StringIndexer::index_t index;

View File

@@ -1,12 +1,5 @@
#include "TinyMqtt.h" #include "TinyMqtt.h"
#include <sstream> #include <sstream>
#include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len) void outstring(const char* prefix, const char*p, uint16_t len)
{ {
@@ -20,10 +13,19 @@ MqttBroker::MqttBroker(uint16_t port) : server(port)
{ {
} }
MqttBroker::~MqttBroker()
{
while(clients.size())
{
delete clients[0];
}
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent) : parent(parent)
{ {
client = new_client ? new WiFiClient(new_client) : nullptr; client = new WiFiClient(new_client);
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -32,21 +34,27 @@ MqttClient::MqttClient(MqttBroker* parent)
{ {
client = nullptr; client = nullptr;
parent->addClient(this); if (parent) parent->addClient(this);
} }
MqttClient::~MqttClient() MqttClient::~MqttClient()
{ {
close(); close();
delete client; delete client;
Serial << "Client deleted" << endl;
} }
void MqttClient::close() void MqttClient::close(bool bSendDisconnect)
{ {
debug("close " << id().c_str()); debug("close " << id().c_str());
mqtt_connected = false; mqtt_connected = false;
if (client) if (client)
{ {
if (bSendDisconnect and client->connected())
{
message.create(MqttMessage::Type::Disconnect);
message.sendTo(this);
}
client->stop(); client->stop();
} }
@@ -57,28 +65,30 @@ void MqttClient::close()
} }
} }
void MqttClient::connect(std::string broker, uint16_t port) void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{ {
debug("cnx: closing"); debug("cnx: closing");
close(); close();
debug("cnx: closed");
if (client) delete client; if (client) delete client;
client = new WiFiClient; client = new WiFiClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
message.create(MqttMessage::Type::Connect); MqttMessage msg(MqttMessage::Type::Connect);
message.add("MQTT",4); msg.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1 msg.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name msg.add(0x0); // Connect flags TODO user / name
keep_alive = 1; keep_alive = ka;
message.add(0x00); // keep_alive msg.add(0x00); // keep_alive
message.add((char)keep_alive); msg.add((char)keep_alive);
message.add(clientId); msg.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); msg.sendTo(this);
msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
} }
} }
@@ -116,10 +126,10 @@ void MqttBroker::loop()
// for(auto it=clients.begin(); it!=clients.end(); it++) // for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop // use index because size can change during the loop
for(int i=0; i<clients.size(); i++) for(size_t i=0; i<clients.size(); i++)
{ {
auto client = clients[i]; auto client = clients[i];
if(client->connected()) if (client->connected())
{ {
client->loop(); client->loop();
} }
@@ -133,35 +143,45 @@ void MqttBroker::loop()
} }
} }
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
MqttError retval = MqttOk;
debug("publish "); debug("publish ");
int i=0; int i=0;
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
#if TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6 if (broker && broker->connected()) // Broker is connected
{ {
// ext broker -> clients or // ext broker -> clients or
// or clients -> ext broker // or clients -> ext broker
if (source == broker) // broker -> clients if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // clients -> broker
broker->publish(topic, msg); {
MqttError ret = broker->publish(topic, msg);
if (ret != MqttOk) retval = ret;
}
} }
else // Disconnected: R7 else // Disconnected: R7
{ {
// All is allowed // All is allowed
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif
if (doit) client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
} }
return retval;
} }
bool MqttBroker::compareString( bool MqttBroker::compareString(
@@ -174,10 +194,10 @@ bool MqttBroker::compareString(
return *good==0; return *good==0;
} }
void MqttMessage::getString(char* &buffer, uint16_t& len) void MqttMessage::getString(const char* &buff, uint16_t& len)
{ {
len = (buffer[0]<<8)|(buffer[1]); len = (buff[0]<<8)|(buff[1]);
buffer+=2; buff+=2;
} }
void MqttClient::clientAlive(uint32_t more_seconds) void MqttClient::clientAlive(uint32_t more_seconds)
@@ -198,9 +218,11 @@ void MqttClient::loop()
{ {
debug("timeout client"); debug("timeout client");
close(); close();
debug("closed");
} }
else else if (client && client->connected())
{ {
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((uint8_t*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
@@ -220,13 +242,65 @@ void MqttClient::loop()
} }
} }
void MqttClient::resubscribe()
{
// TODO resubscription limited to 256 bytes
if (subscriptions.size())
{
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
for(auto topic: subscriptions)
{
msg.add(topic);
msg.add(0); // TODO qos
}
msg.sendTo(this); // TODO return value
}
}
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker ?
{
debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic);
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
}
return ret;
}
long MqttClient::counter=0;
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; counter++;
std::string s; #if TINY_MQTT_DEBUG
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// message.hexdump("Incoming");
}
#endif
auto header = message.getVHeader(); auto header = message.getVHeader();
char* payload; const char* payload;
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
@@ -254,20 +328,6 @@ void MqttClient::processMessage()
// ClientId // ClientId
message.getString(payload, len); message.getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
@@ -298,11 +358,30 @@ void MqttClient::processMessage()
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
bclose = false; bclose = false;
mqtt_connected=true; mqtt_connected=true;
// Reuse received msg {
message.create(MqttMessage::Type::Connack); MqttMessage msg(MqttMessage::Type::ConnAck);
message.add(0); // Session present (not implemented) msg.add(0); // Session present (not implemented)
message.add(0); // Connection accepted msg.add(0); // Connection accepted
message.sendTo(this); msg.sendTo(this);
}
break;
case MqttMessage::Type::ConnAck:
mqtt_connected = true;
bclose = false;
resubscribe();
break;
case MqttMessage::Type::SubAck:
case MqttMessage::Type::PubAck:
if (!mqtt_connected) break;
// Ignore acks
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break; break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
@@ -320,14 +399,35 @@ void MqttClient::processMessage()
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
case MqttMessage::Type::UnSubscribe:
{
if (!mqtt_connected) break; if (!mqtt_connected) break;
payload = header+2; payload = header+2;
message.getString(payload, len); // Topic
outstring("Subscribes", payload, len);
subscribe(Topic(payload, len)); debug("subscribe loop");
while(payload < message.end())
{
message.getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
Topic topic(payload, len);
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic);
else
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
subscriptions.erase(it);
}
payload += len;
uint8_t qos = *payload++;
debug(" qos=" << qos);
}
debug("end loop");
bclose = false; bclose = false;
// TODO SUBACK // TODO SUBACK
}
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
@@ -344,16 +444,28 @@ void MqttClient::processMessage()
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
if (parent)
{
debug("publishing to parent"); debug("publishing to parent");
parent->publish(this, published, message); parent->publish(this, published, message);
// TODO should send PUBACK }
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
message.create(MqttMessage::Type::PubAck);
// TODO re-add packet identifier if any
message.sendTo(this);
bclose = false; bclose = false;
} }
break; break;
case MqttMessage::Type::PubAck: case MqttMessage::Type::Disconnect:
// TODO should discard any will message
if (!mqtt_connected) break; if (!mqtt_connected) break;
bclose = false; mqtt_connected = false;
close(false);
bclose=false;
break; break;
default: default:
@@ -363,13 +475,13 @@ void MqttClient::processMessage()
if (bclose) if (bclose)
{ {
Serial << "*************** Error msg 0x" << _HEX(message.type()); Serial << "*************** Error msg 0x" << _HEX(message.type());
if (error.length()) Serial << ':' << error.c_str(); message.hexdump("-------ERROR ------");
Serial << endl; Serial << endl;
close(); close();
} }
else else
{ {
clientAlive(5); clientAlive(parent ? 5 : 0);
} }
message.reset(); message.reset();
} }
@@ -382,53 +494,53 @@ bool Topic::matches(const Topic& topic) const
} }
// publish from local client // publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
message.create(MqttMessage::Publish); MqttMessage msg(MqttMessage::Publish);
message.add(topic); msg.add(topic);
message.add(payload, pay_length); msg.add(payload, pay_length, false);
if (parent) if (parent)
parent->publish(this, topic, message); return parent->publish(this, topic, msg);
else if (client) else if (client)
publish(topic, message); return msg.sendTo(this);
else else
Serial << " Should not happen" << endl; return MqttNowhereToSend;
} }
// republish a received publish if it matches any in subscriptions // republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg) MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
{ {
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ';
if (subscription.matches(topic)) if (subscription.matches(topic))
{ {
Serial << " match/send"; debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ');
if (client) if (client)
{ {
msg.sendTo(this); retval = msg.sendTo(this);
} }
else if (callback) else if (callback)
{ {
callback(this, topic, nullptr, 0); // TODO callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
Serial << endl;
} }
return retval;
} }
void MqttMessage::reset() void MqttMessage::reset()
{ {
curr=buffer; buffer.clear();
*curr=0; // Type Unknown
state=FixedHeader; state=FixedHeader;
size=0; size=0;
} }
void MqttMessage::incoming(char in_byte) void MqttMessage::incoming(char in_byte)
{ {
*curr++ = in_byte; buffer += in_byte;
switch(state) switch(state)
{ {
case FixedHeader: case FixedHeader:
@@ -443,12 +555,19 @@ void MqttMessage::incoming(char in_byte)
} }
else if ((in_byte & 0x80) == 0) else if ((in_byte & 0x80) == 0)
{ {
vheader = curr; vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else else
{
buffer.reserve(size);
state = VariableHeader; state = VariableHeader;
} }
}
break; break;
case VariableHeader: case VariableHeader:
case PayLoad: case PayLoad:
@@ -464,22 +583,25 @@ void MqttMessage::incoming(char in_byte)
break; break;
case Complete: case Complete:
default: default:
curr--;
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
state = Error; hexdump("spurious");
reset();
break; break;
} }
if (curr-buffer > 250) if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
{ {
debug("Spurious byte " << _HEX(in_byte)); debug("Too long " << state);
curr=buffer; reset();
} }
} }
void MqttMessage::add(const char* p, size_t len) void MqttMessage::add(const char* p, size_t len, bool addLength)
{ {
if (addLength)
{
incoming(len>>8); incoming(len>>8);
incoming(len & 0xFF); incoming(len & 0xFF);
}
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
@@ -494,31 +616,58 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttClient* client) MqttError MqttMessage::sendTo(MqttClient* client)
{ {
if (curr-buffer-2 >= 0) if (buffer.size())
{ {
encodeLength(buffer+1, curr-buffer-2); debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2);
// hexdump("snd"); // hexdump("snd");
client->write(buffer, curr-buffer); client->write(&buffer[0], buffer.size());
} }
else else
{ {
Serial << "??? Invalid send" << endl; debug("??? Invalid send");
Serial << (long)end() << "-" << (long)buffer << endl; return MqttInvalidMessage;
} }
return MqttOk;
} }
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
if (prefix) Serial << prefix << ' '; uint16_t addr=0;
Serial << (long)buffer << "-" << (long)curr << " : "; const int bytes_per_row = 8;
const char* p=buffer; const char* hex_to_str = " | ";
while(p!=curr) const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{ {
if (*p<16) Serial << '0'; if ((addr % bytes_per_row) == 0)
Serial << _HEX(*p) << ' '; {
p++; if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
if (prefix) Serial << prefix << separator;
ascii.clear();
} }
addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl; Serial << endl;
} }

View File

@@ -3,8 +3,22 @@
#include <set> #include <set>
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h>
#define MaxBufferLength 255 #if 0
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else
#define TINY_MQTT_DEBUG 0
#define debug(what) {}
#endif
enum MqttError
{
MqttOk = 0,
MqttNowhereToSend=1,
MqttInvalidMessage=2,
};
class Topic : public IndexedString class Topic : public IndexedString
{ {
@@ -21,17 +35,21 @@ class Topic : public IndexedString
class MqttClient; class MqttClient;
class MqttMessage class MqttMessage
{ {
const uint16_t MaxBufferLength = 255;
public: public:
enum Type enum Type
{ {
Unknown = 0, Unknown = 0,
Connect = 0x10, Connect = 0x10,
Connack = 0x20, ConnAck = 0x20,
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
SubAck = 0x90,
UnSubscribe = 0xA0,
PingReq = 0xC0, PingReq = 0xC0,
PingResp = 0xD0, PingResp = 0xD0,
Disconnect = 0xE0
}; };
enum State enum State
{ {
@@ -45,21 +63,21 @@ class MqttMessage
}; };
MqttMessage() { reset(); } MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); } MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len); void add(const char* p, size_t len, bool addLength=true );
void add(const std::string& s) { add(s.c_str(), s.length()); } void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); } void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; } const char* end() const { return &buffer[0]+buffer.size(); }
char* end() const { return curr; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return curr-buffer; } uint16_t length() const { return buffer.size(); }
void reset(); void reset();
// buff is MSB/LSB/STRING // buff is MSB/LSB/STRING
// output buff+=2, len=length(str) // output buff+=2, len=length(str)
void getString(char* &buff, uint16_t& len); static void getString(const char* &buff, uint16_t& len);
Type type() const Type type() const
@@ -69,21 +87,20 @@ class MqttMessage
void create(Type type) void create(Type type)
{ {
buffer[0]=type; buffer=(char)type;
curr=buffer+2; buffer+='\0';
vheader=curr; vheader=2;
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttClient*); MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length);
char buffer[256]; // TODO why 256 ? (should be replaced by a std::string) std::string buffer;
char* vheader; uint8_t vheader;
char* curr;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; State state;
}; };
@@ -104,13 +121,17 @@ class MqttClient
}; };
public: public:
MqttClient(MqttBroker*); MqttClient(MqttBroker*);
MqttClient(MqttBroker* brk, const std::string& id) : MqttClient(brk) { clientId=id; }
MqttClient() : MqttClient(nullptr) {};
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port); void connect(std::string broker, uint16_t port, uint16_t ka=10);
bool connected() { return client==nullptr || client->connected(); } bool connected() { return
(parent!=nullptr and client==nullptr) or
(client and client->connected()); }
void write(const char* buf, size_t length) void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); } { if (client) client->write(buf, length); }
@@ -118,16 +139,17 @@ class MqttClient
void id(std::string& new_id) { clientId = new_id; } void id(std::string& new_id) { clientId = new_id; }
void loop(); void loop();
void close(); void close(bool bSendDisconnect=true);
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world // Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length); MqttError publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
void publish(const Topic& t) { publish(t, nullptr, 0);}; MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); } MqttError subscribe(Topic topic, uint8_t qos=0);
void unsubscribe(Topic& topic); MqttError unsubscribe(Topic& topic);
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless
@@ -135,24 +157,33 @@ class MqttClient
void dump() void dump()
{ {
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << " ["; Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << (client && client->connected() ? "" : "dis") << "connected";
message.hexdump("entrant msg");
bool c=false; bool c=false;
Serial << " [";
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
c=true; c=true;
} }
Serial << "]" << endl; Serial << "]" << endl;
} }
static long counter; // Number of messages sent
private: private:
void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg); MqttError publish(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage();
@@ -162,33 +193,18 @@ class MqttClient
uint32_t keep_alive; uint32_t keep_alive;
uint32_t alive; uint32_t alive;
MqttMessage message; MqttMessage message;
// TODO having a pointer on MqttBroker may produce larger binaries
// due to unecessary function linked if ever parent is not used
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
}; };
/***********************************************
* R1 - accept external cnx
* R2 - allows all clients pusblish to go outside
* R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
* R5 - tries to connect elsewhere (*)
* R6 - disconnect external clients
* R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker class MqttBroker
{ {
enum State enum State
@@ -200,6 +216,7 @@ class MqttBroker
public: public:
// TODO limit max number of clients // TODO limit max number of clients
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
@@ -209,9 +226,11 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883); void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
size_t clientsCount() const { return clients.size(); }
void dump() void dump()
{ {
Serial << "broker: " << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
for(auto client: clients) for(auto client: clients)
{ {
Serial << " "; Serial << " ";
@@ -229,7 +248,7 @@ class MqttBroker
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself // For clients that are added not by the broker itself
void addClient(MqttClient* client); void addClient(MqttClient* client);

View File

@@ -0,0 +1,138 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt local unit tests.
*
* No wifi connection unit tests.
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
}
test(local_client_should_unregister_when_destroyed)
{
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1);
}
assertEqual(broker.clientsCount(), (size_t)0);
}
test(local_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client(&broker);
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(local_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher(&broker);
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertTrue(published[""]["a/b"] == 1);
assertTrue(published[""]["a/c"] == 2);
}
test(local_publish_should_be_dispatched_to_local_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber_a(&broker, "A");
subscriber_a.setCallback(onPublish);
subscriber_a.subscribe("a/b");
subscriber_a.subscribe("a/c");
MqttClient subscriber_b(&broker, "B");
subscriber_b.setCallback(onPublish);
subscriber_b.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertTrue(published["A"]["a/b"] == 1);
assertTrue(published["A"]["a/c"] == 1);
assertTrue(published["B"]["a/b"] == 1);
assertTrue(published["B"]["a/c"] == 0);
}
test(local_unsubscribe)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b");
// subscriber.unsubscribe("a/b"); TODO not yet implemented
publisher.publish("a/b");
assertTrue(published[""]["a/b"] == 1); // Only one publish has been received
}
test(local_nocallback_when_destroyed)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
}
MqttClient publisher(&broker);
publisher.publish("a/b");
assertEqual(published.size(), (size_t)0); // Only one publish has been received
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ NO WIFI CONNECTION TinyMqtt TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}