Compare commits

...

51 Commits

Author SHA1 Message Date
hsaturn
fab242e212 Version 0.7.6 2021-08-09 03:04:21 +02:00
hsaturn
56a2be621f Fix message length error 2021-08-09 03:00:28 +02:00
hsaturn
c4cf39ab68 Fix aunit tests 2021-08-09 02:33:01 +02:00
hsaturn
90dea36ab0 Attempt to write more readable examples 2021-05-13 03:21:03 +02:00
hsaturn
25a721e06a Attempt to write more readable examples 2021-05-13 03:21:03 +02:00
hsaturn
b228d35ab0 Attempt to write more readable examples 2021-05-13 03:21:03 +02:00
hsaturn
c70716a595 Attempt to write more readable examples 2021-05-13 03:21:03 +02:00
hsaturn
8d5cad5fec Added overload of Client::publish, fix compilation 2021-05-13 03:21:03 +02:00
hsaturn
d5430228e5 Fix cr/lf bug
Added echo on|off for later vt100 terminal
Added rnd(min,max) function
Simplied every function
Better parsing for getword (accept strings)
Fix for blink command
2021-05-13 03:21:03 +02:00
hsaturn
91c1f96146 Update README.md
links updates
2021-05-08 15:14:59 +02:00
hsaturn
f04a2a07c0 Fix aunit.yml 2021-04-28 21:22:07 +00:00
hsaturn
38f306c170 Fix compilation esp8266 esp32 2021-04-28 23:16:33 +02:00
hsaturn
024e80c9dc Perpare for future ESP32 aunit tests 2021-04-28 21:00:31 +00:00
hsaturn
2249ddef7f Release 0.7.5 2021-04-28 18:57:24 +02:00
hsaturn
e193929f8f changed every command 2021-04-28 18:56:17 +02:00
hsaturn
e00e31de33 Fix build in Esp8266 mode
Modify dump() functions
2021-04-28 18:55:57 +02:00
hsaturn
20292b7b7b Fix example syntax 2021-04-28 08:24:44 +02:00
hsaturn
26de3befa8 TinyTest Esp32 port
Update library definition
2021-04-28 08:19:55 +02:00
hsaturn
1098466055 Update README.md 2021-04-28 08:14:46 +02:00
hsaturn
2d3663e78c Update README.md 2021-04-28 07:32:56 +02:00
hsaturn
5e16282ad0 Disable ci.yml 2021-04-21 01:10:35 +02:00
hsaturn
e35a43c4a4 Trying to remove unsupported platform that break ci.yml 2021-04-21 01:05:31 +02:00
hsaturn
087a203ba0 Create ci.yml 2021-04-21 00:56:49 +02:00
hsaturn
5d313bbf5e Rewrite examples 2021-04-12 00:27:25 +02:00
hsaturn
ce896f02c4 Merge pull request #6 from hsaturn/AsyncAndWifi
AsyncTcp can be activated by removing the command on TCP_ASYNC in TinyMqtt.h
But the code is not bug free yet.
2021-04-11 23:33:29 +02:00
hsaturn
d3210c3c93 Merge branch 'main' into AsyncAndWifi 2021-04-11 23:30:13 +02:00
hsaturn
23f1207718 Lot of new functions for tinytest
- command every allowing to add peridic evaluations
  very usefull for benchmarks and load tests
