Compare commits

..

26 Commits

Author SHA1 Message Date
hsaturn
d3210c3c93 Merge branch 'main' into AsyncAndWifi 2021-04-11 23:30:13 +02:00
hsaturn
23f1207718 Lot of new functions for tinytest
- command every allowing to add peridic evaluations
  very usefull for benchmarks and load tests
- on/off command
2021-04-11 23:27:15 +02:00
hsaturn
122ab88960 Rewrite client-with-wifi.ino 2021-04-11 23:26:49 +02:00
hsaturn
28b0ac1611 Fix missing receive loop for mqttclient 2021-04-11 21:21:48 +02:00
hsaturn
1cfb5cfab1 Allow multiple command per line separated by ; 2021-04-11 19:19:06 +02:00
hsaturn
b023cd67a9 Fix AUnit in debug mode / Not async 2021-04-11 17:02:24 +02:00
hsaturn
24ee6b5201 Fixes in WiFiClient mode 2021-04-11 16:33:12 +02:00
hsaturn
2e92a98db2 Trying to fuse togeter Async and not async version 2021-04-11 15:51:33 +02:00
hsaturn
c59bddfd39 Implementation of Unsuback (unless MqttClient disconnects) 2021-04-11 01:58:44 +02:00
hsaturn
7bdb9cc0cd Tinytest, allow to blink output 0 2021-04-11 01:57:58 +02:00
hsaturn
be62699702 Fix connect problem with MqttClient 2021-04-11 00:48:04 +02:00
hsaturn
77da47e1da Update README.md 2021-04-10 18:02:48 +02:00
hsaturn
88797bfafd Added AUnit 2021-04-10 18:02:28 +02:00
hsaturn
1e3b37623d Fix AUnit build 2021-04-10 17:54:46 +02:00
hsaturn
ba6a96976a ESP32 version that could work 2021-04-10 17:47:21 +02:00
hsaturn
6afd3939b3 Merge remote-tracking branch 'origin/AsyncTcp' into main 2021-04-10 17:23:53 +02:00
hsaturn
2ffe0c13fa README.md update 2021-04-10 17:23:46 +02:00
hsaturn
48eb0daf9a Fix bug in unsubscription list 2021-04-10 17:16:25 +02:00
hsaturn
34c05bc37a Fix compilation in DEBUG mode 2021-04-10 17:16:25 +02:00
hsaturn
7c96c4a5cc Fix warning 2021-04-10 17:16:25 +02:00
hsaturn
b280196395 Added mqDns to tinytest 2021-04-10 17:16:25 +02:00
hsaturn
c75f4893e8 AsyncTcp
AsyncTcp
2021-04-10 17:16:25 +02:00
hsaturn
d666f6a53b AsyncTCP (to be continued) 2021-04-10 17:16:25 +02:00
hsaturn
7ef18de755 Very promising async commit 2021-04-10 17:16:25 +02:00
hsaturn
838df3a34a Added unsubscribe to tinytest 2021-04-10 17:15:21 +02:00
hsaturn
8a25155fd8 Bad merge fix 2021-04-10 16:06:45 +02:00
6 changed files with 537 additions and 351 deletions

View File

