Compare commits

...

34 Commits
0.3.0 ... 0.5.1

Author SHA1 Message Date
hsaturn
f122d5e902 relase 0.5.1 2021-03-25 01:26:27 +01:00
hsaturn
d63793cf77 Avoid to use message member, minor changes 2021-03-25 01:26:03 +01:00
hsaturn
8386779e92 tinytest great enhancements 2021-03-25 01:24:46 +01:00
hsaturn
1b988a06a2 Relase 0.5.0 2021-03-24 21:21:46 +01:00
hsaturn
d92aa1fe3c Minor change to publish 2021-03-24 21:20:07 +01:00
hsaturn
0d6e194560 test client enhancements 2021-03-24 21:19:44 +01:00
hsaturn
7107da2cce Client supports (does not disconnect) Suback / Puback 2021-03-24 21:18:27 +01:00
hsaturn
28c8713415 Client keep_alive is now parameterized 2021-03-24 21:17:08 +01:00
hsaturn
70cf8137de Fixed build of client-without-wifi 2021-03-24 18:35:57 +01:00
hsaturn
5ab315e472 Removed dependency with Streaming.h 2021-03-24 18:35:11 +01:00
hsaturn
b96b36f10c README update 2021-03-24 01:33:45 +01:00
hsaturn
ba831ea366 README update 2021-03-24 01:32:47 +01:00
hsaturn
4020393f90 MqttClient can subscribe and receive publishes from distant broker 2021-03-24 01:30:56 +01:00
hsaturn
7b20e7deb5 Supports multiple subscriptions 2021-03-23 23:51:33 +01:00
hsaturn
efe6a05bbd MqttStreaming.h, streaming with fixes 2021-03-23 23:41:00 +01:00
hsaturn
84dbb80106 Release 0.4.0 2021-03-22 02:45:57 +01:00
hsaturn
47bc06f0ce removed need of Streaming.h if no debug 2021-03-22 02:44:30 +01:00
hsaturn
07c96c19a5 Better simple-client example 2021-03-22 02:35:34 +01:00
hsaturn
de8813f9f6 No more serial prints 2021-03-22 02:35:10 +01:00
hsaturn
fbc24c94e3 MqttClient::publish with String added 2021-03-22 02:34:45 +01:00
hsaturn
169abf8099 allow MqttClient to be constructed with nothing 2021-03-22 02:34:23 +01:00
hsaturn
5cee67095e Fix payload content 2021-03-22 02:33:54 +01:00
hsaturn
0cb2e99b4b Better debug defines 2021-03-22 02:32:45 +01:00
hsaturn
54c905a32f API Changed
Fix too long time
2021-03-22 02:10:54 +01:00
hsaturn
befab9dd6e More TODOs (happy betas) 2021-03-22 01:59:40 +01:00
hsaturn
bd2e7cc5f6 Fix crash on MqttClient timeout when not linked to a broker 2021-03-22 01:59:17 +01:00
hsaturn
18b5f0c27b Better client creation 2021-03-22 01:19:50 +01:00
hsaturn
e71a4d5e87 MqttClient was unable to publish in some cases 2021-03-22 01:19:26 +01:00
hsaturn
620dbf31af Rewrite interpreter, can handle brokers now 2021-03-22 00:28:05 +01:00
hsaturn
52690ec7e7 Fix some rare case crashes 2021-03-22 00:27:23 +01:00
hsaturn
9f28e7f92f Version 0.3.0 library files 2021-03-21 19:34:40 +01:00
hsaturn
ed9efbb5ce Removed buffer 256 thus less memory is needed for MqttClient instances 2021-03-21 17:21:36 +01:00
hsaturn
4fd34bfffa More commands, and dot notation added 2021-03-21 16:34:14 +01:00
hsaturn
7be4d86f46 TODO list changed 2021-03-21 16:33:53 +01:00
14 changed files with 1049 additions and 243 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*~
src/my_credentials.h

View File