- on/off command
2021-04-11 23:27:15 +02:00
hsaturn
122ab88960 Rewrite client-with-wifi.ino 2021-04-11 23:26:49 +02:00
hsaturn
28b0ac1611 Fix missing receive loop for mqttclient 2021-04-11 21:21:48 +02:00
hsaturn
1cfb5cfab1 Allow multiple command per line separated by ; 2021-04-11 19:19:06 +02:00
hsaturn
b023cd67a9 Fix AUnit in debug mode / Not async 2021-04-11 17:02:24 +02:00
hsaturn
24ee6b5201 Fixes in WiFiClient mode 2021-04-11 16:33:12 +02:00
hsaturn
2e92a98db2 Trying to fuse togeter Async and not async version 2021-04-11 15:51:33 +02:00
hsaturn
c59bddfd39 Implementation of Unsuback (unless MqttClient disconnects) 2021-04-11 01:58:44 +02:00
hsaturn
7bdb9cc0cd Tinytest, allow to blink output 0 2021-04-11 01:57:58 +02:00
hsaturn
be62699702 Fix connect problem with MqttClient 2021-04-11 00:48:04 +02:00
hsaturn
77da47e1da Update README.md 2021-04-10 18:02:48 +02:00
hsaturn
88797bfafd Added AUnit 2021-04-10 18:02:28 +02:00
hsaturn
1e3b37623d Fix AUnit build 2021-04-10 17:54:46 +02:00
hsaturn
ba6a96976a ESP32 version that could work 2021-04-10 17:47:21 +02:00
hsaturn
6afd3939b3 Merge remote-tracking branch 'origin/AsyncTcp' into main 2021-04-10 17:23:53 +02:00
hsaturn
2ffe0c13fa README.md update 2021-04-10 17:23:46 +02:00
hsaturn
48eb0daf9a Fix bug in unsubscription list 2021-04-10 17:16:25 +02:00
hsaturn
34c05bc37a Fix compilation in DEBUG mode 2021-04-10 17:16:25 +02:00
hsaturn
7c96c4a5cc Fix warning 2021-04-10 17:16:25 +02:00
hsaturn
b280196395 Added mqDns to tinytest 2021-04-10 17:16:25 +02:00
hsaturn
c75f4893e8 AsyncTcp
AsyncTcp
2021-04-10 17:16:25 +02:00
hsaturn
d666f6a53b AsyncTCP (to be continued) 2021-04-10 17:16:25 +02:00
hsaturn
7ef18de755 Very promising async commit 2021-04-10 17:16:25 +02:00
hsaturn
838df3a34a Added unsubscribe to tinytest 2021-04-10 17:15:21 +02:00
hsaturn
8a25155fd8 Bad merge fix 2021-04-10 16:06:45 +02:00
16 changed files with 871 additions and 450 deletions

View File

@@ -24,5 +24,11 @@ jobs:
git clone https://github.com/hsaturn/EspMock
- name: Verify tests
run: |
make -C tests
# Run tests for ESP8266
make -C tests ESP_LIBS="ESP8266WiFi ESPAsyncTCP" tests
make -C tests runtests
make -C tests clean
# Run tests for ESP32
#make -C tests ESP_LIBS="ESP8266WiFi ESPAsyncTCP" tests
#make -C tests runtests
#make -C tests clean

18
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,18 @@
name: "CI"
on:
jobs:
ci:
runs-on: ubuntu-20.04
steps:
- name: Checkout this repository
uses: actions/checkout@v2.3.4
- name: Cache for arduino-ci
uses: actions/cache@v2.1.3
with:
path: |
~/.arduino15
key: ${{ runner.os }}-arduino
- name: Install nix
uses: cachix/install-nix-action@v12
- run: nix-shell -I nixpkgs=channel:nixpkgs-unstable -p arduino-ci --run "arduino-ci"

View File

