Compare commits

...

82 Commits
0.5.0 ... 0.7.3

Author SHA1 Message Date
hsaturn
5834a278c7 Release 0.7.3 2021-04-04 06:36:47 +02:00
hsaturn
146d0de1d4 MqttClient: bug fix, connection lost at each publish received 2021-04-04 06:35:50 +02:00
hsaturn
297a22efb5 Big rewrite of MqttClient in order to avoid code duplicate 2021-04-04 05:57:48 +02:00
hsaturn
510ff514a9 [tests] Changed assertions 2021-04-04 01:07:12 +02:00
hsaturn
ad6f7155e5 Added test for StringIndexer 2021-04-03 21:11:54 +02:00
hsaturn
3ed5874373 Fix compilation 2021-04-02 19:57:04 +02:00
hsaturn
e1a936e081 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-04-02 19:23:53 +02:00
hsaturn
0757a95fbf Keywords updated, code clean 2021-04-02 19:22:59 +02:00
hsaturn
7c8d71262f Fix test (not yet finished) 2021-04-02 18:59:07 +02:00
hsaturn
138ce973f2 Typos in libraries 2021-04-02 18:48:37 +02:00
hsaturn
bf499117b7 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-03-31 19:12:28 +02:00
hsaturn
4ed6f72602 Merge branch 'test' into main 2021-03-31 19:11:24 +02:00
hsaturn
87a78c549f Fix crash at end of unit tests 2021-03-31 19:09:43 +02:00
hsaturn
5211360b91 Local tests added 2021-03-31 19:09:05 +02:00
hsaturn
549a23ffb7 Fix delete was not really deleting in tinytest 2021-03-31 10:40:06 +02:00
hsaturn
3a1af655d7 Fix compilation problem 2021-03-31 00:33:15 +02:00
hsaturn
e71ffefc5a Fixed a useless test and modified MqttClient constructors 2021-03-31 00:22:31 +02:00
hsaturn
b6a0dde2b1 json 2021-03-30 08:52:46 +02:00
hsaturn
babc391632 Added json 2021-03-30 08:52:10 +02:00
hsaturn
27bdbb9a0b str 2021-03-30 08:41:30 +02:00
hsaturn
6a9e158428 results 2021-03-30 08:39:02 +02:00
hsaturn
6fc6794dc3 result0.yaml 2021-03-30 08:37:25 +02:00
hsaturn
1eaa514579 result.yaml 2021-03-30 08:35:33 +02:00
hsaturn
7af4c2ca69 Added .o to gitignore 2021-03-30 08:21:56 +02:00
hsaturn
a340558460 Fix tests 2021-03-30 08:21:33 +02:00
hsaturn
9a7db237d3 Renamed local to nowifi as local will be used for local (127.0.0.1) tests 2021-03-30 08:15:13 +02:00
hsaturn
91e083e7b0 Merge branch 'linter' into main 2021-03-29 23:58:00 +02:00
hsaturn
97adc985e6 Code clean 2021-03-29 23:56:36 +02:00
hsaturn
6fcfc9dfc0 ptr 2021-03-29 23:20:49 +02:00
hsaturn
a6596ffc89 Fix ptr 2021-03-29 23:06:40 +02:00
hsaturn
533ab0c70d Try to mock Esp 2021-03-29 21:47:14 +02:00
hsaturn
d5dd896b45 MqttClient::unsubscribe implemented 2021-03-29 20:48:45 +02:00
hsaturn
bd7fa8f39c Update readme.md 2021-03-29 20:47:14 +02:00
hsaturn
6395e931ce refix EspWifi 2021-03-29 20:47:14 +02:00
hsaturn
635fee6f7c ESP8266WiFi lib added for aunit 2021-03-29 20:47:14 +02:00
hsaturn
dc2420d88e Fix makefile 2021-03-29 20:47:14 +02:00
hsaturn
2fbc46cbe2 Revert auint 2021-03-29 20:47:14 +02:00
hsaturn
a003156ae1 Fix aunit 2021-03-29 20:47:14 +02:00
hsaturn
913e1aa7ae Fixed make target 2021-03-29 20:46:59 +02:00
hsaturn
8272515bd7 Fixed make target 2021-03-29 20:46:46 +02:00
hsaturn
9a7f6a3020 Fixed make target 2021-03-29 20:46:46 +02:00
hsaturn
fead702d9f Added makefile for aunit 2021-03-29 20:46:46 +02:00
hsaturn
eaf938f2fd Test aunit 2021-03-29 20:46:46 +02:00
hsaturn
8eefa63f45 Lint fixes 2021-03-29 20:46:46 +02:00
hsaturn
9d48c436d8 test 2021-03-29 20:46:46 +02:00
hsaturn
792a28e831 deleted 2021-03-29 20:45:33 +02:00
hsaturn
9407193454 MqttClient::unsubscribe implemented 2021-03-29 20:39:10 +02:00
hsaturn
602050f309 Update readme.md 2021-03-29 11:45:51 +02:00
hsaturn
1a70c90af2 refix EspWifi 2021-03-29 00:01:32 +02:00
hsaturn
ed4091c53e ESP8266WiFi lib added for aunit 2021-03-28 23:57:06 +02:00
hsaturn
f2a805f724 Fix makefile 2021-03-28 23:50:50 +02:00
hsaturn
3083bcf071 Revert auint 2021-03-28 23:44:11 +02:00
hsaturn
d01f46dbc1 Fix aunit 2021-03-28 23:42:57 +02:00
hsaturn
39b2257619 Merge branch 'linter' of github.com:hsaturn/TinyMqtt into linter 2021-03-28 23:39:50 +02:00
hsaturn
60d385189b Fixed make target 2021-03-28 23:39:37 +02:00
hsaturn
82c5b971e9 Release 0.7.0 2021-03-28 23:34:54 +02:00
hsaturn
3f2c1c57e1 Fixed make target 2021-03-28 23:30:19 +02:00
hsaturn
e550197d0a Fixed make target 2021-03-28 23:28:48 +02:00
hsaturn
253bc9b3f5 Added makefile for aunit 2021-03-28 23:26:53 +02:00
hsaturn
96d8018960 Test aunit 2021-03-28 23:16:07 +02:00
hsaturn
505cacc2df Lint fixes 2021-03-28 23:09:10 +02:00
hsaturn
62848056a2 test 2021-03-28 23:01:14 +02:00
hsaturn
01998e74ec Test linter 2021-03-28 22:59:42 +02:00
hsaturn
5f46fd304c Release 0.7.0 2021-03-28 22:47:13 +02:00
hsaturn
213d637eaf Code cleaning 2021-03-28 21:31:10 +02:00
hsaturn
3bb2dd5a81 Code cleaning 2021-03-28 21:29:02 +02:00
hsaturn
7d9ab6381d Disconnect added (finally) 2021-03-28 21:28:06 +02:00
hsaturn
470cde62da Version 0.6.0 2021-03-27 10:26:41 +01:00
hsaturn
3fb9b6317d README.md modified (typo) 2021-03-27 02:00:49 +01:00
hsaturn
ee9ad93bfd README.md modified 2021-03-27 02:00:00 +01:00
hsaturn
0b1a932244 MqttClient resubscribe on reconnect 2021-03-27 01:55:03 +01:00
hsaturn
972759237c Speed and stability improved 2021-03-27 01:40:49 +01:00
hsaturn
cb00d7f82a MqttClient / UnSubscribe message implemented 2021-03-27 01:39:37 +01:00
hsaturn
0b735d22a5 Less debug as the code is more stable now 2021-03-27 01:38:32 +01:00
hsaturn
9178aac02c MqttClient client length augmented to 60 (was not passing MqttBox tests 2021-03-26 01:59:08 +01:00
hsaturn
c706fbcff2 Update readme 2021-03-26 01:52:31 +01:00
hsaturn
a0c41a0ccb Smart ip feature for connect 2021-03-26 01:51:30 +01:00
hsaturn
b780dcf99c test fix, was unable to use set when replacements occurs 2021-03-26 01:10:33 +01:00
hsaturn
f122d5e902 relase 0.5.1 2021-03-25 01:26:27 +01:00
hsaturn
d63793cf77 Avoid to use message member, minor changes 2021-03-25 01:26:03 +01:00
hsaturn
8386779e92 tinytest great enhancements 2021-03-25 01:24:46 +01:00
hsaturn
1b988a06a2 Relase 0.5.0 2021-03-24 21:21:46 +01:00
21 changed files with 1067 additions and 189 deletions

28
.github/workflows/aunit.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
# See https://docs.github.com/en/actions/guides for documentation about GitHub
# Actions.
name: AUnit Tests
# Run on all branches.
on: [push]
jobs:
build:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- name: Setup
run: |
cd ..
git clone https://github.com/bxparks/EpoxyDuino
git clone https://github.com/bxparks/AceRoutine
git clone https://github.com/bxparks/AUnit
git clone https://github.com/bxparks/AceCommon
git clone https://github.com/hsaturn/EspMock
- name: Verify tests
run: |
make -C tests
make -C tests runtests

26
.github/workflows/superlinter.yml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Super-Linter
# Run this workflow every time a new commit pushed to your repository
#
on: push
jobs:
# Set the job key. The key is displayed as the job name
# when a job name is not provided
super-lint:
# Name the Job
name: Lint code base
# Set the type of machine to run on
runs-on: ubuntu-latest
steps:
# Checks out a copy of your repository on the ubuntu-latest machine
- name: Checkout code
uses: actions/checkout@v2
# Runs the Super-Linter action
- name: Run Super-Linter
uses: github/super-linter@v3
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

3
.gitignore vendored
View File

@@ -1,2 +1,5 @@
*~ *~
src/my_credentials.h src/my_credentials.h
*.o
*.swp
*.out

View File

@@ -1,15 +1,18 @@
# TinyMqtt # TinyMqtt
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt) ![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt) ![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green) ![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) ![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client ESP 8266 is a small, fast and capable Mqtt Broker and Client
## Features ## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client - Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported - Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode) - Standalone (can work without WiFi) (degraded/local mode)
@@ -23,13 +26,16 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Implement zeroconf mode (needs async) * Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Why not a 'global' TinyMqtt::loop() instead of having to call loop for all broker/clients instances
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~ * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~ * ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe * ~~MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io)~~
* MqttClient auto reconnection * MqttClient auto reconnection
* MqttClient does not callback payload... * ~~MqttClient unsubscribe~~
* MqttClient does not sent payload to callback...
* MqttClient user/password * MqttClient user/password
* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching)
## Quickstart ## Quickstart

