Compare commits

..

172 Commits

Author SHA1 Message Date
hsaturn
d3210c3c93 Merge branch 'main' into AsyncAndWifi 2021-04-11 23:30:13 +02:00
hsaturn
23f1207718 Lot of new functions for tinytest
- command every allowing to add peridic evaluations
  very usefull for benchmarks and load tests
- on/off command
2021-04-11 23:27:15 +02:00
hsaturn
122ab88960 Rewrite client-with-wifi.ino 2021-04-11 23:26:49 +02:00
hsaturn
28b0ac1611 Fix missing receive loop for mqttclient 2021-04-11 21:21:48 +02:00
hsaturn
1cfb5cfab1 Allow multiple command per line separated by ; 2021-04-11 19:19:06 +02:00
hsaturn
b023cd67a9 Fix AUnit in debug mode / Not async 2021-04-11 17:02:24 +02:00
hsaturn
24ee6b5201 Fixes in WiFiClient mode 2021-04-11 16:33:12 +02:00
hsaturn
2e92a98db2 Trying to fuse togeter Async and not async version 2021-04-11 15:51:33 +02:00
hsaturn
c59bddfd39 Implementation of Unsuback (unless MqttClient disconnects) 2021-04-11 01:58:44 +02:00
hsaturn
7bdb9cc0cd Tinytest, allow to blink output 0 2021-04-11 01:57:58 +02:00
hsaturn
be62699702 Fix connect problem with MqttClient 2021-04-11 00:48:04 +02:00
hsaturn
77da47e1da Update README.md 2021-04-10 18:02:48 +02:00
hsaturn
88797bfafd Added AUnit 2021-04-10 18:02:28 +02:00
hsaturn
1e3b37623d Fix AUnit build 2021-04-10 17:54:46 +02:00
hsaturn
ba6a96976a ESP32 version that could work 2021-04-10 17:47:21 +02:00
hsaturn
6afd3939b3 Merge remote-tracking branch 'origin/AsyncTcp' into main 2021-04-10 17:23:53 +02:00
hsaturn
2ffe0c13fa README.md update 2021-04-10 17:23:46 +02:00
hsaturn
fe3f8d7b32 Very promising async commit 2021-04-10 17:19:57 +02:00
hsaturn
d1c7ebe134 Added unsubscribe to tinytest 2021-04-10 17:18:53 +02:00
hsaturn
aa0ed9a7a7 Bad merge fix 2021-04-10 17:18:53 +02:00
hsaturn
48eb0daf9a Fix bug in unsubscription list 2021-04-10 17:16:25 +02:00
hsaturn
34c05bc37a Fix compilation in DEBUG mode 2021-04-10 17:16:25 +02:00
hsaturn
7c96c4a5cc Fix warning 2021-04-10 17:16:25 +02:00
hsaturn
b280196395 Added mqDns to tinytest 2021-04-10 17:16:25 +02:00
hsaturn
c75f4893e8 AsyncTcp
AsyncTcp
2021-04-10 17:16:25 +02:00
hsaturn
d666f6a53b AsyncTCP (to be continued) 2021-04-10 17:16:25 +02:00
hsaturn
7ef18de755 Very promising async commit 2021-04-10 17:16:25 +02:00
hsaturn
838df3a34a Added unsubscribe to tinytest 2021-04-10 17:15:21 +02:00
hsaturn
ad602194cf Fix bug in unsubscription list 2021-04-10 16:51:56 +02:00
hsaturn
afc9370e3e Fix compilation in DEBUG mode 2021-04-10 16:51:35 +02:00
hsaturn
d96143f185 Fix warning 2021-04-10 16:50:45 +02:00
hsaturn
9c939a5667 Added mqDns to tinytest 2021-04-10 16:50:14 +02:00
hsaturn
8a25155fd8 Bad merge fix 2021-04-10 16:06:45 +02:00
hsaturn
d64ffe772e Merge branch 'AsyncTcp' of github.com:hsaturn/TinyMqtt into AsyncTcp 2021-04-10 15:57:13 +02:00
hsaturn
db610e6f0f Merge branch 'AsyncTcp' of github.com:hsaturn/TinyMqtt into AsyncTcp 2021-04-10 15:52:04 +02:00
hsaturn
6711f30ad0 AsyncTcp
AsyncTcp
2021-04-10 15:51:29 +02:00
hsaturn
3e8d34e4e7 Very promising async commit
Very promising async commit
2021-04-10 15:47:49 +02:00
hsaturn
67a296eb28 Fix too many things in StringIndexer test 2021-04-10 15:39:42 +02:00
hsaturn
e90076d010 AsyncTcp 2021-04-10 14:03:36 +02:00
hsaturn
f42464c173 AsyncTCP (to be continued) 2021-04-10 13:58:23 +02:00
hsaturn
36b452281f Very promising async commit 2021-04-10 13:42:43 +02:00
hsaturn
077c0c6adf Typo in libraries text 2021-04-09 23:29:32 +02:00
hsaturn
6f1e5d7488 Added blink command allowing to check if loop slows down 2021-04-09 23:27:50 +02:00
hsaturn
ca8ad88109 Refactoring of EspMock 2021-04-07 06:44:15 +02:00
hsaturn
986a9c592d Update README.md 2021-04-05 14:18:21 +02:00
hsaturn
62868cba34 Fix payload test (the payload was sent, the test was buggy) 2021-04-05 13:54:40 +02:00
hsaturn
80dade00fe Avoid unitialized values 2021-04-05 13:54:09 +02:00
hsaturn
8254bd4831 gitignore removed (not properly used) 2021-04-05 13:53:35 +02:00
hsaturn
5834a278c7 Release 0.7.3 2021-04-04 06:36:47 +02:00
hsaturn
146d0de1d4 MqttClient: bug fix, connection lost at each publish received 2021-04-04 06:35:50 +02:00
hsaturn
297a22efb5 Big rewrite of MqttClient in order to avoid code duplicate 2021-04-04 05:57:48 +02:00
hsaturn
510ff514a9 [tests] Changed assertions 2021-04-04 01:07:12 +02:00
hsaturn
ad6f7155e5 Added test for StringIndexer 2021-04-03 21:11:54 +02:00
hsaturn
3ed5874373 Fix compilation 2021-04-02 19:57:04 +02:00
hsaturn
e1a936e081 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-04-02 19:23:53 +02:00
hsaturn
0757a95fbf Keywords updated, code clean 2021-04-02 19:22:59 +02:00
hsaturn
7c8d71262f Fix test (not yet finished) 2021-04-02 18:59:07 +02:00
hsaturn
138ce973f2 Typos in libraries 2021-04-02 18:48:37 +02:00
hsaturn
bf499117b7 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-03-31 19:12:28 +02:00
hsaturn
4ed6f72602 Merge branch 'test' into main 2021-03-31 19:11:24 +02:00
hsaturn
87a78c549f Fix crash at end of unit tests 2021-03-31 19:09:43 +02:00
hsaturn
5211360b91 Local tests added 2021-03-31 19:09:05 +02:00
hsaturn
549a23ffb7 Fix delete was not really deleting in tinytest 2021-03-31 10:40:06 +02:00
hsaturn
3a1af655d7 Fix compilation problem 2021-03-31 00:33:15 +02:00
hsaturn
e71ffefc5a Fixed a useless test and modified MqttClient constructors 2021-03-31 00:22:31 +02:00
hsaturn
b6a0dde2b1 json 2021-03-30 08:52:46 +02:00
hsaturn
babc391632 Added json 2021-03-30 08:52:10 +02:00
hsaturn
27bdbb9a0b str 2021-03-30 08:41:30 +02:00
hsaturn
6a9e158428 results 2021-03-30 08:39:02 +02:00
hsaturn
6fc6794dc3 result0.yaml 2021-03-30 08:37:25 +02:00
hsaturn
1eaa514579 result.yaml 2021-03-30 08:35:33 +02:00
hsaturn
7af4c2ca69 Added .o to gitignore 2021-03-30 08:21:56 +02:00
hsaturn
a340558460 Fix tests 2021-03-30 08:21:33 +02:00
hsaturn
9a7db237d3 Renamed local to nowifi as local will be used for local (127.0.0.1) tests 2021-03-30 08:15:13 +02:00
hsaturn
91e083e7b0 Merge branch 'linter' into main 2021-03-29 23:58:00 +02:00
hsaturn
97adc985e6 Code clean 2021-03-29 23:56:36 +02:00
hsaturn
6fcfc9dfc0 ptr 2021-03-29 23:20:49 +02:00
hsaturn
a6596ffc89 Fix ptr 2021-03-29 23:06:40 +02:00
hsaturn
533ab0c70d Try to mock Esp 2021-03-29 21:47:14 +02:00
hsaturn
d5dd896b45 MqttClient::unsubscribe implemented 2021-03-29 20:48:45 +02:00
hsaturn
bd7fa8f39c Update readme.md 2021-03-29 20:47:14 +02:00
hsaturn
6395e931ce refix EspWifi 2021-03-29 20:47:14 +02:00
hsaturn
635fee6f7c ESP8266WiFi lib added for aunit 2021-03-29 20:47:14 +02:00
hsaturn
dc2420d88e Fix makefile 2021-03-29 20:47:14 +02:00
hsaturn
2fbc46cbe2 Revert auint 2021-03-29 20:47:14 +02:00
hsaturn
a003156ae1 Fix aunit 2021-03-29 20:47:14 +02:00
hsaturn
913e1aa7ae Fixed make target 2021-03-29 20:46:59 +02:00
hsaturn
8272515bd7 Fixed make target 2021-03-29 20:46:46 +02:00
hsaturn
9a7f6a3020 Fixed make target 2021-03-29 20:46:46 +02:00
hsaturn
fead702d9f Added makefile for aunit 2021-03-29 20:46:46 +02:00
hsaturn
eaf938f2fd Test aunit 2021-03-29 20:46:46 +02:00
hsaturn
8eefa63f45 Lint fixes 2021-03-29 20:46:46 +02:00
hsaturn
9d48c436d8 test 2021-03-29 20:46:46 +02:00
hsaturn
792a28e831 deleted 2021-03-29 20:45:33 +02:00
hsaturn
9407193454 MqttClient::unsubscribe implemented 2021-03-29 20:39:10 +02:00
hsaturn
602050f309 Update readme.md 2021-03-29 11:45:51 +02:00
hsaturn
1a70c90af2 refix EspWifi 2021-03-29 00:01:32 +02:00
hsaturn
ed4091c53e ESP8266WiFi lib added for aunit 2021-03-28 23:57:06 +02:00
hsaturn
f2a805f724 Fix makefile 2021-03-28 23:50:50 +02:00
hsaturn
3083bcf071 Revert auint 2021-03-28 23:44:11 +02:00
hsaturn
d01f46dbc1 Fix aunit 2021-03-28 23:42:57 +02:00
hsaturn
39b2257619 Merge branch 'linter' of github.com:hsaturn/TinyMqtt into linter 2021-03-28 23:39:50 +02:00
hsaturn
60d385189b Fixed make target 2021-03-28 23:39:37 +02:00
hsaturn
82c5b971e9 Release 0.7.0 2021-03-28 23:34:54 +02:00
hsaturn
3f2c1c57e1 Fixed make target 2021-03-28 23:30:19 +02:00
hsaturn
e550197d0a Fixed make target 2021-03-28 23:28:48 +02:00
hsaturn
253bc9b3f5 Added makefile for aunit 2021-03-28 23:26:53 +02:00
hsaturn
96d8018960 Test aunit 2021-03-28 23:16:07 +02:00
hsaturn
505cacc2df Lint fixes 2021-03-28 23:09:10 +02:00
hsaturn
62848056a2 test 2021-03-28 23:01:14 +02:00
hsaturn
01998e74ec Test linter 2021-03-28 22:59:42 +02:00
hsaturn
5f46fd304c Release 0.7.0 2021-03-28 22:47:13 +02:00
hsaturn
213d637eaf Code cleaning 2021-03-28 21:31:10 +02:00
hsaturn
3bb2dd5a81 Code cleaning 2021-03-28 21:29:02 +02:00
hsaturn
7d9ab6381d Disconnect added (finally) 2021-03-28 21:28:06 +02:00
hsaturn
470cde62da Version 0.6.0 2021-03-27 10:26:41 +01:00
hsaturn
3fb9b6317d README.md modified (typo) 2021-03-27 02:00:49 +01:00
hsaturn
ee9ad93bfd README.md modified 2021-03-27 02:00:00 +01:00
hsaturn
0b1a932244 MqttClient resubscribe on reconnect 2021-03-27 01:55:03 +01:00
hsaturn
972759237c Speed and stability improved 2021-03-27 01:40:49 +01:00
hsaturn
cb00d7f82a MqttClient / UnSubscribe message implemented 2021-03-27 01:39:37 +01:00
hsaturn
0b735d22a5 Less debug as the code is more stable now 2021-03-27 01:38:32 +01:00
hsaturn
9178aac02c MqttClient client length augmented to 60 (was not passing MqttBox tests 2021-03-26 01:59:08 +01:00
hsaturn
c706fbcff2 Update readme 2021-03-26 01:52:31 +01:00
hsaturn
a0c41a0ccb Smart ip feature for connect 2021-03-26 01:51:30 +01:00
hsaturn
b780dcf99c test fix, was unable to use set when replacements occurs 2021-03-26 01:10:33 +01:00
hsaturn
f122d5e902 relase 0.5.1 2021-03-25 01:26:27 +01:00
hsaturn
d63793cf77 Avoid to use message member, minor changes 2021-03-25 01:26:03 +01:00
hsaturn
8386779e92 tinytest great enhancements 2021-03-25 01:24:46 +01:00
hsaturn
1b988a06a2 Relase 0.5.0 2021-03-24 21:21:46 +01:00
hsaturn
d92aa1fe3c Minor change to publish 2021-03-24 21:20:07 +01:00
hsaturn
0d6e194560 test client enhancements 2021-03-24 21:19:44 +01:00
hsaturn
7107da2cce Client supports (does not disconnect) Suback / Puback 2021-03-24 21:18:27 +01:00
hsaturn
28c8713415 Client keep_alive is now parameterized 2021-03-24 21:17:08 +01:00
hsaturn
70cf8137de Fixed build of client-without-wifi 2021-03-24 18:35:57 +01:00
hsaturn
5ab315e472 Removed dependency with Streaming.h 2021-03-24 18:35:11 +01:00
hsaturn
b96b36f10c README update 2021-03-24 01:33:45 +01:00
hsaturn
ba831ea366 README update 2021-03-24 01:32:47 +01:00
hsaturn
4020393f90 MqttClient can subscribe and receive publishes from distant broker 2021-03-24 01:30:56 +01:00
hsaturn
7b20e7deb5 Supports multiple subscriptions 2021-03-23 23:51:33 +01:00
hsaturn
efe6a05bbd MqttStreaming.h, streaming with fixes 2021-03-23 23:41:00 +01:00
hsaturn
84dbb80106 Release 0.4.0 2021-03-22 02:45:57 +01:00
hsaturn
47bc06f0ce removed need of Streaming.h if no debug 2021-03-22 02:44:30 +01:00
hsaturn
07c96c19a5 Better simple-client example 2021-03-22 02:35:34 +01:00
hsaturn
de8813f9f6 No more serial prints 2021-03-22 02:35:10 +01:00
hsaturn
fbc24c94e3 MqttClient::publish with String added 2021-03-22 02:34:45 +01:00
hsaturn
169abf8099 allow MqttClient to be constructed with nothing 2021-03-22 02:34:23 +01:00
hsaturn
5cee67095e Fix payload content 2021-03-22 02:33:54 +01:00
hsaturn
0cb2e99b4b Better debug defines 2021-03-22 02:32:45 +01:00
hsaturn
54c905a32f API Changed
Fix too long time
2021-03-22 02:10:54 +01:00
hsaturn
befab9dd6e More TODOs (happy betas) 2021-03-22 01:59:40 +01:00
hsaturn
bd2e7cc5f6 Fix crash on MqttClient timeout when not linked to a broker 2021-03-22 01:59:17 +01:00
hsaturn
18b5f0c27b Better client creation 2021-03-22 01:19:50 +01:00
hsaturn
e71a4d5e87 MqttClient was unable to publish in some cases 2021-03-22 01:19:26 +01:00
hsaturn
620dbf31af Rewrite interpreter, can handle brokers now 2021-03-22 00:28:05 +01:00
hsaturn
52690ec7e7 Fix some rare case crashes 2021-03-22 00:27:23 +01:00
hsaturn
9f28e7f92f Version 0.3.0 library files 2021-03-21 19:34:40 +01:00
hsaturn
ed9efbb5ce Removed buffer 256 thus less memory is needed for MqttClient instances 2021-03-21 17:21:36 +01:00
hsaturn
4fd34bfffa More commands, and dot notation added 2021-03-21 16:34:14 +01:00
hsaturn
7be4d86f46 TODO list changed 2021-03-21 16:33:53 +01:00
hsaturn
428eb51850 Release 0.3.0
clients can now connect to outside.
bug fixed for broker (pings etc.)
crashes fixed when clients where removed
More examples added (the tinymqtt-test is great)
2021-03-21 13:50:42 +01:00
hsaturn
4e629bbc1e Update README.md 2021-03-21 12:26:02 +01:00
hsaturn
389a2eec8b Update README.md 2021-03-21 12:20:11 +01:00
hsaturn
6ff31c9820 Readme rewritten 2021-03-21 12:09:52 +01:00
hsaturn
dd44a4a658 Readme rewritten 2021-03-21 12:08:12 +01:00
hsaturn
17fabeae79 Readme rewritten 2021-03-21 11:50:23 +01:00
hsaturn
d052f6b55a Credentials added 2021-03-21 11:15:28 +01:00
hsaturn
6a80b29fd3 [broker] fix timeout on external client 2021-03-19 22:30:58 +01:00
hsaturn
cc708cdf22 Example when wifi is not connected 2021-03-19 22:04:23 +01:00
hsaturn
132fc56803 Update README.md 2021-03-19 22:01:02 +01:00
hsaturn
b33c9ba687 Version 0.2 2021-03-19 19:02:40 +01:00
hsaturn
bb2a2e6737 Added includes 2021-03-16 23:53:52 +01:00
27 changed files with 2829 additions and 187 deletions

28
.github/workflows/aunit.yml vendored Normal file
View File

@@ -0,0 +1,28 @@
# See https://docs.github.com/en/actions/guides for documentation about GitHub
# Actions.
name: AUnit Tests
# Run on all branches.
on: [push]
jobs:
build:
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v2
- name: Setup
run: |
cd ..
git clone https://github.com/bxparks/EpoxyDuino
git clone https://github.com/bxparks/AceRoutine
git clone https://github.com/bxparks/AUnit
git clone https://github.com/bxparks/AceCommon
git clone https://github.com/hsaturn/EspMock
- name: Verify tests
run: |
make -C tests
make -C tests runtests

26
.github/workflows/superlinter.yml vendored Normal file
View File

@@ -0,0 +1,26 @@
name: Super-Linter
# Run this workflow every time a new commit pushed to your repository
#
on: push
jobs:
# Set the job key. The key is displayed as the job name
# when a job name is not provided
super-lint:
# Name the Job
name: Lint code base
# Set the type of machine to run on
runs-on: ubuntu-latest
steps:
# Checks out a copy of your repository on the ubuntu-latest machine
- name: Checkout code
uses: actions/checkout@v2
# Runs the Super-Linter action
- name: Run Super-Linter
uses: github/super-linter@v3
env:
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

1
.gitignore vendored
View File

@@ -1 +0,0 @@
*~

View File

@@ -1,2 +1,66 @@
# TinyMqtt
ESP 8266 Small footprint Mqtt Broker and Client
![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
[![AUnit Tests](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml/badge.svg)](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml)
![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)
![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small, fast and capable Mqtt Broker and Client
## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode)
- Brokers can connect to another broker and becomes then a
proxy for clients that are connected to it.
- zeroconf, this is a strange but very powerful mode where
all brokers tries to connect together on the same local network.
## Quickstart
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
(you can use the Arduino library manager and search for TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)
## Examples
| Example | Description |
| ------------------- | ------------------------------------------ |
| client-without-wifi | standalone example |
| simple-client | Connect the ESP to an external Mqtt broker |
| simple-broker | Simple Mqtt broker with your ESP |
| tinymqtt-test | Complex console example |
- tinymqtt-test : This is a complex sketch with a terminal console
that allows to add clients publish, connect etc with interpreted commands.
## Standalone mode (zeroconf)
-> The zeroconf mode is not yet implemented
zeroconf clients to connect to broker on local network.
In Zeroconf mode, each ESP is a a broker and scans the local network.
After a while one ESP naturally becomes a 'master' and all ESP are connected together.
No problem if the master dies, a new master will be choosen soon.
## TODO List
* ~~Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)~~
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)
* Why not a 'global' TinyMqtt::loop() instead of having to call loop for all broker/clients instances
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* ~~MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io)~~
* MqttClient auto reconnection
* MqttClient user/password
* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching)
* I suspect that MqttClient::parent could be removed and replaced with a simple boolean
(this'll need to rewrite a few functions)
## License
Gnu GPL 3.0, see [LICENSE](https://github.com/hsaturn/TinyMqtt/blob/main/LICENSE).

View File

@@ -0,0 +1,96 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/**
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
}
}

View File

@@ -0,0 +1,63 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/** TinyMQTT allows a disconnected mode:
*
* In this example, local clients A and B are talking together, no need to be connected.
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
*/
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "init" << endl;
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop();
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 5000;
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 3000; // will send topic each 5000 ms
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic);
}
}