@@ -1,17 +1,19 @@
# TinyMqtt
![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)
![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
[![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)](https://github.com/hsaturn/TinyMqtt/releases)
[![AUnit Tests](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml/badge.svg)](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml)
[![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)](https://github.com/hsaturn/TinyMqtt/issues)
[![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)](https://www.espressif.com/en/products/socs/esp8266)
[![Esp32](https://img.shields.io/badge/platform-ESP32-green)](https://www.espressif.com/en/products/socs/esp32)
[![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)](https://www.gnu.org/licenses/gpl-3.0.fr.html)
[![Mqtt 3.1.1](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html#_Toc442180822)
ESP 8266 is a small, fast and capable Mqtt Broker and Client
TinyMqtt is a small, fast and capable Mqtt Broker and Client for Esp8266 / Esp32 / Esp WROOM
## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
clients that had subscribed (payload ~15 bytes ESP8266). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported
@@ -48,7 +50,7 @@ After a while one ESP naturally becomes a 'master' and all ESP are connected tog
No problem if the master dies, a new master will be choosen soon.
## TODO List
* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)
* ~~Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)~~
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)

View File

@@ -3,23 +3,36 @@
/**
* Local broker that accept connections and two local clients
*
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | | | | -----
* | v v | ---
* | +----------+ +----------+ | -
* | | internal | | internal | +-------* Wifi
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
* - a bit hard to understand
*
*/
#include <my_credentials.h>
const char *ssid = "";
const char *password = "";
std::string topic="sensor/temperature";
@@ -28,17 +41,20 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishA(const MqttClient* /* source */, const Topic& topic, const char* payload, size_t /* length */)
{ Serial << "--> client A received " << topic.c_str() << ", " << payload << endl; }
void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void onPublishB(const MqttClient* /* source */, const Topic& topic, const char* payload, size_t /* length */)
{ Serial << "--> client B Received " << topic.c_str() << ", " << payload << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Clients with wifi " << endl;
delay(500);
Serial << "Clients with wifi " << endl;
if (strlen(ssid)==0)
Serial << "****** PLEASE EDIT THE EXAMPLE AND MODIFY ssid/password *************" << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
@@ -47,8 +63,8 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
broker.begin();
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
@@ -58,30 +74,31 @@ void setup()
void loop()
{
broker.loop();
broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
static const int intervalA = 5000; // publishes every 5s (please avoid usage of delay())
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic, " sent by A");
}
// ============= client B publish ================
static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
}
static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
static int temperature;
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic, " sent by B: "+std::string(String(16+temperature++%6).c_str()));
}
}

View File

@@ -1,12 +1,30 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/** TinyMQTT allows a disconnected mode:
*
* In this example, local clients A and B are talking together, no need to be connected.
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
*/
*
* +-----------------------------+
* | ESP |
* | +--------+ |
* | +-------->| broker | |
* | | +--------+ |
* | | ^ |
* | | | |
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* In this example, local clients A and B are talking together, no need to be connected.
*
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
* The broker may still be conected if wifi is on.
*
*/
std::string topic="sensor/temperature";
@@ -14,11 +32,11 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishA(const MqttClient* /* srce */, const Topic& topic, const char* payload, size_t /* length */)
{ Serial << "--> Client A received msg on topic " << topic.c_str() << ", " << payload << endl; }
void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void onPublishB(const MqttClient* /* srce */, const Topic& topic, const char* payload, size_t /* length */)
{ Serial << "--> Client B received msg on topic " << topic.c_str() << ", " << payload << endl; }
void setup()
{
@@ -35,7 +53,7 @@ void setup()
void loop()
{
broker.loop();
broker.loop(); // Don't forget to call loop() for all brokers and clients
mqtt_a.loop();
mqtt_b.loop();
@@ -47,7 +65,7 @@ void loop()
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
mqtt_a.publish(topic, "sent by A");
}
// ============= client B publish ================
@@ -58,6 +76,6 @@ void loop()
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic);
mqtt_b.publish(topic, "sent by B");
}
}

View File

@@ -1,14 +1,33 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <my_credentials.h>
#define PORT 1883
MqttBroker broker(PORT);
/** Basic Mqtt Broker
*
* +-----------------------------+
* | ESP |
* | +--------+ |
* | | broker | | 1883 <--- External client/s
* | +--------+ |
* | |
* +-----------------------------+
*
* Your ESP will become a MqttBroker.
* You can test it with any client such as mqtt-spy for example
*
*/
const char* ssid = "";
const char* password = "";
void setup()
{
Serial.begin(115200);
if (strlen(ssid)==0)
Serial << "****** PLEASE MODIFY ssid/password *************" << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);