@@ -23,12 +23,16 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed.
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe
* MqttClient auto reconnection
* MqttClient does not callback payload...
* MqttClient user/password
## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)

View File

@@ -1,10 +1,7 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/**
* Local broker that accept connections
* Local broker that accept connections and two local clients
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
@@ -31,11 +28,11 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void setup()
{
@@ -67,7 +64,7 @@ void loop()
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 50000;
static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
@@ -78,7 +75,7 @@ void loop()
}
// ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms
static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)

View File

@@ -1,5 +1,4 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode:
*
@@ -15,10 +14,10 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()

View File

@@ -1,5 +1,4 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <my_credentials.h>

View File

@@ -1,6 +1,5 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** Simple Client
*
@@ -16,6 +15,9 @@
#include <my_credentials.h>
static float temp=19;
static MqttClient client;
void setup()
{
Serial.begin(115200);
@@ -29,8 +31,22 @@ void setup()
{ delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
}
void loop()
{
client.loop();
delay(1000);
auto rnd=random(100);
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
client.publish("sensor/temperature", String(temp));
}

View File

@@ -0,0 +1,34 @@
// vim: ts=30
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish.
set name value set variable name to value (later replaced)
set name if no value, then var is erased
set view all vars
reserved keywords are forbidden
client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883
a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic
delete a destroy the client
----------------------------------------------------
example:
client c
c.connect broker.emqx.io
set topic sensor/temperature
c.subscribe topic
c.publish topic 15
c.publish topic 20
macro exansion example
set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -1,9 +1,10 @@
#define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <MqttStreaming.h>
#include <map>
/**
* Local broker that accept connections
* Console allowing to make any kind of test.
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
@@ -25,11 +26,13 @@
std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; }
std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers;
void setup()
{
Serial.begin(115200);
@@ -46,21 +49,101 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
MqttBroker* broker = new MqttBroker(1883);
broker->begin();
brokers["broker"] = broker;
}
std::string getword(std::string& str, const char* if_empty=nullptr)
int getint(std::string& str, const int if_empty=0)
{
std::string sword;
while(str.length() && str[0]!=' ')
while(str.length() && str[0]>='0' && str[0]<='9')
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str());
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string sword;
while(str.length() && str[0]!=sep)
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword;
}
std::map<std::string, std::string> vars;
std::set<std::string> commands = {
"auto", "broker", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "view"
};
void getCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
std::string matches;
int count=0;
for(std::string cmd: commands)
{
if (cmd.substr(0, search.length()) == search)
{
if (count) matches +=", ";
count++;
matches += cmd;
}
}
if (count==1)
search = matches;
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
}
}
void replace(const char* d, std::string& str, std::string srch, std::string to)
{
if (d[0] && d[1])
{
srch=d[0]+srch+d[1];
to=d[0]+to+d[1];
size_t pos = 0;
while((pos=str.find(srch, pos)) != std::string::npos)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
}
}
}
void replaceVars(std::string& cmd)
{
cmd = ' '+cmd+' ';
for(auto it: vars)
{
replace("..", cmd, it.first, it.second);
replace(". ", cmd, it.first, it.second);
replace(" .", cmd, it.first, it.second);
replace(" ", cmd, it.first, it.second);
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval
class automatic
{
public:
@@ -128,7 +211,7 @@ class automatic
autop->bon=false;
else if (s=="interval")
{
int32_t i=atol(getword(cmd).c_str());
int32_t i=getint(cmd);
if (i)
autop->interval(atol(s.c_str()));
else
@@ -162,6 +245,7 @@ class automatic
{
Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl;
Serial << " auto [$id] interval [s]" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl;
}
@@ -172,6 +256,7 @@ class automatic
std::string topic_;
bool bon=false;
static std::map<MqttClient*, automatic*> autos;
float temp=19;
};
std::map<MqttClient*, automatic*> automatic::autos;
@@ -181,35 +266,12 @@ bool compare(std::string s, const char* cmd)
return strncmp(cmd, s.c_str(), s.length())==0;
}
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop()
{
broker.loop();
for(auto it: brokers)
it.second->loop();
for(auto it: clients)
it.second->loop();
@@ -223,90 +285,247 @@ void loop()
if (c==10 or c==14)
{
Serial << "------------------------------------------------------" << endl;
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
replaceVars(cmd);
Serial << "---------------@[ " << cmd.c_str() << " ]--------------" << endl;
while(cmd.length())
{
std::string s = getword(cmd);
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
clients.erase(s);
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue;
Serial << "deleted" << endl;
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
}
else if (compare(s,"publish"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->publish(getword(cmd, topic.c_str())); });
}
else if (compare(s,"subscribe"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->subscribe(getword(cmd, topic.c_str())); });
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->dump(); });
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "auto"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ automatic::command(publish, cmd);
if (publish == nullptr)
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}, true);
}
else if (compare(s, "new"))
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length())
if (id.length() or brokers.find(id)!=brokers.end())
{
MqttClient* client = new MqttClient(&broker);
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "missing id" << endl;
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "delete"))
else if (compare(s, "set"))
{
s = getword(cmd);
auto it=clients.find(s);
if (it != clients.end())
std::string name(getword(cmd));
if (name.length()==0)
{
delete it->second;
clients.erase(it);
cmd+=" ls";
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
Serial << "Unknown client (" << s.c_str() << ")" << endl;
}
else if (compare(s, "ls"))
{
Serial << "main : " << clients.size() << " client/s." << endl;
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
broker.dump();
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl;
Serial << " connect $id [ip]" << endl;
Serial << " subscribe $id [topic]" << endl;
Serial << " publish $id [topic]" << endl;
Serial << " view $id " << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " ls" << endl;
Serial << " reset" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
@@ -314,8 +533,15 @@ void loop()
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
}
}
else

View File

@@ -6,7 +6,7 @@
"type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git"
},
"version": "0.2",
"version": "0.5.1",
"exclude": "",
"examples": "examples/*/*.ino",
"frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt
version=0.2.0
author=HSaturn <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com>
version=0.5.1
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS.
category=Communication
url=https://github.com/hsaturn/TinyMqtt
architectures=*