View File

@@ -1,9 +1,6 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
const char *ssid = ; // Put here your wifi SSID ("ssid")
const char *password = ; // Put here your Wifi password ("pwd")
#include <my_credentials.h>
#define PORT 1883
MqttBroker broker(PORT);
@@ -16,7 +13,6 @@ void setup()
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial << '.';
delay(500);
}

View File

@@ -0,0 +1,56 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
/** Simple Client
*
* This is the simplest Mqtt client configuration
*
* 1 - edit my_credentials.h to setup wifi essid/password
* 2 - change BROKER values (or keep emqx.io test broker)
*
* pro - small memory footprint (both ram and flash)
* - very simple to setup and use
*
* cons - cannot work without wifi connection
* - stop working if broker is down
* - local publishes takes more time (because they go outside)
*/
const char* BROKER = "broker.emqx.io";
const uint16_t BROKER_PORT = 1883;
#include <my_credentials.h>
static float temp=19;
static MqttClient client;
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Simple clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect(BROKER, BROKER_PORT); // Put here your broker ip / port
}
void loop()
{
client.loop();
delay(1000);
auto rnd=random(100);
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
client.publish("sensor/temperature", String(temp));
}

View File

@@ -0,0 +1,39 @@
// vim: ts=40
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish.
--------
set name value set variable name to value (later replaced)
set name if no value, then var is erased
set view all vars
reserved keywords are forbidden
client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883
a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic
delete a destroy the client
* note, if 'server' is a number, then it replaces the end of the local ip.
i.e. if local ip is 192.168.1.10, connect 2.35 becomes 192.168.2.35
--------
example:
--------
client c
c.connect broker.emqx.io
set topic sensor/temperature
c.subscribe topic
c.publish topic 15
c.publish topic 20
macro exansion example
----------------------
set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -0,0 +1,760 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h>
#include <ESP8266mDNS.h>
#include <sstream>
#include <map>
/**
* Console allowing to make any kind of test.
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str();
if (payload) Serial << ", payload[" << length << "]=[";
while(length--)
{
const char c=*payload++;
if (c!=10 and c!=13 and c <32) Serial << '?';
Serial << *payload++;
}
Serial<< endl;
}
std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers;
void setup()
{
WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054
Serial.begin(115200);
delay(500);
Serial << endl << endl << endl
<< "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
Serial << "Type help for more..." << endl;
const char* name="tinytest";
Serial << "Starting MDNS, name= " << name;
if (!MDNS.begin(name))
Serial << " error, not available." << endl;
else
Serial << " ok." << endl;
MqttBroker* broker = new MqttBroker(1883);
broker->begin();
brokers["broker"] = broker;
}
int getint(std::string& str, const int if_empty=0)
{
std::string sword;
while(str.length() && str[0]>='0' && str[0]<='9')
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str());
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string sword;
while(str.length() && str[0]!=sep)
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword;
}
bool isaddr(std::string s)
{
if (s.length()==0 or s.length()>3) return false;
for(char c: s)
if (c<'0' or c>'9') return false;
return true;
}
std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string addr=getword(str, if_empty, sep);
std::string ip=addr;
std::vector<std::string> build;
while(ip.length())
{
std::string b=getword(ip,nullptr,'.');
if (isaddr(b) && build.size()<4)
{
build.push_back(b);
}
else
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
while(build.size()!=4)
{
std::stringstream b;
b << (int)local[3-build.size()];
build.insert(build.begin(), b.str());
}
for(std::string s: build)
{
if (addr.length()) addr += '.';
addr += s;
}
return addr;
}
std::map<std::string, std::string> vars;
std::set<std::string> commands = {
"auto", "broker", "blink", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view", "every"
};
void getCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
std::string matches;
int count=0;
for(std::string cmd: commands)
{
if (cmd.substr(0, search.length()) == search)
{
if (count) matches +=", ";
count++;
matches += cmd;
}
}
if (count==1)
search = matches;
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
}
}
void replace(const char* d, std::string& str, std::string srch, std::string to)
{
if (d[0] && d[1])
{
srch=d[0]+srch+d[1];
to=d[0]+to+d[1];
size_t pos = 0;
while((pos=str.find(srch, pos)) != std::string::npos)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
}
}
}
void replaceVars(std::string& cmd)
{
cmd = ' '+cmd+' ';
for(auto it: vars)
{
replace("..", cmd, it.first, it.second);
replace(". ", cmd, it.first, it.second);
replace(" .", cmd, it.first, it.second);
replace(" ", cmd, it.first, it.second);
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval
class automatic
{
public:
automatic(MqttClient* clt, uint32_t intervl)
: client(clt), topic_(::topic)
{
interval(intervl);
autos[clt] = this;
}
void interval(uint32_t new_interval)
{
interval_ = new_interval;
if (interval_<1000) interval_=1000;
timer_ = millis() + interval_;
}
void loop_()
{
if (!bon) return;
if (interval_ && millis() > timer_)
{
Serial << "AUTO PUBLISH " << interval_ << endl;
timer_ += interval_;
client->publish(topic_, std::string(String(15+millis()%10).c_str()));
}
}
void topic(std::string new_topic) { topic_ = new_topic; }
static void loop()
{
for(auto it: autos)
it.second->loop_();
}
static void command(MqttClient* who, std::string cmd)
{
automatic* autop = nullptr;
if (autos.find(who) != autos.end())
{
autop=autos[who];
}
std::string s = getword(cmd);
if (compare(s, "create"))
{
std::string seconds=getword(cmd, "10000");
if (autop) delete autop;
std::string top = getword(cmd, ::topic.c_str());
autos[who] = new automatic(who, atol(seconds.c_str()));
autos[who]->topic(top);
autos[who]->bon=true;
Serial << "New auto (" << seconds.c_str() << " topic:" << top.c_str() << ')' << endl;
}
else if (autop)
{
while(s.length())
{
if (s=="on")
{
autop->bon = true;
autop->interval(autop->interval_);
}
else if (s=="off")
autop->bon=false;
else if (s=="interval")
{
int32_t i=getint(cmd);
if (i)
autop->interval(atol(s.c_str()));
else
Serial << "Bad value" << endl;
}
else if (s=="view")
{
Serial << " automatic "
<< (int32_t)autop->client
<< " interval " << autop->interval_
<< (autop->bon ? " on" : " off") << endl;
}
else
{
Serial << "Unknown auto command (" << s.c_str() << ")" << endl;
break;
}
s=getword(cmd);
}
}
else if (who==nullptr)
{
for(auto it: autos)
command(it.first, s+' '+cmd);
}
else
Serial << "what ? (" << s.c_str() << ")" << endl;
}
static void help()
{
Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl;
Serial << " auto [$id] interval [s]" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl;
}
private:
MqttClient* client;
uint32_t interval_;
uint32_t timer_;
std::string topic_;
bool bon=false;
static std::map<MqttClient*, automatic*> autos;
float temp=19;
};
std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd)
{
uint8_t p=0;
while(s[p++]==*cmd++)
{
if (*cmd==0 or s[p]==0) return true;
if (s[p]==' ') return true;
}
return false;
}
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
struct Every
{
std::string cmd;
uint32_t ms;
uint32_t next;
void dump()
{
auto mill=millis();
Serial << ms << "ms [" << cmd << "] next in ";
if (mill > next)
Serial << "now";
else
Serial << next-mill << "ms";
}
};
uint32_t blink_ms_on[16];
uint32_t blink_ms_off[16];
uint32_t blink_next[16];
bool blink_state[16];
int16_t blink;
std::vector<Every> everies;
void eval(std::string& cmd)
{
while(cmd.length())
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "on"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, HIGH);
}
else if (compare(s, "off"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
}
else if (compare(s, "every"))
{
uint32_t ms = getint(cmd, 0);
if (ms and cmd.length())
{
Every every;
every.ms=ms;
every.cmd=cmd;
every.next=millis()+ms;
everies.push_back(every);
every.dump();
Serial << endl;
cmd="";
}
else if (ms==0 and compare(cmd, "list"))
{
getword(cmd);
Serial << "List of everies (ms=" << millis() << ")" << endl;
uint8_t count=0;
for(auto& every: everies)
{
Serial << count << ": ";
every.dump();
Serial << endl;
count++;
}
}
else if (ms==0 and compare(cmd, "remove"))
{
getword(cmd);
int8_t every=getint(cmd, -1);
if (every==-1 and compare(cmd, "all"))
{
getword(cmd);
everies.clear();
}
else if (everies.size() > (uint8_t)every)
{
everies.erase(everies.begin()+every);
}
}
}
else if (compare(s, "blink"))
{
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< blink_nr;
else
{
blink &= ~(1<< blink_nr);
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " every ms [command]; every list; every remove [nr|all]" << endl;
Serial << " on {output}; off {output}" << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
}
}
void loop()
{
auto ms=millis();
int8_t out=0;
int16_t blink_bits = blink;
for(auto& every: everies)
{
if (every.ms && every.cmd.length() && ms > every.next)
{
std::string cmd(every.cmd);
eval(cmd);
every.next += every.ms;
}
}
while(blink_bits)
{
if (blink_ms_on[out] and ms > blink_next[out])
{
if (blink_state[out])
{
blink_next[out] += blink_ms_on[out];
digitalWrite(out, LOW);
}
else
{
blink_next[out] += blink_ms_off[out];
digitalWrite(abs(out), HIGH);
}
blink_state[out] = not blink_state[out];
}
blink_bits >>=1;
out++;
}
static long count;
MDNS.update();
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers)
it.second->loop();
for(auto it: clients)
it.second->loop();
automatic::loop();
if (Serial.available())
{
static std::string cmd;
char c=Serial.read();
if (c==10 or c==14)
{
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
eval(cmd);
}
else
{
cmd=cmd+c;
}
}
}

View File

@@ -3,16 +3,29 @@
#######################################
#######################################
# Datatypes (KEYWORD1)
# Datatypes and functions
#######################################
MqttBroker KEYWORD1
TinyMqtt KEYWORD1
MqttBroker KEYWORD1
connect KEYWORD2
clientsCount KEYWORD2
begin KEYWORD2
loop KEYWORD2
port KEYWORD2
MqttClient KEYWORD1
connect KEYWORD2
connected KEYWORD2
publish KEYWORD2
setCallback KEYWORD2
subscribe KEYWORD2
unsubscribe KEYWORD2
#######################################
# Methods and Functions (KEYWORD2)
#######################################
Topic KEYWORD1
matches KEYWORD2
c_str KEYWORD2
#######################################
# Constants (LITERAL1)

View File

@@ -1,12 +1,12 @@
{
"name": "TinyMqtt",
"keywords": "ethernet, mqtt, m2m, iot",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages. It does support MQTT 3.1.1 with QOS=0.",
"repository": {
"type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git"
},
"version": "0.1",
"version": "0.7.3",
"exclude": "",
"examples": "examples/*/*.ino",
"frameworks": "arduino",