@@ -1,6 +1,7 @@
# TinyMqtt
![Release](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
[![AUnit Tests](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml/badge.svg)](https://github.com/hsaturn/TinyMqtt/actions/workflows/aunit.yml)
![Issues](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![Esp8266](https://img.shields.io/badge/platform-ESP8266-green)
![Gpl 3.0](https://img.shields.io/github/license/hsaturn/TinyMqtt)
@@ -48,7 +49,7 @@ After a while one ESP naturally becomes a 'master' and all ESP are connected tog
No problem if the master dies, a new master will be choosen soon.
## TODO List
* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)
* ~~Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)~~
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)

View File

@@ -1,7 +1,21 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/**
* Local broker that accept connections and two local clients
*
* +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
@@ -12,11 +26,6 @@
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/
#include <my_credentials.h>
@@ -58,7 +67,7 @@ void setup()
void loop()
{
broker.loop();
broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop();
mqtt_b.loop();

View File

@@ -1,10 +1,12 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
/** Simple Client
*
* This is the simplest Mqtt client configuration
*
* 1 - edit my_credentials.h to setup wifi essid/password
* 2 - change BROKER values (or keep emqx.io test broker)
*
* pro - small memory footprint (both ram and flash)
* - very simple to setup and use
*
@@ -13,6 +15,9 @@
* - local publishes takes more time (because they go outside)
*/
const char* BROKER = "broker.emqx.io";
const uint16_t BROKER_PORT = 1883;
#include <my_credentials.h>
static float temp=19;
@@ -32,8 +37,7 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
client.connect(BROKER, BROKER_PORT); // Put here your broker ip / port
}
void loop()

View File

@@ -138,7 +138,7 @@ std::set<std::string> commands = {
"auto", "broker", "blink", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view"
"publish", "reset", "subscribe", "unsubscribe", "view", "every"
};
void getCommand(std::string& search)
@@ -316,74 +316,44 @@ std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd)
{
if (s.length()==0 or s.length()>strlen(cmd)) return false;
return strncmp(cmd, s.c_str(), s.length())==0;
uint8_t p=0;
while(s[p++]==*cmd++)
{
if (*cmd==0 or s[p]==0) return true;
if (s[p]==' ') return true;
}
return false;
}
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
struct Every
{
std::string cmd;
uint32_t ms;
uint32_t next;
void dump()
{
auto mill=millis();
Serial << ms << "ms [" << cmd << "] next in ";
if (mill > next)
Serial << "now";
else
Serial << next-mill << "ms";
}
};
uint32_t blink_ms_on[16];
uint32_t blink_ms_off[16];
uint32_t blink_next[16];
bool blink_state[16];
int16_t blink;
void loop()
std::vector<Every> everies;
void eval(std::string& cmd)
{
auto ms=millis();
int8_t out=1;
int16_t blink_bits = blink;
while(blink_bits)
{
if (blink_ms_on[out] and ms > blink_next[out])
{
if (blink_state[out])
{
blink_next[out] += blink_ms_on[out];
digitalWrite(out, LOW);
}
else
{
blink_next[out] += blink_ms_off[out];
digitalWrite(abs(out), HIGH);
}
blink_state[out] = not blink_state[out];
}
blink_bits >>=1;
out++;
}
static long count;
MDNS.update();
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers)
it.second->loop();
for(auto it: clients)
it.second->loop();
automatic::loop();
if (Serial.available())
{
static std::string cmd;
char c=Serial.read();
if (c==10 or c==14)
{
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length())
{
MqttError retval = MqttOk;
@@ -501,10 +471,64 @@ void loop()
client->dump();
}
}
else if (compare(s, "on"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, HIGH);
}
else if (compare(s, "off"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
}
else if (compare(s, "every"))
{
uint32_t ms = getint(cmd, 0);
if (ms and cmd.length())
{
Every every;
every.ms=ms;
every.cmd=cmd;
every.next=millis()+ms;
everies.push_back(every);
every.dump();
Serial << endl;
cmd="";
}
else if (ms==0 and compare(cmd, "list"))
{
getword(cmd);
Serial << "List of everies (ms=" << millis() << ")" << endl;
uint8_t count=0;
for(auto& every: everies)
{
Serial << count << ": ";
every.dump();
Serial << endl;
count++;
}
}
else if (ms==0 and compare(cmd, "remove"))
{
getword(cmd);
int8_t every=getint(cmd, -1);
if (every==-1 and compare(cmd, "all"))
{
getword(cmd);
everies.clear();
}
else if (everies.size() > (uint8_t)every)
{
everies.erase(everies.begin()+every);
}
}
}
else if (compare(s, "blink"))
{
uint8_t blink_nr = getint(cmd, 0);
if (blink_nr)
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
@@ -512,10 +536,10 @@ void loop()
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< (blink_nr-1);
blink |= 1<< blink_nr;
else
{
blink &= ~(1<<(blink_nr-1));
blink &= ~(1<< blink_nr);
}
}
}
@@ -639,6 +663,8 @@ void loop()
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " every ms [command]; every list; every remove [nr|all]" << endl;
Serial << " on {output}; off {output}" << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
@@ -655,6 +681,76 @@ void loop()
Serial << "## ERROR " << retval << endl;
}
}
}
void loop()
{
auto ms=millis();
int8_t out=0;
int16_t blink_bits = blink;
for(auto& every: everies)
{
if (every.ms && every.cmd.length() && ms > every.next)
{
std::string cmd(every.cmd);
eval(cmd);
every.next += every.ms;
}
}
while(blink_bits)
{
if (blink_ms_on[out] and ms > blink_next[out])
{
if (blink_state[out])
{
blink_next[out] += blink_ms_on[out];
digitalWrite(out, LOW);
}
else
{
blink_next[out] += blink_ms_off[out];
digitalWrite(abs(out), HIGH);
}
blink_state[out] = not blink_state[out];
}
blink_bits >>=1;
out++;
}
static long count;
MDNS.update();
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers)
it.second->loop();
for(auto it: clients)
it.second->loop();
automatic::loop();
if (Serial.available())
{
static std::string cmd;
char c=Serial.read();
if (c==10 or c==14)
{
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
eval(cmd);
}
else
{

View File

@@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
MqttBroker::MqttBroker(uint16_t port)
{
server = new AsyncServer(port);
server = new TcpServer(port);
#ifdef TCP_ASYNC
server->onClient(onClient, this);
#endif
}
MqttBroker::~MqttBroker()
@@ -25,12 +27,17 @@ MqttBroker::~MqttBroker()
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client)
: parent(parent), client(new_client)
MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
: parent(parent)
{
#ifdef TCP_ASYNC
client = new_client;
client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
@@ -72,33 +79,22 @@ void MqttClient::close(bool bSendDisconnect)
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{
debug("cnx: closing");
keep_alive = ka;
close();
if (client) delete client;
client = new AsyncClient;
client = new TcpClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
// TODO This may return immediately !!!
// TODO so I have to add onConnect and move this code to onConnect
// TODO also, as this is async now, I must take care of
// TODO the broker that may disconnect and delete the client immediately
#ifdef TCP_ASYNC
client->onData(onData, this);
client->onConnect(onConnect, this);
client->connect(broker.c_str(), port);
#else
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
keep_alive = ka;
msg.add(0x00); // keep_alive
msg.add((char)keep_alive);
msg.add(clientId);
debug("cnx: mqtt connecting");
msg.sendTo(this);
msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
onConnect(this, client);
}
#endif
}
void MqttBroker::addClient(MqttClient* client)
@@ -134,16 +130,24 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur
}
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
{
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients.size());
debug("New client");
}
void MqttBroker::loop()
{
#ifndef TCP_ASYNC
WiFiClient client = server->available();
if (client)
{
onClient(this, &client);
}
#endif
if (broker)
{
// TODO should monitor broker's activity.
@@ -163,7 +167,7 @@ void MqttBroker::loop()
}
else
{
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent);
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
@@ -263,13 +267,45 @@ void MqttClient::loop()
client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local browser
// TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance.
}
}
#ifndef TCP_ASYNC
while(client && client->available()>0)
{
message.incoming(client->read());
if (message.type())
{
processMessage(&message);
message.reset();
}
}
#endif
}
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
{
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
debug("cnx: connecting");
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
msg.add(0x00); // keep_alive
msg.add((char)mqtt->keep_alive);
msg.add(mqtt->clientId);
debug("cnx: mqtt connecting");
msg.sendTo(mqtt);
msg.reset();
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent);
mqtt->clientAlive(0);
}
#ifdef TCP_ASYNC
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
{
char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr);
@@ -284,6 +320,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
len--;
}
}
#endif
void MqttClient::resubscribe()
{
@@ -360,7 +397,7 @@ void MqttClient::processMessage(const MqttMessage* mesg)
#ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming");
}
#endif
@@ -497,6 +534,11 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
}
break;
case MqttMessage::Type::UnSuback:
if (!mqtt_connected) break;
bclose = false;
break;
case MqttMessage::Type::Publish:
if (mqtt_connected or client == nullptr)
{

View File

@@ -1,6 +1,28 @@
#pragma once
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
// TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC
// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
#if defined(ESP8266) || defined(EPOXY_DUINO)
#ifdef TCP_ASYNC
#include <ESPAsyncTCP.h>
#else
#include <ESP8266WiFi.h>
#endif
#elif defined(ESP32)
#ifdef TCP_ASYNC
#include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP
#else
#include <WiFi.h>
#endif
#else
#error "Unsupported platform"
#endif
#ifdef EPOXY_DUINO
#define dbg_ptr uint64_t
#else
#define dbg_ptr uint32_t
#endif
#include <vector>
#include <set>
#include <string>
@@ -15,6 +37,14 @@
#define debug(what) {}
#endif
#ifdef TCP_ASYNC
using TcpClient = AsyncClient;
using TcpServer = AsyncServer;
#else
using TcpClient = WiFiClient;
using TcpServer = WiFiServer;
#endif
enum MqttError
{
MqttOk = 0,
@@ -49,6 +79,7 @@ class MqttMessage
Subscribe = 0x80,
SubAck = 0x90,
UnSubscribe = 0xA0,
UnSuback = 0xB0,
PingReq = 0xC0,
PingResp = 0xD0,
Disconnect = 0xE0
@@ -183,12 +214,15 @@ class MqttClient
static long counter;
private:
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len);
static void onConnect(void * client_ptr, TcpClient*);
#ifdef TCP_ASYNC
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
#endif
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe();
friend class MqttBroker;
MqttClient(MqttBroker* parent, AsyncClient* client);
MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -206,7 +240,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker
TcpClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
@@ -246,7 +280,7 @@ class MqttBroker
private:
friend class MqttClient;
static void onClient(void*, AsyncClient*);
static void onClient(void*, TcpClient*);
bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); }
@@ -264,7 +298,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients;
AsyncServer* server;
TcpServer* server;
const char* auth_user = "guest";
const char* auth_password = "guest";