412
src/MqttStreaming.h Normal file
View File

@@ -0,0 +1,412 @@
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
* (I have to speek to the author in order to include my changes to his library if possible)
**/
/*
Streaming.h - Arduino library for supporting the << streaming operator
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Version 6 library changes
Copyright (c) 2019 Gazoodle. All rights reserved.
1. _BASED moved to template to remove type conversion to long and
sign changes which break int8_t and int16_t negative numbers.
The print implementation still upscales to long for it's internal
print routine.
2. _PAD added to allow padding & filling of characters to the stream
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
and zero padding for numerics
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
and no internal buffers for replaceable stream printing
*/
#ifndef ARDUINO_STREAMING
#define ARDUINO_STREAMING
#if defined(ARDUINO) && ARDUINO >= 100
#include "Arduino.h"
#else
#ifndef STREAMING_CONSOLE
#include "WProgram.h"
#endif
#endif
#include <string>
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
// No stl library, so need trivial version of std::is_signed ...
namespace std {
template<typename T>
struct is_signed { static const bool value = false; };
template<>
struct is_signed<int8_t> { static const bool value = true; };
template<>
struct is_signed<int16_t> { static const bool value = true; };
template<>
struct is_signed<int32_t> { static const bool value = true; };
};
#else
#include <type_traits>
#endif
#define STREAMING_LIBRARY_VERSION 6
#if !defined(typeof)
#define typeof(x) __typeof__(x)
#endif
// PrintBuffer implementation of Print, a small buffer to print in
// see its use with pad_float()
template <size_t N>
class PrintBuffer : public Print
{
size_t pos = 0;
char str[N] {};
public:
inline const char *operator() ()
{ return str; };
// inline void clear()
// { pos = 0; str[0] = '\0'; };
inline size_t write(uint8_t c)
{ return write(&c, 1); };
inline size_t write(const uint8_t *buffer, size_t size)
{
size_t s = std::min(size, N-1 - pos); // need a /0 left
if (s)
{
memcpy(&str[pos], buffer, s);
pos += s;
}
return s;
};
};
// Generic template
template<class T>
inline Print &operator <<(Print &stream, const T &arg)
{ stream.print(arg); return stream; }
// TODO sfinae maybe could do the trick ?
inline Print &operator <<(Print &stream, const std::string &str)
{ stream.print(str.c_str()); return stream; }
template<typename T>
struct _BASED
{
T val;
int base;
_BASED(T v, int b): val(v), base(b)
{}
};
#if ARDUINO >= 100
struct _BYTE_CODE
{
byte val;
_BYTE_CODE(byte v) : val(v)
{}
};
#define _BYTE(a) _BYTE_CODE(a)
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
{ obj.write(arg.val); return obj; }
#else
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
#endif
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
// Specialization for class _BASED
// Thanks to Arduino forum user Ben Combee who suggested this
// clever technique to allow for expressions like
// Serial << _HEX(a);
template<typename T>
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
{ obj.print(arg.val, arg.base); return obj; }
#if ARDUINO >= 18
// Specialization for class _FLOAT
// Thanks to Michael Margolis for suggesting a way
// to accommodate Arduino 0018's floating point precision
// feature like this:
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
struct _FLOAT
{
double val; // only Print::print(double)
int digits;
_FLOAT(double v, int d): val(v), digits(d)
{}
};
inline Print &operator <<(Print &obj, const _FLOAT &arg)
{ obj.print(arg.val, arg.digits); return obj; }
#endif
// Specialization for enum _EndLineCode
// Thanks to Arduino forum user Paul V. who suggested this
// clever technique to allow for expressions like
// Serial << "Hello!" << endl;
enum _EndLineCode { endl };
inline Print &operator <<(Print &obj, _EndLineCode)
{ obj.println(); return obj; }
// Specialization for padding & filling, mainly utilized
// by the width printers
//
// Use like
// Serial << _PAD(10,' '); // Will output 10 spaces
// Serial << _PAD(4, '0'); // Will output 4 zeros
struct _PAD
{
int8_t width;
char chr;
_PAD(int8_t w, char c) : width(w), chr(c) {}
};
inline Print &operator <<(Print& stm, const _PAD &arg)
{
for(int8_t i = 0; i < arg.width; i++)
stm.print(arg.chr);
return stm;
}
// Specialization for width printing
//
// Use like Result
// -------- ------
// Serial << _WIDTH(1,5) " 1"
// Serial << _WIDTH(10,5) " 10"
// Serial << _WIDTH(100,5) " 100"
// Serial << _WIDTHZ(1,5) "00001"
//
// Great for times & dates, or hex dumps
//
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
//
// for(int index=0; index<byte_array_size; index++)
// Serial << _WIDTHZ(_HEX(byte_array[index]))
template<typename T>
struct __WIDTH
{
const T val;
int8_t width;
char pad;
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
};
// Count digits in an integer of specific base
template<typename T>
inline uint8_t digits(T v, int8_t base = 10)
{
uint8_t digits = 0;
if ( std::is_signed<T>::value )
{
if ( v < 0 )
{
digits++;
v = -v; // v needs to be postive for the digits counter to work
}
}
do
{
v /= base;
digits++;
} while( v > 0 );
return digits;
}
// Generic get the width of a value in base 10
template<typename T>
inline uint8_t get_value_width(T val)
{ return digits(val); }
inline uint8_t get_value_width(const char * val)
{ return strlen(val); }
#ifdef ARDUINO
inline uint8_t get_value_width(const __FlashStringHelper * val)
{ return strlen_P(reinterpret_cast<const char *>(val)); }
#endif
// _BASED<T> get the width of a value
template<typename T>
inline uint8_t get_value_width(_BASED<T> b)
{ return digits(b.val, b.base); }
// Constructor wrapper to allow automatic template parameter deduction
template<typename T>
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
template<typename T>
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
// Operator overload to handle width printing.
template<typename T>
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
// explicit Operator overload to handle width printing of _FLOAT, double and float
template<typename T>
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
{
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
size_t size = buf.print(val, digits); // print in buf
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
}
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
// a less verbose _FLOATW for _WIDTH(_FLOAT)
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
// Specialization for replacement formatting
//
// Designed to be similar to printf that everyone knows and loves/hates. But without
// the internal buffers and type agnosticism. This version only has placeholders in
// the format string, the actual values are supplied using the stream safe operators
// defined in this library.
//
// Use like this:
//
// Serial << FMT(F("Replace % with %"), 1, 2 )
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
inline char get_next_format_char(const char *& format_string)
{
char format_char = *format_string;
if ( format_char > 0 ) format_string++;
return format_char;
}
#ifdef ARDUINO
inline char get_next_format_char(const __FlashStringHelper*& format_string)
{
char format_char = pgm_read_byte(format_string);
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
return format_char;
}
#endif
template<typename Ft>
inline bool check_backslash(char& format_char, Ft& format_string)
{
if ( format_char == '\\')
{
format_char = get_next_format_char(format_string);
return true;
}
return false;
}
// The template tail printer helper
template<typename Ft, typename... Ts>
struct __FMT
{
Ft format_string;
__FMT(Ft f, Ts ... args) : format_string(f) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
check_backslash(c, format);
if ( c )
stm.print(c);
}
}
};
// The variadic template helper
template<typename Ft, typename T, typename... Ts>
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
{
T val;
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
if (!check_backslash(c, format))
{
if ( c == '%')
{
stm << val;
// Variadic recursion ... compiler rolls this out during
// template argument pack expansion
__FMT<Ft, Ts...>::tstreamf(stm, format);
return;
}
}
if (c)
stm.print(c);
}
}
};
// The actual operator should you only instanciate the FMT
// helper with a format string and no parameters
template<typename Ft, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// The variadic stream helper
template<typename Ft, typename T, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// As we don't have C++17, we can't get a constructor to use
// automatic argument deduction, but ... this little trick gets
// around that ...
template<typename Ft, typename... Ts>
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
#endif