View File

@@ -1,9 +1,11 @@
name=TinyMqtt
version=0.1
author=HSaturn <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com>
version=0.7.3
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP. It does support MQTT 3.1.1 with QoS=0.
category=Communication
url=https://github.com/hsaturn/TinyMqtt
architectures=*
includes=TinyMqtt.h
depends=AsyncTCP

412
src/MqttStreaming.h Normal file
View File

@@ -0,0 +1,412 @@
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
* (I have to speek to the author in order to include my changes to his library if possible)
**/
/*
Streaming.h - Arduino library for supporting the << streaming operator
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Version 6 library changes
Copyright (c) 2019 Gazoodle. All rights reserved.
1. _BASED moved to template to remove type conversion to long and
sign changes which break int8_t and int16_t negative numbers.
The print implementation still upscales to long for it's internal
print routine.
2. _PAD added to allow padding & filling of characters to the stream
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
and zero padding for numerics
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
and no internal buffers for replaceable stream printing
*/
#ifndef ARDUINO_STREAMING
#define ARDUINO_STREAMING
#if defined(ARDUINO) && ARDUINO >= 100
#include "Arduino.h"
#else
#ifndef STREAMING_CONSOLE
#include "WProgram.h"
#endif
#endif
#include <string>
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
// No stl library, so need trivial version of std::is_signed ...
namespace std {
template<typename T>
struct is_signed { static const bool value = false; };
template<>
struct is_signed<int8_t> { static const bool value = true; };
template<>
struct is_signed<int16_t> { static const bool value = true; };
template<>
struct is_signed<int32_t> { static const bool value = true; };
};
#else
#include <type_traits>
#endif
#define STREAMING_LIBRARY_VERSION 6
#if !defined(typeof)
#define typeof(x) __typeof__(x)
#endif
// PrintBuffer implementation of Print, a small buffer to print in
// see its use with pad_float()
template <size_t N>
class PrintBuffer : public Print
{
size_t pos = 0;
char str[N] {};
public:
inline const char *operator() ()
{ return str; };
// inline void clear()
// { pos = 0; str[0] = '\0'; };
inline size_t write(uint8_t c)
{ return write(&c, 1); };
inline size_t write(const uint8_t *buffer, size_t size)
{
size_t s = std::min(size, N-1 - pos); // need a /0 left
if (s)
{
memcpy(&str[pos], buffer, s);
pos += s;
}
return s;
};
};
// Generic template
template<class T>
inline Print &operator <<(Print &stream, const T &arg)
{ stream.print(arg); return stream; }
// TODO sfinae maybe could do the trick ?
inline Print &operator <<(Print &stream, const std::string &str)
{ stream.print(str.c_str()); return stream; }
template<typename T>
struct _BASED
{
T val;
int base;
_BASED(T v, int b): val(v), base(b)
{}
};
#if ARDUINO >= 100
struct _BYTE_CODE
{
byte val;
_BYTE_CODE(byte v) : val(v)
{}
};
#define _BYTE(a) _BYTE_CODE(a)
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
{ obj.write(arg.val); return obj; }
#else
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
#endif
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
// Specialization for class _BASED
// Thanks to Arduino forum user Ben Combee who suggested this
// clever technique to allow for expressions like
// Serial << _HEX(a);
template<typename T>
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
{ obj.print(arg.val, arg.base); return obj; }
#if ARDUINO >= 18
// Specialization for class _FLOAT
// Thanks to Michael Margolis for suggesting a way
// to accommodate Arduino 0018's floating point precision
// feature like this:
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
struct _FLOAT
{
double val; // only Print::print(double)
int digits;
_FLOAT(double v, int d): val(v), digits(d)
{}
};
inline Print &operator <<(Print &obj, const _FLOAT &arg)
{ obj.print(arg.val, arg.digits); return obj; }
#endif
// Specialization for enum _EndLineCode
// Thanks to Arduino forum user Paul V. who suggested this
// clever technique to allow for expressions like
// Serial << "Hello!" << endl;
enum _EndLineCode { endl };
inline Print &operator <<(Print &obj, _EndLineCode)
{ obj.println(); return obj; }
// Specialization for padding & filling, mainly utilized
// by the width printers
//
// Use like
// Serial << _PAD(10,' '); // Will output 10 spaces
// Serial << _PAD(4, '0'); // Will output 4 zeros
struct _PAD
{
int8_t width;
char chr;
_PAD(int8_t w, char c) : width(w), chr(c) {}
};
inline Print &operator <<(Print& stm, const _PAD &arg)
{
for(int8_t i = 0; i < arg.width; i++)
stm.print(arg.chr);
return stm;
}
// Specialization for width printing
//
// Use like Result
// -------- ------
// Serial << _WIDTH(1,5) " 1"
// Serial << _WIDTH(10,5) " 10"
// Serial << _WIDTH(100,5) " 100"
// Serial << _WIDTHZ(1,5) "00001"
//
// Great for times & dates, or hex dumps
//
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
//
// for(int index=0; index<byte_array_size; index++)
// Serial << _WIDTHZ(_HEX(byte_array[index]))
template<typename T>
struct __WIDTH
{
const T val;
int8_t width;
char pad;
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
};
// Count digits in an integer of specific base
template<typename T>
inline uint8_t digits(T v, int8_t base = 10)
{
uint8_t digits = 0;
if ( std::is_signed<T>::value )
{
if ( v < 0 )
{
digits++;
v = -v; // v needs to be postive for the digits counter to work
}
}
do
{
v /= base;
digits++;
} while( v > 0 );
return digits;
}
// Generic get the width of a value in base 10
template<typename T>
inline uint8_t get_value_width(T val)
{ return digits(val); }
inline uint8_t get_value_width(const char * val)
{ return strlen(val); }
#ifdef ARDUINO
inline uint8_t get_value_width(const __FlashStringHelper * val)
{ return strlen_P(reinterpret_cast<const char *>(val)); }
#endif
// _BASED<T> get the width of a value
template<typename T>
inline uint8_t get_value_width(_BASED<T> b)
{ return digits(b.val, b.base); }
// Constructor wrapper to allow automatic template parameter deduction
template<typename T>
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
template<typename T>
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
// Operator overload to handle width printing.
template<typename T>
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
// explicit Operator overload to handle width printing of _FLOAT, double and float
template<typename T>
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
{
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
size_t size = buf.print(val, digits); // print in buf
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
}
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
// a less verbose _FLOATW for _WIDTH(_FLOAT)
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
// Specialization for replacement formatting
//
// Designed to be similar to printf that everyone knows and loves/hates. But without
// the internal buffers and type agnosticism. This version only has placeholders in
// the format string, the actual values are supplied using the stream safe operators
// defined in this library.
//
// Use like this:
//
// Serial << FMT(F("Replace % with %"), 1, 2 )
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
inline char get_next_format_char(const char *& format_string)
{
char format_char = *format_string;
if ( format_char > 0 ) format_string++;
return format_char;
}
#ifdef ARDUINO
inline char get_next_format_char(const __FlashStringHelper*& format_string)
{
char format_char = pgm_read_byte(format_string);
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
return format_char;
}
#endif
template<typename Ft>
inline bool check_backslash(char& format_char, Ft& format_string)
{
if ( format_char == '\\')
{
format_char = get_next_format_char(format_string);
return true;
}
return false;
}
// The template tail printer helper
template<typename Ft, typename... Ts>
struct __FMT
{
Ft format_string;
__FMT(Ft f, Ts ... args) : format_string(f) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
check_backslash(c, format);
if ( c )
stm.print(c);
}
}
};
// The variadic template helper
template<typename Ft, typename T, typename... Ts>
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
{
T val;
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
if (!check_backslash(c, format))
{
if ( c == '%')
{
stm << val;
// Variadic recursion ... compiler rolls this out during
// template argument pack expansion
__FMT<Ft, Ts...>::tstreamf(stm, format);
return;
}
}
if (c)
stm.print(c);
}
}
};
// The actual operator should you only instanciate the FMT
// helper with a format string and no parameters
template<typename Ft, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// The variadic stream helper
template<typename Ft, typename T, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// As we don't have C++17, we can't get a constructor to use
// automatic argument deduction, but ... this little trick gets
// around that ...
template<typename Ft, typename... Ts>
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
#endif