View File

@@ -1,6 +1,15 @@
// vim: ts=30 // vim: ts=40
Exemple of commands that can be sent via the serial monitor to tinymqtt-test Exemple of commands that can be sent via the serial monitor to tinymqtt-test
---------------------------------------------------------------------------- ----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish.
--------
set name value set variable name to value (later replaced)
set name if no value, then var is erased
set view all vars
reserved keywords are forbidden
client a starts a client (not connected no internal broker) client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883 a.connect [server][port][alive] connects the client, default port=1883
@@ -8,4 +17,23 @@ a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic a.subscribe topic subscribes to a topic
delete a destroy the client delete a destroy the client
* note, if 'server' is a number, then it replaces the end of the local ip.
i.e. if local ip is 192.168.1.10, connect 2.35 becomes 192.168.2.35
--------
example:
--------
client c
c.connect broker.emqx.io
set topic sensor/temperature
c.subscribe topic
c.publish topic 15
c.publish topic 20
macro exansion example
----------------------
set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -1,6 +1,7 @@
#define TINY_MQTT_DEBUG #define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h> #include <MqttStreaming.h>
#include <sstream>
#include <map> #include <map>
/** /**
@@ -15,11 +16,6 @@
* cons - Takes more memory * cons - Takes more memory
* - a bit hard to understand * - a bit hard to understand
* *
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/ */
#include <my_credentials.h> #include <my_credentials.h>
@@ -27,18 +23,27 @@
std::string topic="sensor/temperature"; std::string topic="sensor/temperature";
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; } {
Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str();
if (payload) Serial << ", payload[" << length << "]=[";
while(length--)
{
const char c=*payload++;
if (c!=10 and c!=13 and c <32) Serial << '?';
Serial << *payload++;
}
Serial<< endl;
}
std::map<std::string, MqttClient*> clients; std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers; std::map<std::string, MqttBroker*> brokers;
void setup() void setup()
{ {
WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054
Serial.begin(115200); Serial.begin(115200);
delay(500); delay(500);
Serial << endl << endl << endl Serial << endl << endl << endl
<< "Demo started. Type help for more..." << endl
<< "Connecting to '" << ssid << "' "; << "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
@@ -48,6 +53,7 @@ void setup()
{ Serial << '-'; delay(500); } { Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
Serial << "Type help for more..." << endl;
MqttBroker* broker = new MqttBroker(1883); MqttBroker* broker = new MqttBroker(1883);
broker->begin(); broker->begin();
@@ -78,6 +84,110 @@ std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' '
return sword; return sword;
} }
bool isaddr(std::string s)
{
if (s.length()==0 or s.length()>3) return false;
for(char c: s)
if (c<'0' or c>'9') return false;
return true;
}
std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string addr=getword(str, if_empty, sep);
std::string ip=addr;
std::vector<std::string> build;
while(ip.length())
{
std::string b=getword(ip,nullptr,'.');
if (isaddr(b) && build.size()<4)
{
build.push_back(b);
}
else
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
while(build.size()!=4)
{
std::stringstream b;
b << (int)local[3-build.size()];
build.insert(build.begin(), b.str());
}
for(std::string s: build)
{
if (addr.length()) addr += '.';
addr += s;
}
return addr;
}
std::map<std::string, std::string> vars;
std::set<std::string> commands = {
"auto", "broker", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "view"
};
void getCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
std::string matches;
int count=0;
for(std::string cmd: commands)
{
if (cmd.substr(0, search.length()) == search)
{
if (count) matches +=", ";
count++;
matches += cmd;
}
}
if (count==1)
search = matches;
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
}
}
void replace(const char* d, std::string& str, std::string srch, std::string to)
{
if (d[0] && d[1])
{
srch=d[0]+srch+d[1];
to=d[0]+to+d[1];
size_t pos = 0;
while((pos=str.find(srch, pos)) != std::string::npos)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
}
}
}
void replaceVars(std::string& cmd)
{
cmd = ' '+cmd+' ';
for(auto it: vars)
{
replace("..", cmd, it.first, it.second);
replace(". ", cmd, it.first, it.second);
replace(" .", cmd, it.first, it.second);
replace(" ", cmd, it.first, it.second);
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval // publish at regular interval
class automatic class automatic
{ {
@@ -180,6 +290,7 @@ class automatic
{ {
Serial << " auto [$id] on/off" << endl; Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl; Serial << " auto [$id] view" << endl;
Serial << " auto [$id] interval [s]" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl; Serial << " auto [$id] create [millis] [topic]" << endl;
} }
@@ -204,6 +315,12 @@ using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void loop() void loop()
{ {
static long count;
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers) for(auto it: brokers)
it.second->loop(); it.second->loop();
@@ -219,12 +336,15 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd; static std::string last_cmd;
if (cmd=="!") if (cmd=="!")
cmd=last_cmd; cmd=last_cmd;
else else
last_cmd=cmd; last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length()) while(cmd.length())
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
@@ -235,27 +355,34 @@ void loop()
// client.function notation // client.function notation
// ("a.fun " becomes "fun a ") // ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos) if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{ {
s=getword(cmd, nullptr, '.'); s=getword(cmd, nullptr, '.');
if (clients.find(s) != clients.end()) if (s.length())
{ {
client = clients[s]; if (clients.find(s) != clients.end())
} {
else if (brokers.find(s) != brokers.end()) client = clients[s];
{ }
broker = brokers[s]; else if (brokers.find(s) != brokers.end())
} {
else broker = brokers[s];
{ }
Serial << "Unknown class (" << s.c_str() << ")" << endl; else
cmd=""; {
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
} }
} }
s = getword(cmd); s = getword(cmd);
if (compare(s, "delete")) if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{ {
if (client==nullptr && broker==nullptr) if (client==nullptr && broker==nullptr)
{ {
@@ -273,11 +400,11 @@ void loop()
} }
if (client) if (client)
{ {
clients.erase(s);
for (auto it: clients) for (auto it: clients)
{ {
if (it.second != client) continue; if (it.second != client) continue;
Serial << "deleted" << endl; Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first); clients.erase(it.first);
break; break;
} }
@@ -287,9 +414,9 @@ void loop()
{ {
for(auto it: brokers) for(auto it: brokers)
{ {
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue; if (broker != it.second) continue;
Serial << "deleted" << endl; Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first); brokers.erase(it.first);
break; break;
} }
@@ -313,7 +440,7 @@ void loop()
{ {
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl; Serial << (client->connected() ? "connected." : "not connected") << endl;
} }
else if (compare(s,"publish")) else if (compare(s,"publish"))
@@ -384,6 +511,32 @@ void loop()
Serial << "Missing or existing client name" << endl; Serial << "Missing or existing client name" << endl;
cmd+=" ls"; cmd+=" ls";
} }
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view")) else if (compare(s, "ls") or compare(s, "view"))
{ {
Serial << "--< " << clients.size() << " client/s. >--" << endl; Serial << "--< " << clients.size() << " client/s. >--" << endl;
@@ -421,6 +574,7 @@ void loop()
Serial << endl; Serial << endl;
Serial << " help" << endl; Serial << " help" << endl;
Serial << " ls / ip / reset" << endl; Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl; Serial << " ! repeat last command" << endl;
Serial << endl; Serial << endl;
Serial << " $id : name of the client." << endl; Serial << " $id : name of the client." << endl;

View File

@@ -9,13 +9,19 @@
TinyMqtt KEYWORD1 TinyMqtt KEYWORD1
MqttBroker KEYWORD1 MqttBroker KEYWORD1
connect KEYWORD2
clientsCount KEYWORD2
begin KEYWORD2 begin KEYWORD2
loop KEYWORD2 loop KEYWORD2
port KEYWORD2
MqttClient KEYWORD1 MqttClient KEYWORD1
connect KEYWORD2
connected KEYWORD2
publish KEYWORD2 publish KEYWORD2
setCallback KEYWORD2 setCallback KEYWORD2
subscribe KEYWORD2 subscribe KEYWORD2
unsubscribe KEYWORD2
Topic KEYWORD1 Topic KEYWORD1
matches KEYWORD2 matches KEYWORD2

View File

@@ -1,12 +1,12 @@
{ {
"name": "TinyMqtt", "name": "TinyMqtt",
"keywords": "ethernet, mqtt, m2m, iot", "keywords": "ethernet, mqtt, m2m, iot",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.", "description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages. It does support MQTT 3.1.1 without QOS=0.",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.4.0", "version": "0.7.3",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,10 +1,11 @@
name=TinyMqtt name=TinyMqtt
version=0.4.0 version=0.7.3
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP. It does support MQTT 3.1.1 without QoS=0.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*
includes=TinyMqtt.h includes=TinyMqtt.h
depends=

View File

@@ -15,6 +15,16 @@ class StringIndexer
std::string str; std::string str;
uint8_t used=0; uint8_t used=0;
friend class StringIndexer; friend class StringIndexer;
#if EPOXY_DUINO
public:
// Workaround to avoid coredump in Indexer::release
// when destroying a Topic after the deletion of
// StringIndexer::strings map (which can occurs only with AUnit,
// never in the ESP itself, because ESP never ends)
// (I hate static vars)
~StringCounter() { used=255; }
#endif
}; };
public: public:
using index_t=uint8_t; using index_t=uint8_t;
@@ -29,7 +39,7 @@ class StringIndexer
return it->first; return it->first;
} }
} }
for(index_t index=0; index<255; index++) for(index_t index=1; index; index++)
{ {
if (strings.find(index)==strings.end()) if (strings.find(index)==strings.end())
{ {
@@ -70,6 +80,8 @@ class StringIndexer
} }
} }
static uint16_t count() { return strings.size(); }
private: private:
static std::map<index_t, StringCounter> strings; static std::map<index_t, StringCounter> strings;
}; };
@@ -88,6 +100,8 @@ class IndexedString
index=StringIndexer::strToIndex(str, len); index=StringIndexer::strToIndex(str, len);
} }
IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {};
~IndexedString() { StringIndexer::release(index); } ~IndexedString() { StringIndexer::release(index); }
IndexedString& operator=(const IndexedString& source) IndexedString& operator=(const IndexedString& source)
@@ -102,9 +116,14 @@ class IndexedString
return i1.index < i2.index; return i1.index < i2.index;
} }
friend bool operator==(const IndexedString& i1, const IndexedString& i2)
{
return i1.index == i2.index;
}
const std::string& str() const { return StringIndexer::str(index); } const std::string& str() const { return StringIndexer::str(index); }
const StringIndexer::index_t getIndex() const { return index; } const StringIndexer::index_t& getIndex() const { return index; }
private: private:
StringIndexer::index_t index; StringIndexer::index_t index;