View File

@@ -1,9 +1,25 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
/** Simple Client
/** Simple Client (The simplest configuration)
*
* This is the simplest Mqtt client configuration
*
* +--------+
* +------>| broker |<--- < Other client
* | +--------+
* |
* +-----------------+
* | ESP | |
* | +----------+ |
* | | internal | |
* | | client | |
* | +----------+ |
* | |
* +-----------------+
*
* 1 - change the ssid/password
* 2 - change BROKER values (or keep emqx.io test broker)
* 3 - you can use mqtt-spy to connect to the same broker and
* see the sensor/temperature updated by the client.
*
* pro - small memory footprint (both ram and flash)
* - very simple to setup and use
@@ -13,7 +29,11 @@
* - local publishes takes more time (because they go outside)
*/
#include <my_credentials.h>
const char* BROKER = "broker.emqx.io";
const uint16_t BROKER_PORT = 1883;
const char* ssid = "";
const char* password = "";
static float temp=19;
static MqttClient client;
@@ -22,7 +42,10 @@ void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Simple clients with wifi " << endl;
if (strlen(ssid)==0)
Serial << "****** PLEASE MODIFY ssid/password *************" << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
@@ -32,21 +55,33 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
client.connect(BROKER, BROKER_PORT); // Put here your broker ip / port
}
void loop()
{
client.loop();
client.loop(); // Don't forget to call loop() for each broker and client
delay(1000);
// delay(1000); please avoid usage of delay (see below how this done using next_send and millis())
static auto next_send = millis();
if (millis() > next_send)
{
next_send += 1000;
auto rnd=random(100);
if (not client.connected())
{
Serial << millis() << ": Not connected to broker" << endl;
return;
}
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
auto rnd=random(100);
client.publish("sensor/temperature", String(temp));
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
Serial << "--> Publishing a new sensor/temperature value: " << temp << endl;
client.publish("sensor/temperature", String(temp));
}
}

View File