View File

@@ -2,8 +2,6 @@
#include <map>
#include <string>
#include <string.h>
#include <Streaming.h>
#include <ESP8266WiFi.h>
/***
* Allows to store up to 255 different strings with one byte class
@@ -16,11 +14,21 @@ class StringIndexer
std::string str;
uint8_t used=0;
friend class StringIndexer;
#if EPOXY_DUINO
public:
// Workaround to avoid coredump in Indexer::release
// when destroying a Topic after the deletion of
// StringIndexer::strings map (which can occurs only with AUnit,
// never in the ESP itself, because ESP never ends)
// (I hate static vars)
~StringCounter() { used=255; }
#endif
};
public:
using index_t=uint8_t;
static const index_t strToIndex(const char* str, uint8_t len)
static index_t strToIndex(const char* str, uint8_t len)
{
for(auto it=strings.begin(); it!=strings.end(); it++)
{
@@ -30,13 +38,13 @@ class StringIndexer
return it->first;
}
}
for(index_t index=0; index<255; index++)
for(index_t index=1; index; index++)
{
if (strings.find(index)==strings.end())
{
strings[index].str = std::string(str, len);
strings[index].used++;
Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
// Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
return index;
}
}
@@ -66,11 +74,13 @@ class StringIndexer
if (it->second.used == 0)
{
strings.erase(it);
Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
// Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
}
}
}
static uint16_t count() { return strings.size(); }
private:
static std::map<index_t, StringCounter> strings;
};
@@ -89,6 +99,8 @@ class IndexedString
index=StringIndexer::strToIndex(str, len);
}
IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {};
~IndexedString() { StringIndexer::release(index); }
IndexedString& operator=(const IndexedString& source)
@@ -103,9 +115,14 @@ class IndexedString
return i1.index < i2.index;
}
friend bool operator==(const IndexedString& i1, const IndexedString& i2)
{
return i1.index == i2.index;
}
const std::string& str() const { return StringIndexer::str(index); }
const StringIndexer::index_t getIndex() const { return index; }
const StringIndexer::index_t& getIndex() const { return index; }
private:
StringIndexer::index_t index;

View File

@@ -1,6 +1,5 @@
#include "TinyMqtt.h"
#include <sstream>
#include <Streaming.h>
void outstring(const char* prefix, const char*p, uint16_t len)
{
@@ -11,64 +10,218 @@ void outstring(const char* prefix, const char*p, uint16_t len)
}
MqttBroker::MqttBroker(uint16_t port)
: server(port)
{
server = new TcpServer(port);
#ifdef TCP_ASYNC
server->onClient(onClient, this);
#endif
}
MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client)
: parent(parent),
mqtt_connected(false)
MqttBroker::~MqttBroker()
{
client = new_client ? new WiFiClient(new_client) : nullptr;
clientAlive();
while(clients.size())
{
delete clients[0];
}
delete server;
}
MqttCnx::~MqttCnx()
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
: parent(parent)
{
#ifdef TCP_ASYNC
client = new_client;
client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
MqttClient::MqttClient(MqttBroker* parent, const std::string& id)
: parent(parent), clientId(id)
{
client = nullptr;
if (parent) parent->addClient(this);
}
MqttClient::~MqttClient()
{
close();
delete client;
}
void MqttCnx::close()
void MqttClient::close(bool bSendDisconnect)
{
debug("close " << id().c_str());
mqtt_connected = false;
if (client)
{
if (bSendDisconnect and client->connected())
{
message.create(MqttMessage::Type::Disconnect);
message.sendTo(this);
}
client->stop();
delete client;
client = nullptr;
}
if (parent)
{
parent->removeClient(this);
parent=nullptr;
}
}
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{
debug("cnx: closing");
keep_alive = ka;
close();
if (client) delete client;
client = new TcpClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
#ifdef TCP_ASYNC
client->onData(onData, this);
client->onConnect(onConnect, this);
client->connect(broker.c_str(), port);
#else
if (client->connect(broker.c_str(), port))
{
onConnect(this, client);
}
#endif
}
void MqttBroker::addClient(MqttClient* client)
{
clients.push_back(client);
}
void MqttBroker::connect(const std::string& host, uint16_t port)
{
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->parent = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove)
{
for(auto it=clients.begin(); it!=clients.end(); it++)
{
auto client=*it;
if (client==remove)
{
// TODO if this broker is connected to an external broker
// we have to unsubscribe remove's topics.
// (but doing this, check that other clients are not subscribed...)
// Unless -> we could receive useless messages
// -> we are using (memory) one IndexedString plus its string for nothing.
debug("Remove " << clients.size());
clients.erase(it);
debug("Client removed " << clients.size());
return;
}
}
debug("Error cannot remove client"); // TODO should not occur
}
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
{
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client));
debug("New client");
}
void MqttBroker::loop()
{
WiFiClient client = server.available();
#ifndef TCP_ASYNC
WiFiClient client = server->available();
if (client)
{
clients.push_back(new MqttCnx(this, client));
Serial << "New client (" << clients.size() << ')' << endl;
onClient(this, &client);
}
#endif
if (broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
}
for(auto it=clients.begin(); it!=clients.end(); it++)
// for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
for(size_t i=0; i<clients.size(); i++)
{
auto client=*it;
if(client->connected())
auto client = clients[i];
if (client->connected())
{
client->loop();
}
else
{
Serial << "Client " << client->id().c_str() << " Disconnected" << endl;
clients.erase(it);
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
}
}
}
void MqttBroker::publish(const Topic& topic, MqttMessage& msg)
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
if (broker && broker->connected())
{
return broker->subscribe(topic, qos);
}
return MqttNowhereToSend;
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
{
MqttError retval = MqttOk;
debug("publish ");
int i=0;
for(auto client: clients)
client->publish(topic, msg);
{
i++;
#ifdef TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false;
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
{
// ext_broker -> clients or clients -> ext_broker
if (source == broker) // external broker -> internal clients
doit = true;
else // external clients -> this broker
{
// As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret;
}
}
else // Disconnected
{
doit = true;
}
#ifdef TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publishIfSubscribed(topic, msg);
debug("");
}
return retval;
}
bool MqttBroker::compareString(
@@ -81,84 +234,225 @@ bool MqttBroker::compareString(
return *good==0;
}
void MqttMessage::getString(char* &buffer, uint16_t& len)
void MqttMessage::getString(const char* &buff, uint16_t& len)
{
len = (buffer[0]<<8)|(buffer[1]);
buffer+=2;
len = (buff[0]<<8)|(buff[1]);
buff+=2;
}
void MqttCnx::clientAlive()
void MqttClient::clientAlive(uint32_t more_seconds)
{
if (keep_alive)
{
alive=millis()+1000*(keep_alive+5);
alive=millis()+1000*(keep_alive+more_seconds);
}
else
alive=0;
}
void MqttCnx::loop()
void MqttClient::loop()
{
if (alive && (millis() > alive))
{
Serial << "timeout client" << endl;
close();
}
if (parent)
{
debug("timeout client");
close();
debug("closed");
}
else if (client && client->connected())
{
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance.
}
}
#ifndef TCP_ASYNC
while(client && client->available()>0)
{
message.incoming(client->read());
if (message.type())
{
processMessage();
processMessage(&message);
message.reset();
}
}
#endif
}
void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
{
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
msg.add(0x00); // keep_alive
msg.add((char)mqtt->keep_alive);
msg.add(mqtt->clientId);
debug("cnx: mqtt connecting");
msg.sendTo(mqtt);
msg.reset();
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent);
mqtt->clientAlive(0);
}
#ifdef TCP_ASYNC
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
{
char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr);
while(len>0)
{
client->message.incoming(*char_ptr++);
if (client->message.type())
{
client->processMessage(&client->message);
client->message.reset();
}
len--;
}
}
#endif
void MqttClient::resubscribe()
{
// TODO resubscription limited to 256 bytes
if (subscriptions.size())
{
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
for(auto topic: subscriptions)
{
msg.add(topic);
msg.add(0); // TODO qos
}
msg.sendTo(this); // TODO return value
}
}
void MqttCnx::processMessage()
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
std::string error;
std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl;
auto header = message.getVHeader();
char* payload;
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker
{
return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
}
else
{
return parent->subscribe(topic, qos);
}
return ret;
}
MqttError MqttClient::unsubscribe(Topic topic)
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
{
subscriptions.erase(it);
if (parent==nullptr) // remote broker
{
return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0);
}
}
return MqttOk;
}
MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos)
{
MqttMessage msg(type, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic);
msg.add(qos);
// TODO instead we should wait (state machine) for SUBACK / UNSUBACK ?
return msg.sendTo(this);
}
long MqttClient::counter=0;
void MqttClient::processMessage(const MqttMessage* mesg)
{
counter++;
#ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming");
}
#endif
auto header = mesg->getVHeader();
const char* payload;
uint16_t len;
bool bclose=true;
switch(message.type() & 0XF0)
switch(mesg->type() & 0XF0)
{
case MqttMessage::Type::Connect:
if (mqtt_connected) break;
if (mqtt_connected)
{
debug("already connected");
break;
}
payload = header+10;
flags = header[7];
mqtt_flags = header[7];
keep_alive = (header[8]<<8)|(header[9]);
if (strncmp("MQTT", header+2,4)) break;
if (header[6]!=0x04) break; // Level 3.1.1
if (strncmp("MQTT", header+2,4))
{
debug("bad mqtt header");
break;
}
if (header[6]!=0x04)
{
debug("unknown level");
break; // Level 3.1.1
}
// ClientId
message.getString(payload, len);
mesg->getString(payload, len);
clientId = std::string(payload, len);
payload += len;
if (flags & FlagWill) // Will topic
if (mqtt_flags & FlagWill) // Will topic
{
message.getString(payload, len); // Will Topic
mesg->getString(payload, len); // Will Topic
outstring("WillTopic", payload, len);
payload += len;
message.getString(payload, len); // Will Message
mesg->getString(payload, len); // Will Message
outstring("WillMessage", payload, len);
payload += len;
}
if (flags & FlagUserName)
// FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName)
{
message.getString(payload, len);
mesg->getString(payload, len);
if (!parent->checkUser(payload, len)) break;
payload += len;
}
if (flags & FlagPassword)
if (mqtt_flags & FlagPassword)
{
message.getString(payload, len);
mesg->getString(payload, len);
if (!parent->checkPassword(payload, len)) break;
payload += len;
}
@@ -166,54 +460,122 @@ void MqttCnx::processMessage()
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
bclose = false;
mqtt_connected=true;
// Reuse received msg
message.create(MqttMessage::Type::Connack);
message.add(0); // Session present (not implemented)
message.add(0); // Connection accepted
message.sendTo(this);
{
MqttMessage msg(MqttMessage::Type::ConnAck);
msg.add(0); // Session present (not implemented)
msg.add(0); // Connection accepted
msg.sendTo(this);
}
break;
case MqttMessage::Type::ConnAck:
mqtt_connected = true;
bclose = false;
resubscribe();
break;
case MqttMessage::Type::SubAck:
case MqttMessage::Type::PubAck:
if (!mqtt_connected) break;
// Ignore acks
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break;
case MqttMessage::Type::PingReq:
message.create(MqttMessage::Type::PingResp);
message.add(0);
message.sendTo(this);
bclose = false;
if (!mqtt_connected) break;
if (client)
{
uint16_t pingreq = MqttMessage::Type::PingResp;
client->write((const char*)(&pingreq), 2);
bclose = false;
}
else
{
debug("internal pingreq ?");
}
break;
case MqttMessage::Type::Subscribe:
if (!mqtt_connected) break;
payload = header+2;
message.getString(payload, len); // Topic
outstring("Subscribes", payload, len);
case MqttMessage::Type::UnSubscribe:
{
if (!mqtt_connected) break;
payload = header+2;
subscriptions.insert(Topic(payload, len));
debug("un/subscribe loop");
while(payload < mesg->end())
{
mesg->getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring(" un/subscribes", payload, len);
// subscribe(Topic(payload, len));
Topic topic(payload, len);
payload += len;
if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe)
{
uint8_t qos = *payload++;
if (qos != 0) debug("Unsupported QOS" << qos << endl);
subscriptions.insert(topic);
}
else
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
subscriptions.erase(it);
}
}
debug("end loop");
bclose = false;
// TODO SUBACK
}
break;
case MqttMessage::Type::UnSuback:
if (!mqtt_connected) break;
bclose = false;
// TODO SUBACK
break;
case MqttMessage::Type::Publish:
if (!mqtt_connected) break;
if (mqtt_connected or client == nullptr)
{
uint8_t qos = message.type() & 0x6;
uint8_t qos = mesg->type() & 0x6;
payload = header;
message.getString(payload, len);
mesg->getString(payload, len);
Topic published(payload, len);
payload += len;
len=message.end()-payload;
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl;
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
if (qos) payload+=2; // ignore packet identifier if any
len=mesg->end()-payload;
// TODO reset DUP
// TODO reset RETAIN
parent->publish(published, message);
// TODO should send PUBACK
if (client==nullptr) // internal MqttClient receives publish
{
if (callback and isSubscribedTo(published))
{
callback(this, published, payload, len); // TODO send the real payload
}
}
else if (parent) // from outside to inside
{
debug("publishing to parent");
parent->publish(this, published, *mesg);
}
bclose = false;
}
break;
case MqttMessage::Type::PubAck:
case MqttMessage::Type::Disconnect:
// TODO should discard any will msg
if (!mqtt_connected) break;
bclose = false;
mqtt_connected = false;
close(false);
bclose=false;
break;
default:
@@ -222,16 +584,16 @@ void MqttCnx::processMessage()
};
if (bclose)
{
Serial << "*************** Error msg 0x" << _HEX(message.type());
if (error.length()) Serial << ':' << error.c_str();
Serial << "*************** Error msg 0x" << _HEX(mesg->type());
mesg->hexdump("-------ERROR ------");
dump();
Serial << endl;
close();
}
else
{
clientAlive();
clientAlive(parent ? 5 : 0);
}
message.reset();
}
bool Topic::matches(const Topic& topic) const
@@ -241,29 +603,61 @@ bool Topic::matches(const Topic& topic) const
return false;
}
void MqttCnx::publish(const Topic& topic, MqttMessage& msg)
// publish from local client
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{
for(const auto& subscription: subscriptions)
MqttMessage msg(MqttMessage::Publish);
msg.add(topic);
msg.add(payload, pay_length, false);
msg.complete();
if (parent)
{
if (subscription.matches(topic))
return parent->publish(this, topic, msg);
}
else if (client)
return msg.sendTo(this);
else
return MqttNowhereToSend;
}
// republish a received publish if it matches any in subscriptions
MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
{
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size());
if (isSubscribedTo(topic))
{
if (client)
retval = msg.sendTo(this);
else
{
// Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl;
msg.sendTo(this);
processMessage(&msg);
// callback(this, topic, nullptr, 0); // TODO Payload
}
}
return retval;
}
bool MqttClient::isSubscribedTo(const Topic& topic) const
{
for(const auto& subscription: subscriptions)
if (subscription.matches(topic))
return true;
return false;
}
void MqttMessage::reset()
{
curr=buffer;
*curr=0; // Type Unknown
buffer.clear();
state=FixedHeader;
size=0;
}
void MqttMessage::incoming(char in_byte)
{
*curr++ = in_byte;
buffer += in_byte;
switch(state)
{
case FixedHeader:
@@ -278,11 +672,18 @@ void MqttMessage::incoming(char in_byte)
}
else if ((in_byte & 0x80) == 0)
{
vheader = curr;
vheader = buffer.length();
if (size==0)
state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else
{
buffer.reserve(size);
state = VariableHeader;
}
}
break;
case VariableHeader:
@@ -299,19 +700,30 @@ void MqttMessage::incoming(char in_byte)
break;
case Complete:
default:
curr--;
Serial << "Spurious " << _HEX(in_byte) << endl;
state = Error;
hexdump("spurious");
reset();
break;
}
if (curr-buffer > 250)
if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
{
Serial << "Too much incoming bytes." << endl;
curr=buffer;
debug("Too long " << state);
reset();
}
}
void MqttMessage::encodeLength(char* msb, int length)
void MqttMessage::add(const char* p, size_t len, bool addLength)
{
if (addLength)
{
buffer.reserve(buffer.length()+addLength+2);
incoming(len>>8);
incoming(len & 0xFF);
}
while(len--) incoming(*p++);
}
void MqttMessage::encodeLength(char* msb, int length) const
{
do
{
@@ -322,31 +734,64 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length);
};
void MqttMessage::sendTo(MqttCnx* client)
void MqttMessage::complete()
{
if (curr-buffer-2 >= 0)
encodeLength(&buffer[1], buffer.size()-2);
state = Complete;
}
MqttError MqttMessage::sendTo(MqttClient* client) const
{
if (buffer.size())
{
encodeLength(buffer+1, curr-buffer-2);
debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2);
// hexdump("snd");
client->write(buffer, curr-buffer);
client->write(&buffer[0], buffer.size());
}
else
{
Serial << "??? Invalid send" << endl;
Serial << (long)end() << "-" << (long)buffer << endl;
debug("??? Invalid send");
return MqttInvalidMessage;
}
return MqttOk;
}
void MqttMessage::hexdump(const char* prefix) const
{
if (prefix) Serial << prefix << ' ';
Serial << (long)buffer << "-" << (long)curr << " : ";
const char* p=buffer;
while(p!=curr)
uint16_t addr=0;
const int bytes_per_row = 8;
const char* hex_to_str = " | ";
const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{
if ((addr % bytes_per_row) == 0)
{
if (*p<16) Serial << '0';
Serial << _HEX(*p) << ' ';
p++;
if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
if (prefix) Serial << prefix << separator;
ascii.clear();
}
Serial << endl;
addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl;
}

View File

@@ -1,34 +1,88 @@
#include <ESP8266WiFi.h>
#pragma once
// TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC
// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
#if defined(ESP8266) || defined(EPOXY_DUINO)
#ifdef TCP_ASYNC
#include <ESPAsyncTCP.h>
#else
#include <ESP8266WiFi.h>
#endif
#elif defined(ESP32)
#ifdef TCP_ASYNC
#include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP
#else
#include <WiFi.h>
#endif
#else
#error "Unsupported platform"
#endif
#ifdef EPOXY_DUINO
#define dbg_ptr uint64_t
#else
#define dbg_ptr uint32_t
#endif
#include <vector>
#include <set>
#include <string>
#include "StringIndexer.h"
#include <MqttStreaming.h>
#define MaxBufferLength 255
// #define TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
#ifdef TCP_ASYNC
using TcpClient = AsyncClient;
using TcpServer = AsyncServer;
#else
using TcpClient = WiFiClient;
using TcpServer = WiFiServer;
#endif
enum MqttError
{
MqttOk = 0,
MqttNowhereToSend=1,
MqttInvalidMessage=2,
};
class Topic : public IndexedString
{
public:
Topic(const char* s, uint8_t len) : IndexedString(s,len){}
Topic(const char* s) : Topic(s, strlen(s)) {}
Topic(const std::string s) : Topic(s.c_str(), s.length()){};
const char* c_str() const { return str().c_str(); }
bool matches(const Topic&) const;
};
class MqttCnx;
class MqttClient;
class MqttMessage
{
const uint16_t MaxBufferLength = 255;
public:
enum Type
{
Unknown = 0,
Connect = 0x10,
Connack = 0x20,
Publish = 0x30,
PubAck = 0x40,
Subscribe = 0x80,
PingReq = 0xC0,
PingResp = 0xD0,
Unknown = 0,
Connect = 0x10,
ConnAck = 0x20,
Publish = 0x30,
PubAck = 0x40,
Subscribe = 0x80,
SubAck = 0x90,
UnSubscribe = 0xA0,
UnSuback = 0xB0,
PingReq = 0xC0,
PingResp = 0xD0,
Disconnect = 0xE0
};
enum State
{
@@ -42,19 +96,22 @@ class MqttMessage
};
MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); }
MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte);
void add(char byte) { incoming(byte); }
char* getVHeader() const { return vheader; }
char* end() const { return curr; }
uint16_t length() const { return curr-buffer; }
void add(const char* p, size_t len, bool addLength=true );
void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); }
const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); }
void complete();
void reset();
// buff is MSB/LSB/STRING
// output buff+=2, len=length(str)
void getString(char* &buff, uint16_t& len);
static void getString(const char* &buff, uint16_t& len);
Type type() const
{
@@ -63,98 +120,189 @@ class MqttMessage
void create(Type type)
{
buffer[0]=type;
curr=buffer+2;
vheader=curr;
buffer=(char)type;
buffer+='\0'; // reserved for msg length
vheader=2;
size=0;
state=Create;
}
void sendTo(MqttCnx*);
MqttError sendTo(MqttClient*) const;
void hexdump(const char* prefix=nullptr) const;
private:
void encodeLength(char* msb, int length);
void encodeLength(char* msb, int length) const;
char buffer[256]; // TODO 256 ?
char* vheader;
char* curr;
mutable std::string buffer; // mutable -> sendTo()
uint8_t vheader;
uint16_t size; // bytes left to receive
State state;
};
class MqttBroker;
class MqttCnx
class MqttClient
{
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
enum Flags
{
FlagUserName = 128,
FlagPassword = 64,
FlagWillRetain = 32, // unsupported
FlagWillQos = 16 | 8, // unsupported
FlagWill = 4, // unsupported
FlagWill = 4, // unsupported
FlagCleanSession = 2, // unsupported
FlagReserved = 1
};
public:
MqttCnx(MqttBroker* parent, WiFiClient& client);
/** Constructor. If broker is not null, this is the adress of a local broker.
If you want to connect elsewhere, leave broker null and use connect() **/
MqttClient(MqttBroker* broker = nullptr, const std::string& id="");
MqttClient(const std::string& id) : MqttClient(nullptr, id){}
~MqttCnx();
~MqttClient();
bool connected() { return client && client->connected(); }
void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10);
bool connected() { return
(parent!=nullptr and client==nullptr) or
(client and client->connected()); }
void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); }
const std::string& id() const { return clientId; }
void id(std::string& new_id) { clientId = new_id; }
/** Should be called in main loop() */
void loop();
void close();
void publish(const Topic& topic, MqttMessage& msg);
void close(bool bSendDisconnect=true);
void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world
MqttError publish(const Topic&, const char* payload, size_t pay_length);
MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic topic);
bool isSubscribedTo(const Topic& topic) const;
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
void dump()
{
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF");
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive;
Serial << (client && client->connected() ? "" : "dis") << "connected";
message.hexdump("entrant msg");
bool c=false;
Serial << " [";
for(auto s: subscriptions)
{
Serial << (c?", ": "")<< s.str().c_str();
c=true;
}
Serial << "]" << endl;
}
/** Count the number of messages that have been sent **/
static long counter;
private:
void clientAlive();
void processMessage();
static void onConnect(void * client_ptr, TcpClient*);
#ifdef TCP_ASYNC
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
#endif
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe();
char flags;
uint32_t keep_alive;
friend class MqttBroker;
MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
void clientAlive(uint32_t more_seconds);
void processMessage(const MqttMessage* message);
bool mqtt_connected = false;
char mqtt_flags;
uint32_t keep_alive = 60;
uint32_t alive;
bool mqtt_connected;
WiFiClient* client;
MqttMessage message;
MqttBroker* parent;
// TODO having a pointer on MqttBroker may produce larger binaries
// due to unecessary function linked if ever parent is not used
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker
TcpClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
};
class MqttClient
{
public:
MqttClient(IPAddress broker) : broker_ip(broker) {}
protected:
IPAddress broker_ip;
CallBack callback = nullptr;
};
class MqttBroker
{
enum State
{
Disconnected, // Also the initial state
Connecting, // connect and sends a fake publish to avoid circular cnx
Connected, // this->broker is connected and circular cnx avoided
};
public:
// TODO limit max number of clients
MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); }
void begin() { server->begin(); }
void loop();
void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; }
size_t clientsCount() const { return clients.size(); }
void dump()
{
Serial << clients.size() << " client/s" << endl;
for(auto client: clients)
{
Serial << " ";
client->dump();
}
}
private:
friend class MqttClient;
static void onClient(void*, TcpClient*);
bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); }
bool checkPassword(const char* password, uint8_t len) const
{ return compareString(auth_password, password, len); }
void publish(const Topic& topic, MqttMessage& msg);
private:
MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself
void addClient(MqttClient* client);
void removeClient(MqttClient* client);
bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttCnx*> clients;
WiFiServer server;
std::vector<MqttClient*> clients;
TcpServer* server;
const char* auth_user = "guest";
const char* auth_password = "guest";
State state = Disconnected;
MqttClient* broker = nullptr;
};