View File

@@ -19,6 +19,7 @@ MqttBroker::~MqttBroker()
{ {
delete clients[0]; delete clients[0];
} }
server.close();
} }
// private constructor used by broker only // private constructor used by broker only
@@ -29,8 +30,8 @@ MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
MqttClient::MqttClient(MqttBroker* parent) MqttClient::MqttClient(MqttBroker* parent, const std::string& id)
: parent(parent) : parent(parent), clientId(id)
{ {
client = nullptr; client = nullptr;
@@ -43,12 +44,17 @@ MqttClient::~MqttClient()
delete client; delete client;
} }
void MqttClient::close() void MqttClient::close(bool bSendDisconnect)
{ {
debug("close " << id().c_str()); debug("close " << id().c_str());
mqtt_connected = false; mqtt_connected = false;
if (client) if (client)
{ {
if (bSendDisconnect and client->connected())
{
message.create(MqttMessage::Type::Disconnect);
message.sendTo(this);
}
client->stop(); client->stop();
} }
@@ -69,18 +75,18 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
message.create(MqttMessage::Type::Connect); MqttMessage msg(MqttMessage::Type::Connect);
message.add("MQTT",4); msg.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1 msg.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name msg.add(0x0); // Connect flags TODO user / name
keep_alive = ka; // TODO not configurable keep_alive = ka;
message.add(0x00); // keep_alive msg.add(0x00); // keep_alive
message.add((char)keep_alive); msg.add((char)keep_alive);
message.add(clientId); msg.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); msg.sendTo(this);
message.reset(); msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
@@ -92,6 +98,13 @@ void MqttBroker::addClient(MqttClient* client)
clients.push_back(client); clients.push_back(client);
} }
void MqttBroker::connect(const std::string& host, uint16_t port)
{
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->parent = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove) void MqttBroker::removeClient(MqttClient* remove)
{ {
for(auto it=clients.begin(); it!=clients.end(); it++) for(auto it=clients.begin(); it!=clients.end(); it++)
@@ -99,6 +112,11 @@ void MqttBroker::removeClient(MqttClient* remove)
auto client=*it; auto client=*it;
if (client==remove) if (client==remove)
{ {
// TODO if this broker is connected to an external broker
// we have to unsubscribe remove's topics.
// (but doing this, check that other clients are not subscribed...)
// Unless -> we could receive useless messages
// -> we are using (memory) one IndexedString plus its string for nothing.
debug("Remove " << clients.size()); debug("Remove " << clients.size());
clients.erase(it); clients.erase(it);
debug("Client removed " << clients.size()); debug("Client removed " << clients.size());
@@ -112,6 +130,13 @@ void MqttBroker::loop()
{ {
WiFiClient client = server.available(); WiFiClient client = server.available();
if (broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
}
if (client) if (client)
{ {
addClient(new MqttClient(this, client)); addClient(new MqttClient(this, client));
@@ -120,7 +145,7 @@ void MqttBroker::loop()
// for(auto it=clients.begin(); it!=clients.end(); it++) // for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop // use index because size can change during the loop
for(int i=0; i<clients.size(); i++) for(size_t i=0; i<clients.size(); i++)
{ {
auto client = clients[i]; auto client = clients[i];
if (client->connected()) if (client->connected())
@@ -137,7 +162,15 @@ void MqttBroker::loop()
} }
} }
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
if (broker && broker->connected())
{
return broker->subscribe(topic, qos);
}
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
@@ -146,29 +179,32 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
#if TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Broker is connected if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
{ {
// ext broker -> clients or // ext_broker -> clients or clients -> ext_broker
// or clients -> ext broker if (source == broker) // external broker -> internal clients
if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // external clients -> this broker
{ {
MqttError ret = broker->publish(topic, msg); // As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret; if (ret != MqttOk) retval = ret;
} }
} }
else // Disconnected: R7 else // Disconnected
{ {
// All is allowed
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publish(topic, msg); if (doit) retval = client->publishIfSubscribed(topic, msg);
debug(""); debug("");
} }
return retval; return retval;
@@ -227,11 +263,32 @@ void MqttClient::loop()
message.incoming(client->read()); message.incoming(client->read());
if (message.type()) if (message.type())
{ {
processMessage(); processMessage(&message);
message.reset();
} }
} }
} }
void MqttClient::resubscribe()
{
// TODO resubscription limited to 256 bytes
if (subscriptions.size())
{
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
for(auto topic: subscriptions)
{
msg.add(topic);
msg.add(0); // TODO qos
}
msg.sendTo(this); // TODO return value
}
}
MqttError MqttClient::subscribe(Topic topic, uint8_t qos) MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{ {
debug("subsribe(" << topic.c_str() << ")"); debug("subsribe(" << topic.c_str() << ")");
@@ -239,39 +296,64 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
subscriptions.insert(topic); subscriptions.insert(topic);
if (parent==nullptr) // remote broker ? if (parent==nullptr) // remote broker
{ {
debug("remote subscribe"); return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
MqttMessage msg(MqttMessage::Type::Subscribe, 2); }
else
// TODO manage packet identifier {
msg.add(0); return parent->subscribe(topic, qos);
msg.add(0);
msg.add(topic.str());
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
} }
return ret; return ret;
} }
void MqttClient::processMessage() MqttError MqttClient::unsubscribe(Topic topic)
{ {
std::string error; auto it=subscriptions.find(topic);
std::string s; if (it != subscriptions.end())
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) {
{ subscriptions.erase(it);
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; if (parent==nullptr) // remote broker
message.hexdump("Incoming"); {
return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0);
}
}
return MqttOk;
} }
auto header = message.getVHeader();
MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos)
{
MqttMessage msg(type, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic);
msg.add(qos);
// TODO instead we should wait (state machine) for SUBACK / UNSUBACK ?
return msg.sendTo(this);
}
long MqttClient::counter=0;
void MqttClient::processMessage(const MqttMessage* mesg)
{
counter++;
#if TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming");
}
#endif
auto header = mesg->getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
switch(message.type() & 0XF0) switch(mesg->type() & 0XF0)
{ {
case MqttMessage::Type::Connect: case MqttMessage::Type::Connect:
if (mqtt_connected) if (mqtt_connected)
@@ -294,44 +376,30 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
} }
// ClientId // ClientId
message.getString(payload, len); mesg->getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
if (mqtt_flags & FlagWill) // Will topic if (mqtt_flags & FlagWill) // Will topic
{ {
message.getString(payload, len); // Will Topic mesg->getString(payload, len); // Will Topic
outstring("WillTopic", payload, len); outstring("WillTopic", payload, len);
payload += len; payload += len;
message.getString(payload, len); // Will Message mesg->getString(payload, len); // Will Message
outstring("WillMessage", payload, len); outstring("WillMessage", payload, len);
payload += len; payload += len;
} }
// FIXME forgetting credential is allowed (security hole) // FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName) if (mqtt_flags & FlagUserName)
{ {
message.getString(payload, len); mesg->getString(payload, len);
if (!parent->checkUser(payload, len)) break; if (!parent->checkUser(payload, len)) break;
payload += len; payload += len;
} }
if (mqtt_flags & FlagPassword) if (mqtt_flags & FlagPassword)
{ {
message.getString(payload, len); mesg->getString(payload, len);
if (!parent->checkPassword(payload, len)) break; if (!parent->checkPassword(payload, len)) break;
payload += len; payload += len;
} }
@@ -339,21 +407,22 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
bclose = false; bclose = false;
mqtt_connected=true; mqtt_connected=true;
// Reuse received msg {
message.create(MqttMessage::Type::Connack); MqttMessage msg(MqttMessage::Type::ConnAck);
message.add(0); // Session present (not implemented) msg.add(0); // Session present (not implemented)
message.add(0); // Connection accepted msg.add(0); // Connection accepted
message.sendTo(this); msg.sendTo(this);
}
break; break;
case MqttMessage::Type::Connack: case MqttMessage::Type::ConnAck:
// TODO what more on connack ?
mqtt_connected = true; mqtt_connected = true;
bclose = false; bclose = false;
resubscribe();
break; break;
case MqttMessage::Type::Suback: case MqttMessage::Type::SubAck:
case MqttMessage::Type::Puback: case MqttMessage::Type::PubAck:
if (!mqtt_connected) break; if (!mqtt_connected) break;
// Ignore acks // Ignore acks
bclose = false; bclose = false;
@@ -379,18 +448,27 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
case MqttMessage::Type::UnSubscribe:
{ {
if (!mqtt_connected) break; if (!mqtt_connected) break;
payload = header+2; payload = header+2;
debug("subscribe loop"); debug("subscribe loop");
while(payload < message.end()) while(payload < mesg->end())
{ {
message.getString(payload, len); // Topic mesg->getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')'); debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len)); // subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len)); Topic topic(payload, len);
if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic);
else
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
subscriptions.erase(it);
}
payload += len; payload += len;
uint8_t qos = *payload++; uint8_t qos = *payload++;
debug(" qos=" << qos); debug(" qos=" << qos);
@@ -402,36 +480,42 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
if (!mqtt_connected) break; if (mqtt_connected or client == nullptr)
{ {
uint8_t qos = message.type() & 0x6; uint8_t qos = mesg->type() & 0x6;
payload = header; payload = header;
message.getString(payload, len); mesg->getString(payload, len);
Topic published(payload, len); Topic published(payload, len);
payload += len; payload += len;
len=message.end()-payload;
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl; // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
len=mesg->end()-payload;
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
if (parent)
if (client==nullptr) // internal MqttClient receives publish
{
if (callback and isSubscribedTo(published))
{
callback(this, published, payload, len); // TODO send the real payload
}
}
else if (parent) // from outside to inside
{ {
debug("publishing to parent"); debug("publishing to parent");
parent->publish(this, published, message); parent->publish(this, published, *mesg);
} }
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
// TODO should send PUBACK
bclose = false; bclose = false;
} }
break; break;
case MqttMessage::Type::PubAck: case MqttMessage::Type::Disconnect:
// TODO should discard any will msg
if (!mqtt_connected) break; if (!mqtt_connected) break;
bclose = false; mqtt_connected = false;
close(false);
bclose=false;
break; break;
default: default:
@@ -440,8 +524,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
}; };
if (bclose) if (bclose)
{ {
Serial << "*************** Error msg 0x" << _HEX(message.type()); Serial << "*************** Error msg 0x" << _HEX(mesg->type());
if (error.length()) Serial << ':' << error.c_str(); mesg->hexdump("-------ERROR ------");
dump();
Serial << endl; Serial << endl;
close(); close();
} }
@@ -449,7 +534,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
{ {
clientAlive(parent ? 5 : 0); clientAlive(parent ? 5 : 0);
} }
message.reset();
} }
bool Topic::matches(const Topic& topic) const bool Topic::matches(const Topic& topic) const
@@ -462,12 +546,14 @@ bool Topic::matches(const Topic& topic) const
// publish from local client // publish from local client
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
MqttMessage msg; MqttMessage msg(MqttMessage::Publish);
msg.create(MqttMessage::Publish);
msg.add(topic); msg.add(topic);
msg.add(payload, pay_length, false); msg.add(payload, pay_length, false);
msg.complete();
if (parent) if (parent)
{
return parent->publish(this, topic, msg); return parent->publish(this, topic, msg);
}
else if (client) else if (client)
return msg.sendTo(this); return msg.sendTo(this);
else else
@@ -475,31 +561,33 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
} }
// republish a received publish if it matches any in subscriptions // republish a received publish if it matches any in subscriptions
MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
{ {
MqttError retval=MqttOk; MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) if (isSubscribedTo(topic))
{ {
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '; if (client)
if (subscription.matches(topic)) retval = msg.sendTo(this);
else
{ {
Serial << " match/send"; processMessage(&msg);
if (client) // callback(this, topic, nullptr, 0); // TODO Payload
{
retval = msg.sendTo(this);
}
else if (callback)
{
callback(this, topic, nullptr, 0); // TODO Payload
}
} }
Serial << endl;
} }
return retval; return retval;
} }
bool MqttClient::isSubscribedTo(const Topic& topic) const
{
for(const auto& subscription: subscriptions)
if (subscription.matches(topic))
return true;
return false;
}
void MqttMessage::reset() void MqttMessage::reset()
{ {
buffer.clear(); buffer.clear();
@@ -527,8 +615,15 @@ void MqttMessage::incoming(char in_byte)
vheader = buffer.length(); vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else else
{
buffer.reserve(size);
state = VariableHeader; state = VariableHeader;
}
} }
break; break;
case VariableHeader: case VariableHeader:
@@ -546,6 +641,7 @@ void MqttMessage::incoming(char in_byte)
case Complete: case Complete:
default: default:
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
hexdump("spurious");
reset(); reset();
break; break;
} }
@@ -566,7 +662,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
void MqttMessage::encodeLength(char* msb, int length) void MqttMessage::encodeLength(char* msb, int length) const
{ {
do do
{ {
@@ -577,13 +673,19 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
MqttError MqttMessage::sendTo(MqttClient* client) void MqttMessage::complete()
{ {
if (buffer.size()>2) encodeLength(&buffer[1], buffer.size()-2);
state = Complete;
}
MqttError MqttMessage::sendTo(MqttClient* client) const
{
if (buffer.size())
{ {
debug("sending " << buffer.size() << " bytes"); debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength(&buffer[1], buffer.size()-2);
hexdump("snd"); // hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }
else else

View File

@@ -5,11 +5,11 @@
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h> #include <MqttStreaming.h>
#define TINY_MQTT_DEBUG #if 0
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else #else
#define TINY_MQTT_DEBUG 0
#define debug(what) {} #define debug(what) {}
#endif #endif
@@ -39,15 +39,17 @@ class MqttMessage
public: public:
enum Type enum Type
{ {
Unknown = 0, Unknown = 0,
Connect = 0x10, Connect = 0x10,
Connack = 0x20, ConnAck = 0x20,
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
Suback = 0x90, SubAck = 0x90,
PingReq = 0xC0, UnSubscribe = 0xA0,
PingResp = 0xD0, PingReq = 0xC0,
PingResp = 0xD0,
Disconnect = 0xE0
}; };
enum State enum State
{ {
@@ -70,6 +72,7 @@ class MqttMessage
const char* end() const { return &buffer[0]+buffer.size(); } const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); } uint16_t length() const { return buffer.size(); }
void complete();
void reset(); void reset();
@@ -83,6 +86,13 @@ class MqttMessage
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown; return state == Complete ? static_cast<Type>(buffer[0]) : Unknown;
} }
// shouldn't exist because it breaks constness :-(
// but this saves memory so ...
void changeType(Type type) const
{
buffer[0] = type;
}
void create(Type type) void create(Type type)
{ {
buffer=(char)type; buffer=(char)type;
@@ -91,13 +101,13 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
MqttError sendTo(MqttClient*); MqttError sendTo(MqttClient*) const;
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length) const;
std::string buffer; mutable std::string buffer; // mutable -> sendTo()
uint8_t vheader; uint8_t vheader;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; State state;
@@ -118,13 +128,15 @@ class MqttClient
FlagReserved = 1 FlagReserved = 1
}; };
public: public:
MqttClient(MqttBroker*); /** Constructor. If broker is not null, this is the adress of a local broker.
MqttClient() : MqttClient(nullptr) {}; If you want to connect elsewhere, leave broker null and use connect() **/
MqttClient(MqttBroker* broker = nullptr, const std::string& id="");
MqttClient(const std::string& id) : MqttClient(nullptr, id){}
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port, uint16_t ka=10); void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10);
bool connected() { return bool connected() { return
(parent!=nullptr and client==nullptr) or (parent!=nullptr and client==nullptr) or
@@ -135,8 +147,9 @@ class MqttClient
const std::string& id() const { return clientId; } const std::string& id() const { return clientId; }
void id(std::string& new_id) { clientId = new_id; } void id(std::string& new_id) { clientId = new_id; }
/** Should be called in main loop() */
void loop(); void loop();
void close(); void close(bool bSendDisconnect=true);
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world // Publish from client to the world
@@ -146,23 +159,22 @@ class MqttClient
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
MqttError subscribe(Topic topic, uint8_t qos=0); MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic& topic); MqttError unsubscribe(Topic topic);
bool isSubscribedTo(const Topic& topic) const;
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis(); uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF");
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive;
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive; Serial << (client && client->connected() ? "" : "dis") << "connected";
Serial << " cnx " << (client && client->connected()); message.hexdump("entrant msg");
Serial << " [";
message.hexdump("entrant msg");
bool c=false; bool c=false;
Serial << " [";
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
@@ -172,16 +184,21 @@ class MqttClient
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif
/** Count the number of messages that have been sent **/
static long counter;
private: private:
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publish(const Topic& topic, MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage(const MqttMessage* message);
bool mqtt_connected = false; bool mqtt_connected = false;
char mqtt_flags; char mqtt_flags;
@@ -216,12 +233,13 @@ class MqttBroker
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
uint8_t port() const { return server.port(); } uint16_t port() const { return server.port(); }
void connect(std::string host, uint32_t port=1883); void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG size_t clientsCount() const { return clients.size(); }
void dump() void dump()
{ {
Serial << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
@@ -231,7 +249,6 @@ class MqttBroker
client->dump(); client->dump();
} }
} }
#endif
private: private:
friend class MqttClient; friend class MqttClient;
@@ -243,7 +260,9 @@ class MqttBroker
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself // For clients that are added not by the broker itself
void addClient(MqttClient* client); void addClient(MqttClient* client);

20
tests/Makefile Normal file
View File

@@ -0,0 +1,20 @@
tests:
set -e; \
for i in *-tests/Makefile; do \
echo '==== Making:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) -j; \
done
runtests:
set -e; \
for i in *-tests/Makefile; do \
echo '==== Running:' $$(dirname $$i); \
$$(dirname $$i)/$$(dirname $$i).out; \
done
clean:
set -e; \
for i in *-tests/Makefile; do \
echo '==== Cleaning:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) clean; \
done

View File

@@ -0,0 +1,6 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := local-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -0,0 +1,152 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt local unit tests.
*
* Clients are connected to pseudo remote broker
* The remote will be 127.0.0.1:1883
* We are using 127.0.0.1 because this is simpler to test with a single ESP
* Also, this will allow to mock and thus run Action on github
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
}
test(local_client_should_unregister_when_destroyed)
{
return;
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient client;
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
client.connect("127.0.0.1", 1883);
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected
}
assertEqual(broker.clientsCount(), (size_t)0);
}
#if 0
test(local_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client;
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(local_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber;
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher;
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertTrue(published[""]["a/b"] == 1);
assertTrue(published[""]["a/c"] == 2);
}
test(local_publish_should_be_dispatched_to_local_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber_a("A");
subscriber_a.setCallback(onPublish);
subscriber_a.subscribe("a/b");
subscriber_a.subscribe("a/c");
MqttClient subscriber_b("B");
subscriber_b.setCallback(onPublish);
subscriber_b.subscribe("a/b");
MqttClient publisher;
publisher.publish("a/b");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertTrue(published["A"]["a/b"] == 1);
assertTrue(published["A"]["a/c"] == 1);
assertTrue(published["B"]["a/b"] == 1);
assertTrue(published["B"]["a/c"] == 0);
}
test(local_unsubscribe)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber;
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher;
publisher.publish("a/b");
subscriber.unsubscribe("a/b");
publisher.publish("a/b");
publisher.publish("a/b");
assertTrue(published[""]["a/b"] == 1); // Only one publish has been received
}
test(local_nocallback_when_destroyed)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient publisher;
{
MqttClient subscriber;
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
publisher.publish("a/b");
}
publisher.publish("a/b");
assertEqual(published.size(), (size_t)1); // Only one publish has been received
}
#endif
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ NO WIFI CONNECTION TinyMqtt TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}