@@ -1,25 +1,32 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h>
#include <ESP8266mDNS.h>
#if defined(ESP8266)
#include <ESP8266mDNS.h>
#elif defined(ESP32)
#include <WiFi.h>
#include <ESPmDNS.h>
#else
#error Unsupported platform
#endif
#include <sstream>
#include <map>
/**
* Console allowing to make any kind of test.
bool echo_on = true;
/** Very complex example
* Console allowing to make any kind of test,
* even some stress tests.
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
* Upload the sketch, the use the terminal.
* Press H for mini help.
*
* tested with mqtt-spy-0.5.4
* TODO examples of scripts
*/
#include <my_credentials.h>
const char* ssid = "";
const char* password = "";
std::string topic="sensor/temperature";
@@ -44,17 +51,32 @@ void setup()
WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054
Serial.begin(115200);
delay(500);
Serial << endl << endl << endl
<< "Connecting to '" << ssid << "' ";
Serial << endl << endl;
Serial << "***************************************************************" << endl;
Serial << "* Welcome to the TinyMqtt console" << endl;
Serial << endl;
Serial << "* The console allows to test all features of the libraries." << endl;
Serial << endl;
if (strlen(ssid)==0)
Serial << "* WARNING: You may want to modify ssid/password in order" << endl
<< " to reflect your Wifi configuration." << endl;
Serial << endl;
Serial << "* Enter help to view the list of commands." << endl;
Serial << "***************************************************************" << endl;
Serial << endl;
Serial << "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ Serial << '-'; delay(500); }
{ Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
Serial << "Type help for more..." << endl;
Serial << endl << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
const char* name="tinytest";
Serial << "Starting MDNS, name= " << name;
@@ -69,27 +91,67 @@ void setup()
brokers["broker"] = broker;
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ');
int getint(std::string& str, const int if_empty=0)
{
std::string sword;
while(str.length() && str[0]>='0' && str[0]<='9')
std::string str2=str;
std::string sword = getword(str);
if (sword[0] and isdigit(sword[0]))
{
sword += str[0]; str.erase(0,1);
int ret=atoi(sword.c_str());
while(isdigit(sword[0]) or sword[0]==' ') sword.erase(0,1);
if (sword.length()) str = sword+' '+str;
return ret;
}
while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str());
str=str2;
return if_empty;
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
std::string getword(std::string& str, const char* if_empty/*=nullptr*/, char sep/*=' '*/)
{
char quote=(str[0]=='"' or str[0]=='\'' ? str[0] : 0);
if (quote) str.erase(0,1);
std::string sword;
while(str.length() && str[0]!=sep)
while(str.length() and (str[0]!=sep or quote))
{
sword += str[0]; str.erase(0,1);
if (str[0]==quote)
{
str.erase(0,1);
break;
}
sword += str[0];
str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
if (quote==false and sword.length()>=4 and sword.substr(0,3)=="rnd")
{
sword.erase(0,3);
if (sword[0]=='(')
{
int to = 100;
sword.erase(0,1);
int from=getint(sword);
if (sword[0]==',')
{
sword.erase(0,1);
to = getint(sword);
if (sword[0]!=')') Serial << "Missing ')'" << endl;
}
else
{
to=from;
from=0;
}
return String(random(from,to)).c_str();
}
else
{
Serial << "Missing '('" << endl;
}
}
while(str[0]==' ') str.erase(0,1);
return sword;
}
@@ -117,7 +179,7 @@ std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
addr.clear();
while(build.size()!=4)
{
std::stringstream b;
@@ -138,10 +200,10 @@ std::set<std::string> commands = {
"auto", "broker", "blink", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view"
"publish", "reset", "subscribe", "unsubscribe", "view", "echo", "every"
};
void getCommand(std::string& search)
void convertToCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
@@ -161,7 +223,7 @@ void getCommand(std::string& search)
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
search.clear();
}
}
@@ -177,7 +239,7 @@ void replace(const char* d, std::string& str, std::string srch, std::string to)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
pos += to.length()-1;
}
}
}
@@ -195,6 +257,7 @@ void replaceVars(std::string& cmd)
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval
@@ -316,22 +379,457 @@ std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd)
{
if (s.length()==0 or s.length()>strlen(cmd)) return false;
return strncmp(cmd, s.c_str(), s.length())==0;
uint8_t p=0;
while(s[p++]==*cmd++)
{
if (*cmd==0 or s[p]==0) return true;
if (s[p]==' ') return true;
}
return false;
}
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
struct Every
{
std::string cmd;
uint32_t ms;
uint32_t next;
uint32_t underrun=0;
bool active=true;
void dump()
{
Serial << (active ? "enabled " : "disabled ");
auto mill=millis();
Serial << ms << "ms [" << cmd << "] next in ";
if (mill > next)
Serial << "now";
else
Serial << next-mill << "ms";
}
};
uint32_t blink_ms_on[16];
uint32_t blink_ms_off[16];
uint32_t blink_next[16];
bool blink_state[16];
int16_t blink;
std::vector<Every> everies;
void eval(std::string& cmd)
{
while(cmd.length())
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd.clear();
}
}
}
s = getword(cmd);
if (s.length()) convertToCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
else
{
Serial << "Unknown broker command (" << s << ")" << endl;
s.clear();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
retval = client->publish(getword(cmd, topic.c_str()), getword(cmd));
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
else
{
Serial << "Unknown client command (" << s << ")" << endl;
s.clear();
}
}
else if (compare(s, "on"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, HIGH);
}
else if (compare(s, "off"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
}
else if (compare(s, "echo"))
{
s=getword(cmd);
if (s=="on")
echo_on = true;
else if (s=="off")
echo_on = false;
else
{
Serial << s << ' ';
while(cmd.length())
{
Serial << getword(cmd) << ' ';
}
}
}
else if (compare(s, "every"))
{
uint32_t ms = getint(cmd, 0);
if (ms)
{
if (cmd.length())
{
Every every;
every.ms=ms;
every.cmd=cmd;
every.next=millis()+ms;
everies.push_back(every);
every.dump();
Serial << endl;
cmd.clear();
}
}
else if (compare(cmd, "off") or compare(cmd, "on"))
{
bool active=getword(cmd)=="on";
uint8_t ever=getint(cmd, 100);
uint8_t count=0;
for(auto& every: everies)
{
if (count==ever or (ever==100))
{
if (every.active != active)
{
Serial << "every #" << count << (active ? " on" :" off") << endl;
every.active = active;
every.underrun = 0;
}
}
count++;
}
}
else if (compare(cmd, "list") or cmd.length()==0)
{
getword(cmd);
Serial << "List of everies (ms=" << millis() << ")" << endl;
uint8_t count=0;
for(auto& every: everies)
{
Serial << count << ": ";
every.dump();
Serial << endl;
count++;
}
}
else if (compare(cmd, "remove"))
{
Serial << "Removing..." << endl;
getword(cmd);
int8_t every=getint(cmd, -1);
if (every==-1 and compare(cmd, "last") and everies.size())
{
getword(cmd);
everies.erase(everies.begin()+everies.size()-1);
}
else if (every==-1 and compare(cmd, "all"))
{
getword(cmd);
everies.clear();
}
else if (everies.size() > (uint8_t)every)
{
everies.erase(everies.begin()+every);
}
else
Serial << "Bad colmmand" << endl;
}
else
Serial << "Bad command" << endl;
}
else if (compare(s, "blink"))
{
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< blink_nr;
else
{
blink &= ~(1<< blink_nr);
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
it.second->dump(" ");
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " +-- '" << it.first.c_str() << "' " << it.second->clientsCount() << " client/s."<< endl;
it.second->dump(" ");
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " echo [on|off] or strings" << endl;
Serial << " every ms [command]; every list; every remove [nr|all], every (on|off) [#]" << endl;
Serial << " on {output}; off {output}" << endl;
Serial << " $id : name of the client." << endl;
Serial << " rnd[(min[,max])] random number." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "# MQTT ERROR " << retval << endl;
}
}
}
void loop()
{
auto ms=millis();
int8_t out=1;
int8_t out=0;
int16_t blink_bits = blink;
uint8_t e=0;
for(auto& every: everies)
{
if (not every.active) continue;
if (every.ms && every.cmd.length() && ms > every.next)
{
std::string cmd(every.cmd);
eval(cmd);
every.next += every.ms;
if (ms > every.next and ms > every.underrun)
{
Serial << "Underrun every #" << e << ", " << (ms - every.next) << "ms late" << endl;
every.underrun = ms+5000;
}
}
e++;
}
while(blink_bits)
{
if (blink_ms_on[out] and ms > blink_next[out])
@@ -353,7 +851,9 @@ void loop()
}
static long count;
#if defined(ESP9266)
MDNS.update();
#endif
if (MqttClient::counter != count)
{
@@ -372,10 +872,11 @@ void loop()
{
static std::string cmd;
char c=Serial.read();
if (echo_on)
Serial << c;
if (c==10 or c==14)
if (c==10 or c==13)
{
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
@@ -384,277 +885,7 @@ void loop()
last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length())
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "blink"))
{
uint8_t blink_nr = getint(cmd, 0);
if (blink_nr)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< (blink_nr-1);
else
{
blink &= ~(1<<(blink_nr-1));
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
}
eval(cmd);
}
else
{

View File

@@ -1,12 +1,12 @@
{
"name": "TinyMqtt",
"keywords": "ethernet, mqtt, m2m, iot",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages. It does support MQTT 3.1.1 with QOS=0.",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive and host a broker for MQTT. It does support MQTT 3.1.1 with QOS=0 on ESP8266 and ESP32 WROOM platfrms.",
"repository": {
"type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git"
},
"version": "0.7.3",
"version": "0.7.6",
"exclude": "",
"examples": "examples/*/*.ino",
"frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt
version=0.7.3
version=0.7.6
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP. It does support MQTT 3.1.1 with QoS=0.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP 8266 and 32 WROOM. It does support MQTT 3.1.1 with QoS=0.
category=Communication
url=https://github.com/hsaturn/TinyMqtt
architectures=*

View File

@@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
MqttBroker::MqttBroker(uint16_t port)
{
server = new AsyncServer(port);
server = new TcpServer(port);
#ifdef TCP_ASYNC
server->onClient(onClient, this);
#endif
}
MqttBroker::~MqttBroker()
@@ -25,12 +27,17 @@ MqttBroker::~MqttBroker()
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client)
: parent(parent), client(new_client)
MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
: parent(parent)
{
#ifdef TCP_ASYNC
client = new_client;
client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
@@ -72,33 +79,22 @@ void MqttClient::close(bool bSendDisconnect)
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{
debug("cnx: closing");
keep_alive = ka;
close();
if (client) delete client;
client = new AsyncClient;
client = new TcpClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
// TODO This may return immediately !!!
// TODO so I have to add onConnect and move this code to onConnect
// TODO also, as this is async now, I must take care of
// TODO the broker that may disconnect and delete the client immediately
#ifdef TCP_ASYNC
client->onData(onData, this);
client->onConnect(onConnect, this);
client->connect(broker.c_str(), port);
#else
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
keep_alive = ka;
msg.add(0x00); // keep_alive
msg.add((char)keep_alive);
msg.add(clientId);
debug("cnx: mqtt connecting");
msg.sendTo(this);
msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
onConnect(this, client);
}
#endif
}
void MqttBroker::addClient(MqttClient* client)
@@ -134,16 +130,24 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur
}
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
{
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients.size());
debug("New client");
}
void MqttBroker::loop()
{
#ifndef TCP_ASYNC
WiFiClient client = server->available();
if (client)
{
onClient(this, &client);
}
#endif
if (broker)
{
// TODO should monitor broker's activity.
@@ -163,7 +167,7 @@ void MqttBroker::loop()
}
else
{
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent);
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
@@ -263,13 +267,45 @@ void MqttClient::loop()
client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local browser
// TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance.
}
}
#ifndef TCP_ASYNC
while(client && client->available()>0)
{
message.incoming(client->read());
if (message.type())
{
processMessage(&message);
message.reset();
}
}
#endif
}
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
{
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
msg.add(0x00); // keep_alive
msg.add((char)mqtt->keep_alive);
msg.add(mqtt->clientId);
debug("cnx: mqtt connecting");
msg.sendTo(mqtt);
msg.reset();
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent);
mqtt->clientAlive(0);
}
#ifdef TCP_ASYNC
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
{
char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr);
@@ -284,6 +320,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
len--;
}
}
#endif
void MqttClient::resubscribe()
{
@@ -360,7 +397,7 @@ void MqttClient::processMessage(const MqttMessage* mesg)
#ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming");
}
#endif
@@ -497,6 +534,11 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
}
break;
case MqttMessage::Type::UnSuback:
if (!mqtt_connected) break;
bclose = false;
break;
case MqttMessage::Type::Publish:
if (mqtt_connected or client == nullptr)
{
@@ -623,7 +665,7 @@ void MqttMessage::incoming(char in_byte)
state = Length;
break;
case Length:
size = (size<<7) + (in_byte & 0x3F);
size = (size<<7) + (in_byte & 0x7F);
if (size > MaxBufferLength)
{
state = Error;

View File

@@ -1,6 +1,25 @@
#pragma once
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
// TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC
// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
#if defined(ESP8266) || defined(EPOXY_DUINO)
#ifdef TCP_ASYNC
#include <ESPAsyncTCP.h>
#else
#include <ESP8266WiFi.h>
#endif
#elif defined(ESP32)
#include <WiFi.h>
#ifdef TCP_ASYNC
#include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP
#endif
#endif
#ifdef EPOXY_DUINO
#define dbg_ptr uint64_t
#else
#define dbg_ptr uint32_t
#endif
#include <vector>
#include <set>
#include <string>
@@ -15,6 +34,14 @@
#define debug(what) {}
#endif
#ifdef TCP_ASYNC
using TcpClient = AsyncClient;
using TcpServer = AsyncServer;
#else
using TcpClient = WiFiClient;
using TcpServer = WiFiServer;
#endif
enum MqttError
{
MqttOk = 0,
@@ -49,6 +76,7 @@ class MqttMessage
Subscribe = 0x80,
SubAck = 0x90,
UnSubscribe = 0xA0,
UnSuback = 0xB0,
PingReq = 0xC0,
PingResp = 0xD0,
Disconnect = 0xE0
@@ -148,6 +176,7 @@ class MqttClient
// Publish from client to the world
MqttError publish(const Topic&, const char* payload, size_t pay_length);
MqttError publish(const Topic& t, const char* payload) { return publish(t, payload, strlen(payload)); }
MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
@@ -160,35 +189,40 @@ class MqttClient
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
void dump()
void dump(std::string indent="")
{
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF");
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive;
Serial << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
Serial << (client && client->connected() ? "" : "dis") << "connected";
message.hexdump("entrant msg");
bool c=false;
Serial << " [";
for(auto s: subscriptions)
if (subscriptions.size())
{
Serial << (c?", ": "")<< s.str().c_str();
c=true;
bool c = false;
Serial << " [";
for(auto s: subscriptions)
{
if (c) Serial << ", ";
Serial << s.str().c_str();
c=true;
}
Serial << ']';
}
Serial << "]" << endl;
Serial << endl;
}
/** Count the number of messages that have been sent **/
static long counter;
private:
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len);
static void onConnect(void * client_ptr, TcpClient*);
#ifdef TCP_ASYNC
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
#endif
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe();
friend class MqttBroker;
MqttClient(MqttBroker* parent, AsyncClient* client);
MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -206,7 +240,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker
TcpClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
@@ -233,20 +267,16 @@ class MqttBroker
size_t clientsCount() const { return clients.size(); }
void dump()
void dump(std::string indent="")
{
Serial << clients.size() << " client/s" << endl;
for(auto client: clients)
{
Serial << " ";
client->dump();
}
client->dump(indent);
}
private:
friend class MqttClient;
static void onClient(void*, AsyncClient*);
static void onClient(void*, TcpClient*);
bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); }
@@ -264,7 +294,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients;
AsyncServer* server;
TcpServer* server;
const char* auth_user = "guest";
const char* auth_password = "guest";

View File

@@ -2,6 +2,7 @@
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := local-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP
ARDUINO_LIB_DIRS := ../../../EspMock/libraries
EPOXY_CORE := EPOXY_CORE_ESP8266
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -2,6 +2,7 @@
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := nowifi-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsyncTCP
ARDUINO_LIB_DIRS := ../../../EspMock/libraries
EPOXY_CORE := EPOXY_CORE_ESP8266
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -2,6 +2,7 @@
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := string-indexer-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
ESP_LIBS = ESP8266WiFi ESPAsyncTCP
include ../../../EspMock/EspMock.mk
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock ESP8266WiFi ESPAsync
ARDUINO_LIB_DIRS := ../../../EspMock/libraries
EPOXY_CORE := EPOXY_CORE_ESP8266
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -105,5 +105,5 @@ void setup() {
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
// if (Serial.available()) ESP.reset();
}