2
src/my_credentials.h Normal file
View File

@@ -0,0 +1,2 @@
const char *ssid = "YOUR-SSID-HERE";
const char *password = "YOUR-PASSWORD-HERE";

20
tests/Makefile Normal file
View File

@@ -0,0 +1,20 @@
tests:
set -e; \
for i in *-tests/Makefile; do \
echo '==== Making:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) -j; \
done
runtests: tests
set -e; \
for i in *-tests/Makefile; do \
echo '==== Running:' $$(dirname $$i); \
$$(dirname $$i)/$$(dirname $$i).out; \
done
clean:
set -e; \
for i in *-tests/Makefile; do \
echo '==== Cleaning:' $$(dirname $$i); \
$(MAKE) -C $$(dirname $$i) clean; \
done

View File

@@ -0,0 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := local-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk

View File

@@ -0,0 +1,152 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt local unit tests.
*
* Clients are connected to pseudo remote broker
* The remote should be 127.0.0.1:1883 <--- But this does not work due to Esp network limitations
* We are using 127.0.0.1 because this is simpler to test with a single ESP
* Also, this will allow to mock and thus run Action on github
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
}
test(local_client_should_unregister_when_destroyed)
{
return;
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient client;
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
client.connect("127.0.0.1", 1883);
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected
}
assertEqual(broker.clientsCount(), (size_t)0);
}
#if 0
test(local_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client;
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(local_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber;
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher;
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertTrue(published[""]["a/b"] == 1);
assertTrue(published[""]["a/c"] == 2);
}
test(local_publish_should_be_dispatched_to_local_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber_a("A");
subscriber_a.setCallback(onPublish);
subscriber_a.subscribe("a/b");
subscriber_a.subscribe("a/c");
MqttClient subscriber_b("B");
subscriber_b.setCallback(onPublish);
subscriber_b.subscribe("a/b");
MqttClient publisher;
publisher.publish("a/b");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertTrue(published["A"]["a/b"] == 1);
assertTrue(published["A"]["a/c"] == 1);
assertTrue(published["B"]["a/b"] == 1);
assertTrue(published["B"]["a/c"] == 0);
}
test(local_unsubscribe)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber;
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher;
publisher.publish("a/b");
subscriber.unsubscribe("a/b");
publisher.publish("a/b");
publisher.publish("a/b");
assertTrue(published[""]["a/b"] == 1); // Only one publish has been received
}
test(local_nocallback_when_destroyed)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient publisher;
{
MqttClient subscriber;
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
publisher.publish("a/b");
}
publisher.publish("a/b");
assertEqual(published.size(), (size_t)1); // Only one publish has been received
}
#endif
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ NO WIFI CONNECTION TinyMqtt TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}