View File

@@ -0,0 +1,6 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := nowifi-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -0,0 +1,165 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt nowifi unit tests.
*
* No wifi connection unit tests.
* Checks with a local broker. Clients must connect to the local broker
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
}
test(nowifi_client_should_unregister_when_destroyed)
{
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1);
}
assertEqual(broker.clientsCount(), (size_t)0);
}
test(nowifi_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client(&broker);
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(nowifi_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher(&broker);
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertEqual(published[""]["a/b"], 1);
assertEqual(published[""]["a/c"], 2);
}
test(nowifi_publish_should_be_dispatched_to_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber_a(&broker, "A");
subscriber_a.setCallback(onPublish);
subscriber_a.subscribe("a/b");
subscriber_a.subscribe("a/c");
MqttClient subscriber_b(&broker, "B");
subscriber_b.setCallback(onPublish);
subscriber_b.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertEqual(published["A"]["a/b"], 1);
assertEqual(published["A"]["a/c"], 1);
assertEqual(published["B"]["a/b"], 1);
assertEqual(published["B"]["a/c"], 0);
}
test(nowifi_unsubscribe)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b"); // This publish is received
subscriber.unsubscribe("a/b");
publisher.publish("a/b"); // Those one, no (unsubscribed)
publisher.publish("a/b");
assertEqual(published[""]["a/b"], 1); // Only one publish has been received
}
test(nowifi_nocallback_when_destroyed)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient publisher(&broker);
{
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
publisher.publish("a/b");
}
publisher.publish("a/b");
assertEqual(published.size(), (size_t)1); // Only one publish has been received
}
test(nowifi_payload_nullptr)
{
return; // FIXME
published.clear();
const char* payload="abcd";
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b", payload, strlen(payload)); // This publish is received
// coming from MqttClient::publish(...)
assertEqual(payload, lastPayload);
assertEqual(lastLength, (size_t)4);
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ NO WIFI CONNECTION TinyMqtt TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}