View File

@@ -2,7 +2,6 @@
#include <map>
#include <string>
#include <string.h>
#include <Streaming.h>
#include <ESP8266WiFi.h>
/***
@@ -36,7 +35,7 @@ class StringIndexer
{
strings[index].str = std::string(str, len);
strings[index].used++;
Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
// Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
return index;
}
}
@@ -66,7 +65,7 @@ class StringIndexer
if (it->second.used == 0)
{
strings.erase(it);
Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
// Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
}
}
}

View File

@@ -1,12 +1,5 @@
#include "TinyMqtt.h"
#include <sstream>
#include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len)
{
@@ -20,10 +13,19 @@ MqttBroker::MqttBroker(uint16_t port) : server(port)
{
}
MqttBroker::~MqttBroker()
{
while(clients.size())
{
delete clients[0];
}
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent)
{
client = new_client ? new WiFiClient(new_client) : nullptr;
client = new WiFiClient(new_client);
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
@@ -32,7 +34,7 @@ MqttClient::MqttClient(MqttBroker* parent)
{
client = nullptr;
parent->addClient(this);
if (parent) parent->addClient(this);
}
MqttClient::~MqttClient()
@@ -57,28 +59,30 @@ void MqttClient::close()
}
}
void MqttClient::connect(std::string broker, uint16_t port)
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{
debug("cnx: closing");
close();
debug("cnx: closed");
if (client) delete client;
client = new WiFiClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
message.create(MqttMessage::Type::Connect);
message.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name
MqttMessage msg(MqttMessage::Type::Connect);
msg.add("MQTT",4);
msg.add(0x4); // Mqtt protocol version 3.1.1
msg.add(0x0); // Connect flags TODO user / name
keep_alive = 1;
message.add(0x00); // keep_alive
message.add((char)keep_alive);
message.add(clientId);
keep_alive = ka;
msg.add(0x00); // keep_alive
msg.add((char)keep_alive);
msg.add(clientId);
debug("cnx: mqtt connecting");
message.sendTo(this);
msg.sendTo(this);
msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
}
}
@@ -119,7 +123,7 @@ void MqttBroker::loop()
for(int i=0; i<clients.size(); i++)
{
auto client = clients[i];
if(client->connected())
if (client->connected())
{
client->loop();
}
@@ -133,24 +137,29 @@ void MqttBroker::loop()
}
}
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{
MqttError retval = MqttOk;
debug("publish ");
int i=0;
for(auto client: clients)
{
i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected();
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6
if (broker && broker->connected()) // Broker is connected
{
// ext broker -> clients or
// or clients -> ext broker
if (source == broker) // broker -> clients
doit = true;
else // clients -> broker
broker->publish(topic, msg);
{
MqttError ret = broker->publish(topic, msg);
if (ret != MqttOk) retval = ret;
}
}
else // Disconnected: R7
{
@@ -159,9 +168,10 @@ void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessa
}
Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg);
if (doit) retval = client->publish(topic, msg);
debug("");
}
return retval;
}
bool MqttBroker::compareString(
@@ -174,10 +184,10 @@ bool MqttBroker::compareString(
return *good==0;
}
void MqttMessage::getString(char* &buffer, uint16_t& len)
void MqttMessage::getString(const char* &buff, uint16_t& len)
{
len = (buffer[0]<<8)|(buffer[1]);
buffer+=2;
len = (buff[0]<<8)|(buff[1]);
buff+=2;
}
void MqttClient::clientAlive(uint32_t more_seconds)
@@ -198,9 +208,11 @@ void MqttClient::loop()
{
debug("timeout client");
close();
debug("closed");
}
else
else if (client && client->connected())
{
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2);
clientAlive(0);
@@ -220,13 +232,42 @@ void MqttClient::loop()
}
}
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker ?
{
debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic.str());
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
}
return ret;
}
void MqttClient::processMessage()
{
std::string error;
std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl;
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming");
}
auto header = message.getVHeader();
char* payload;
const char* payload;
uint16_t len;
bool bclose=true;
@@ -298,11 +339,30 @@ void MqttClient::processMessage()
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
bclose = false;
mqtt_connected=true;
// Reuse received msg
message.create(MqttMessage::Type::Connack);
message.add(0); // Session present (not implemented)
message.add(0); // Connection accepted
message.sendTo(this);
{
MqttMessage msg(MqttMessage::Type::ConnAck);
msg.add(0); // Session present (not implemented)
msg.add(0); // Connection accepted
msg.sendTo(this);
}
break;
case MqttMessage::Type::ConnAck:
// TODO what more on connack ?
mqtt_connected = true;
bclose = false;
break;
case MqttMessage::Type::SubAck:
case MqttMessage::Type::PubAck:
if (!mqtt_connected) break;
// Ignore acks
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break;
case MqttMessage::Type::PingReq:
@@ -320,14 +380,26 @@ void MqttClient::processMessage()
break;
case MqttMessage::Type::Subscribe:
{
if (!mqtt_connected) break;
payload = header+2;
message.getString(payload, len); // Topic
outstring("Subscribes", payload, len);
subscribe(Topic(payload, len));
debug("subscribe loop");
while(payload < message.end())
{
message.getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len));
payload += len;
uint8_t qos = *payload++;
debug(" qos=" << qos);
}
debug("end loop");
bclose = false;
// TODO SUBACK
}
break;
case MqttMessage::Type::Publish:
@@ -344,18 +416,20 @@ void MqttClient::processMessage()
if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP
// TODO reset RETAIN
if (parent)
{
debug("publishing to parent");
parent->publish(this, published, message);
}
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
// TODO should send PUBACK
bclose = false;
}
break;
case MqttMessage::Type::PubAck:
if (!mqtt_connected) break;
bclose = false;
break;
default:
bclose=true;
break;
@@ -369,7 +443,7 @@ void MqttClient::processMessage()
}
else
{
clientAlive(5);
clientAlive(parent ? 5 : 0);
}
message.reset();
}
@@ -382,22 +456,24 @@ bool Topic::matches(const Topic& topic) const
}
// publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{
message.create(MqttMessage::Publish);
message.add(topic);
message.add(payload, pay_length);
MqttMessage msg(MqttMessage::Publish);
msg.add(topic);
msg.add(payload, pay_length, false);
if (parent)
parent->publish(this, topic, message);
return parent->publish(this, topic, msg);
else if (client)
publish(topic, message);
return msg.sendTo(this);
else
Serial << " Should not happen" << endl;
return MqttNowhereToSend;
}
// republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg)
MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
{
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions)
{
@@ -407,28 +483,28 @@ void MqttClient::publish(const Topic& topic, MqttMessage& msg)
Serial << " match/send";
if (client)
{
msg.sendTo(this);
retval = msg.sendTo(this);
}
else if (callback)
{
callback(this, topic, nullptr, 0); // TODO
callback(this, topic, nullptr, 0); // TODO Payload
}
}
Serial << endl;
}
return retval;
}
void MqttMessage::reset()
{
curr=buffer;
*curr=0; // Type Unknown
buffer.clear();
state=FixedHeader;
size=0;
}
void MqttMessage::incoming(char in_byte)
{
*curr++ = in_byte;
buffer += in_byte;
switch(state)
{
case FixedHeader:
@@ -443,7 +519,7 @@ void MqttMessage::incoming(char in_byte)
}
else if ((in_byte & 0x80) == 0)
{
vheader = curr;
vheader = buffer.length();
if (size==0)
state = Complete;
else
@@ -464,22 +540,24 @@ void MqttMessage::incoming(char in_byte)
break;
case Complete:
default:
curr--;
Serial << "Spurious " << _HEX(in_byte) << endl;
state = Error;
reset();
break;
}
if (curr-buffer > 250)
if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
{
debug("Spurious byte " << _HEX(in_byte));
curr=buffer;
debug("Too long " << state);
reset();
}
}
void MqttMessage::add(const char* p, size_t len)
void MqttMessage::add(const char* p, size_t len, bool addLength)
{
if (addLength)
{
incoming(len>>8);
incoming(len & 0xFF);
}
while(len--) incoming(*p++);
}
@@ -494,31 +572,58 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length);
};
void MqttMessage::sendTo(MqttClient* client)
MqttError MqttMessage::sendTo(MqttClient* client)
{
if (curr-buffer-2 >= 0)
if (buffer.size()>2)
{
encodeLength(buffer+1, curr-buffer-2);
// hexdump("snd");
client->write(buffer, curr-buffer);
debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2);
hexdump("snd");
client->write(&buffer[0], buffer.size());
}
else
{
Serial << "??? Invalid send" << endl;
Serial << (long)end() << "-" << (long)buffer << endl;
debug("??? Invalid send");
return MqttInvalidMessage;
}
return MqttOk;
}
void MqttMessage::hexdump(const char* prefix) const
{
if (prefix) Serial << prefix << ' ';
Serial << (long)buffer << "-" << (long)curr << " : ";
const char* p=buffer;
while(p!=curr)
uint16_t addr=0;
const int bytes_per_row = 8;
const char* hex_to_str = " | ";
const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{
if (*p<16) Serial << '0';
Serial << _HEX(*p) << ' ';
p++;
if ((addr % bytes_per_row) == 0)
{
if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
if (prefix) Serial << prefix << separator;
ascii.clear();
}
addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl;
}

View File

@@ -3,8 +3,22 @@
#include <set>
#include <string>
#include "StringIndexer.h"
#include <MqttStreaming.h>
#define MaxBufferLength 255
#define TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
enum MqttError
{
MqttOk = 0,
MqttNowhereToSend=1,
MqttInvalidMessage=2,
};
class Topic : public IndexedString
{
@@ -21,15 +35,17 @@ class Topic : public IndexedString
class MqttClient;
class MqttMessage
{
const uint16_t MaxBufferLength = 255;
public:
enum Type
{
Unknown = 0,
Connect = 0x10,
Connack = 0x20,
ConnAck = 0x20,
Publish = 0x30,
PubAck = 0x40,
Subscribe = 0x80,
SubAck = 0x90,
PingReq = 0xC0,
PingResp = 0xD0,
};
@@ -45,21 +61,21 @@ class MqttMessage
};
MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); }
MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte);
void add(char byte) { incoming(byte); }
void add(const char* p, size_t len);
void add(const char* p, size_t len, bool addLength=true );
void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; }
char* end() const { return curr; }
uint16_t length() const { return curr-buffer; }
const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); }
void reset();
// buff is MSB/LSB/STRING
// output buff+=2, len=length(str)
void getString(char* &buff, uint16_t& len);
static void getString(const char* &buff, uint16_t& len);
Type type() const
@@ -69,21 +85,20 @@ class MqttMessage
void create(Type type)
{
buffer[0]=type;
curr=buffer+2;
vheader=curr;
buffer=(char)type;
buffer+='\0';
vheader=2;
size=0;
state=Create;
}
void sendTo(MqttClient*);
MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const;
private:
void encodeLength(char* msb, int length);
char buffer[256]; // TODO why 256 ? (should be replaced by a std::string)
char* vheader;
char* curr;
std::string buffer;
uint8_t vheader;
uint16_t size; // bytes left to receive
State state;
};
@@ -104,13 +119,16 @@ class MqttClient
};
public:
MqttClient(MqttBroker*);
MqttClient() : MqttClient(nullptr) {};
~MqttClient();
void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port);
void connect(std::string broker, uint16_t port, uint16_t ka=10);
bool connected() { return client==nullptr || client->connected(); }
bool connected() { return
(parent!=nullptr and client==nullptr) or
(client and client->connected()); }
void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); }
@@ -122,37 +140,45 @@ class MqttClient
void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());}
void publish(const Topic& t) { publish(t, nullptr, 0);};
MqttError publish(const Topic&, const char* payload, size_t pay_length);
MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic);
MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic& topic);
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump()
{
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected());
Serial << " [";
message.hexdump("entrant msg");
bool c=false;
for(auto s: subscriptions)
{
Serial << (c?", ": "")<< s.str().c_str();
c=true;
}
Serial << "]" << endl;
}
#endif
private:
friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg);
MqttError publish(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds);
void processMessage();
@@ -162,33 +188,18 @@ class MqttClient
uint32_t keep_alive;
uint32_t alive;
MqttMessage message;
// TODO having a pointer on MqttBroker may produce larger binaries
// due to unecessary function linked if ever parent is not used
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
};
/***********************************************
* R1 - accept external cnx
* R2 - allows all clients pusblish to go outside
* R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
* R5 - tries to connect elsewhere (*)
* R6 - disconnect external clients
* R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker
{
enum State
@@ -200,6 +211,7 @@ class MqttBroker
public:
// TODO limit max number of clients
MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); }
void loop();
@@ -209,15 +221,17 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG
void dump()
{
Serial << "broker: " << clients.size() << " client/s" << endl;
Serial << clients.size() << " client/s" << endl;
for(auto client: clients)
{
Serial << " ";
client->dump();
}
}
#endif
private:
friend class MqttClient;
@@ -229,7 +243,7 @@ class MqttBroker
{ return compareString(auth_password, password, len); }
void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself
void addClient(MqttClient* client);