View File

@@ -0,0 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := nowifi-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk

View File

@@ -0,0 +1,166 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt nowifi unit tests.
*
* No wifi connection unit tests.
* Checks with a local broker. Clients must connect to the local broker
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
char* lastPayload = nullptr;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
if (lastPayload) free(lastPayload);
lastPayload = strdup(payload);
lastLength = length;
}
test(nowifi_client_should_unregister_when_destroyed)
{
assertEqual(broker.clientsCount(), (size_t)0);
{
MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1);
}
assertEqual(broker.clientsCount(), (size_t)0);
}
test(nowifi_connect)
{
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient client(&broker);
assertTrue(client.connected());
assertEqual(broker.clientsCount(), (size_t)1);
}
test(nowifi_publish_should_be_dispatched)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.subscribe("a/b");
subscriber.subscribe("a/c");
subscriber.setCallback(onPublish);
MqttClient publisher(&broker);
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something
assertEqual(published[""]["a/b"], 1);
assertEqual(published[""]["a/c"], 2);
}
test(nowifi_publish_should_be_dispatched_to_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber_a(&broker, "A");
subscriber_a.setCallback(onPublish);
subscriber_a.subscribe("a/b");
subscriber_a.subscribe("a/c");
MqttClient subscriber_b(&broker, "B");
subscriber_b.setCallback(onPublish);
subscriber_b.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertEqual(published["A"]["a/b"], 1);
assertEqual(published["A"]["a/c"], 1);
assertEqual(published["B"]["a/b"], 1);
assertEqual(published["B"]["a/c"], 0);
}
test(nowifi_unsubscribe)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b"); // This publish is received
subscriber.unsubscribe("a/b");
publisher.publish("a/b"); // Those one, no (unsubscribed)
publisher.publish("a/b");
assertEqual(published[""]["a/b"], 1); // Only one publish has been received
}
test(nowifi_nocallback_when_destroyed)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
MqttClient publisher(&broker);
{
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
publisher.publish("a/b");
}
publisher.publish("a/b");
assertEqual(published.size(), (size_t)1); // Only one publish has been received
}
test(nowifi_payload_nullptr)
{
published.clear();
const char* payload="abcd";
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b", payload, strlen(payload)); // This publish is received
// coming from MqttClient::publish(...)
assertEqual(payload, lastPayload);
assertEqual(lastLength, (size_t)4);
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ NO WIFI CONNECTION TinyMqtt TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}