6
tests/result.json Normal file
View File

@@ -0,0 +1,6 @@
{
"schemaVersion" : 1,
"label" : "tests",
"message" : "Message content",
"color": "red"
}

2
tests/result.yaml Normal file
View File

@@ -0,0 +1,2 @@
result: 1
insert: "passed"

View File

@@ -0,0 +1,6 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := string-indexer-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -0,0 +1,123 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt local unit tests.
*
* Clients are connected to pseudo remote broker
* The remote will be 127.0.0.1:1883
* We are using 127.0.0.1 because this is simpler to test with a single ESP
* Also, this will allow to mock and thus run Action on github
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
}
test(indexer_empty)
{
assertEqual(StringIndexer::count(), 0);
}
test(indexer_strings_deleted_should_empty_indexer)
{
assertTrue(StringIndexer::count()==0);
{
IndexedString one("one");
assertEqual(StringIndexer::count(), 1);
IndexedString two("two");
assertEqual(StringIndexer::count(), 2);
IndexedString three("three");
assertEqual(StringIndexer::count(), 3);
IndexedString four("four");
assertEqual(StringIndexer::count(), 4);
}
assertEqual(StringIndexer::count(), 0);
}
test(indexer_same_strings_count_as_one)
{
IndexedString one ("one");
IndexedString two ("one");
IndexedString three("one");
IndexedString fourt("one");
assertEqual(StringIndexer::count(), 1);
}
test(indexer_size_of_indexed_string)
{
assertEqual(sizeof(IndexedString), (size_t)1);
}
test(indexer_different_strings_are_different)
{
IndexedString one("one");
IndexedString two("two");
assertFalse(one == two);
}
test(indexer_same_strings_should_equal)
{
IndexedString one("one");
IndexedString two("one");
assertTrue(one == two);
}
test(indexer_indexed_operator_eq)
{
IndexedString one("one");
{
IndexedString same = one;
assertTrue(one == same);
assertEqual(StringIndexer::count(), 1);
}
assertEqual(StringIndexer::count(), 1);
}
test(indexer_get_string)
{
std::string sone("one");
IndexedString one(sone);
assertTrue(sone==one.str());
}
test(indexer_get_index)
{
IndexedString one1("one");
IndexedString one2("one");
IndexedString two1("two");
IndexedString two2("two");
assertTrue(one1.getIndex() == one2.getIndex());
assertTrue(two1.getIndex() == two2.getIndex());
assertTrue(one1.getIndex() != two1.getIndex());
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ TinyMqtt StringIndexer TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}