6
tests/result.json Normal file
View File

@@ -0,0 +1,6 @@
{
"schemaVersion" : 1,
"label" : "tests",
"message" : "Message content",
"color": "red"
}

2
tests/result.yaml Normal file
View File

@@ -0,0 +1,2 @@
result: 1
insert: "passed"

View File

@@ -0,0 +1,7 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := string-indexer-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk

View File

@@ -0,0 +1,109 @@
#include <AUnit.h>
#include <StringIndexer.h>
#include <map>
/**
* TinyMqtt / StringIndexer unit tests.
*
**/
using namespace std;
test(indexer_empty)
{
assertEqual(StringIndexer::count(), 0);
}
test(indexer_strings_deleted_should_empty_indexer)
{
assertTrue(StringIndexer::count()==0);
{
IndexedString one("one");
assertEqual(StringIndexer::count(), 1);
IndexedString two("two");
assertEqual(StringIndexer::count(), 2);
IndexedString three("three");
assertEqual(StringIndexer::count(), 3);
IndexedString four("four");
assertEqual(StringIndexer::count(), 4);
}
assertEqual(StringIndexer::count(), 0);
}
test(indexer_same_strings_count_as_one)
{
IndexedString one ("one");
IndexedString two ("one");
IndexedString three("one");
IndexedString fourt("one");
assertEqual(StringIndexer::count(), 1);
}
test(indexer_size_of_indexed_string)
{
assertEqual(sizeof(IndexedString), (size_t)1);
}
test(indexer_different_strings_are_different)
{
IndexedString one("one");
IndexedString two("two");
assertFalse(one == two);
}
test(indexer_same_strings_should_equal)
{
IndexedString one("one");
IndexedString two("one");
assertTrue(one == two);
}
test(indexer_indexed_operator_eq)
{
IndexedString one("one");
{
IndexedString same = one;
assertTrue(one == same);
assertEqual(StringIndexer::count(), 1);
}
assertEqual(StringIndexer::count(), 1);
}
test(indexer_get_string)
{
std::string sone("one");
IndexedString one(sone);
assertTrue(sone==one.str());
}
test(indexer_get_index)
{
IndexedString one1("one");
IndexedString one2("one");
IndexedString two1("two");
IndexedString two2("two");
assertTrue(one1.getIndex() == one2.getIndex());
assertTrue(two1.getIndex() == two2.getIndex());
assertTrue(one1.getIndex() != two1.getIndex());
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ TinyMqtt StringIndexer TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}