Compare commits

..

30 Commits
0.3.0 ... 0.5.0

Author SHA1 Message Date
hsaturn
d92aa1fe3c Minor change to publish 2021-03-24 21:20:07 +01:00
hsaturn
0d6e194560 test client enhancements 2021-03-24 21:19:44 +01:00
hsaturn
7107da2cce Client supports (does not disconnect) Suback / Puback 2021-03-24 21:18:27 +01:00
hsaturn
28c8713415 Client keep_alive is now parameterized 2021-03-24 21:17:08 +01:00
hsaturn
70cf8137de Fixed build of client-without-wifi 2021-03-24 18:35:57 +01:00
hsaturn
5ab315e472 Removed dependency with Streaming.h 2021-03-24 18:35:11 +01:00
hsaturn
b96b36f10c README update 2021-03-24 01:33:45 +01:00
hsaturn
ba831ea366 README update 2021-03-24 01:32:47 +01:00
hsaturn
4020393f90 MqttClient can subscribe and receive publishes from distant broker 2021-03-24 01:30:56 +01:00
hsaturn
7b20e7deb5 Supports multiple subscriptions 2021-03-23 23:51:33 +01:00
hsaturn
efe6a05bbd MqttStreaming.h, streaming with fixes 2021-03-23 23:41:00 +01:00
hsaturn
84dbb80106 Release 0.4.0 2021-03-22 02:45:57 +01:00
hsaturn
47bc06f0ce removed need of Streaming.h if no debug 2021-03-22 02:44:30 +01:00
hsaturn
07c96c19a5 Better simple-client example 2021-03-22 02:35:34 +01:00
hsaturn
de8813f9f6 No more serial prints 2021-03-22 02:35:10 +01:00
hsaturn
fbc24c94e3 MqttClient::publish with String added 2021-03-22 02:34:45 +01:00
hsaturn
169abf8099 allow MqttClient to be constructed with nothing 2021-03-22 02:34:23 +01:00
hsaturn
5cee67095e Fix payload content 2021-03-22 02:33:54 +01:00
hsaturn
0cb2e99b4b Better debug defines 2021-03-22 02:32:45 +01:00
hsaturn
54c905a32f API Changed
Fix too long time
2021-03-22 02:10:54 +01:00
hsaturn
befab9dd6e More TODOs (happy betas) 2021-03-22 01:59:40 +01:00
hsaturn
bd2e7cc5f6 Fix crash on MqttClient timeout when not linked to a broker 2021-03-22 01:59:17 +01:00
hsaturn
18b5f0c27b Better client creation 2021-03-22 01:19:50 +01:00
hsaturn
e71a4d5e87 MqttClient was unable to publish in some cases 2021-03-22 01:19:26 +01:00
hsaturn
620dbf31af Rewrite interpreter, can handle brokers now 2021-03-22 00:28:05 +01:00
hsaturn
52690ec7e7 Fix some rare case crashes 2021-03-22 00:27:23 +01:00
hsaturn
9f28e7f92f Version 0.3.0 library files 2021-03-21 19:34:40 +01:00
hsaturn
ed9efbb5ce Removed buffer 256 thus less memory is needed for MqttClient instances 2021-03-21 17:21:36 +01:00
hsaturn
4fd34bfffa More commands, and dot notation added 2021-03-21 16:34:14 +01:00
hsaturn
7be4d86f46 TODO list changed 2021-03-21 16:33:53 +01:00
14 changed files with 908 additions and 224 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*~ *~
src/my_credentials.h

View File

@@ -23,12 +23,16 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Implement zeroconf mode (needs async) * Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed. * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe
* MqttClient auto reconnection
* MqttClient does not callback payload...
* MqttClient user/password
## Quickstart ## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt) * install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup) * modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)

View File

@@ -1,10 +1,7 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/** /**
* Local broker that accept connections * Local broker that accept connections and two local clients
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -31,11 +28,11 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void setup() void setup()
{ {
@@ -67,7 +64,7 @@ void loop()
mqtt_b.loop(); mqtt_b.loop();
// ============= client A publish ================ // ============= client A publish ================
static const int intervalA = 50000; static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA; static uint32_t timerA = millis() + intervalA;
if (millis() > timerA) if (millis() > timerA)
@@ -78,7 +75,7 @@ void loop()
} }
// ============= client B publish ================ // ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB; static uint32_t timerB = millis() + intervalB;
if (millis() > timerB) if (millis() > timerB)

View File

@@ -1,5 +1,4 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode: /** TinyMQTT allows a disconnected mode:
* *
@@ -15,10 +14,10 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << "--> B Received " << topic.c_str() << endl; }
void setup() void setup()

View File

@@ -1,5 +1,4 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <my_credentials.h> #include <my_credentials.h>

View File

@@ -1,6 +1,5 @@
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** Simple Client /** Simple Client
* *
@@ -16,6 +15,9 @@
#include <my_credentials.h> #include <my_credentials.h>
static float temp=19;
static MqttClient client;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
@@ -29,8 +31,22 @@ void setup()
{ delay(500); Serial << '.'; } { delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
} }
void loop() void loop()
{ {
client.loop();
delay(1000);
auto rnd=random(100);
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
client.publish("sensor/temperature", String(temp));
} }

View File

@@ -0,0 +1,11 @@
// vim: ts=30
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
----------------------------------------------------------------------------
client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883
a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic
delete a destroy the client

View File

@@ -1,9 +1,10 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #define TINY_MQTT_DEBUG
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h>
#include <map> #include <map>
/** /**
* Local broker that accept connections * Console allowing to make any kind of test.
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -25,11 +26,13 @@
std::string topic="sensor/temperature"; std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; } { Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; }
std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
@@ -46,21 +49,36 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin(); MqttBroker* broker = new MqttBroker(1883);
broker->begin();
brokers["broker"] = broker;
} }
std::string getword(std::string& str, const char* if_empty=nullptr) int getint(std::string& str, const int if_empty=0)
{ {
std::string sword; std::string sword;
while(str.length() && str[0]!=' ') while(str.length() && str[0]>='0' && str[0]<='9')
{ {
sword += str[0]; str.erase(0,1); sword += str[0]; str.erase(0,1);
} }
while(str[0]==' ') str.erase(0,1); while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty; if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str());
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string sword;
while(str.length() && str[0]!=sep)
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword; return sword;
} }
// publish at regular interval
class automatic class automatic
{ {
public: public:
@@ -128,7 +146,7 @@ class automatic
autop->bon=false; autop->bon=false;
else if (s=="interval") else if (s=="interval")
{ {
int32_t i=atol(getword(cmd).c_str()); int32_t i=getint(cmd);
if (i) if (i)
autop->interval(atol(s.c_str())); autop->interval(atol(s.c_str()));
else else
@@ -172,6 +190,7 @@ class automatic
std::string topic_; std::string topic_;
bool bon=false; bool bon=false;
static std::map<MqttClient*, automatic*> autos; static std::map<MqttClient*, automatic*> autos;
float temp=19;
}; };
std::map<MqttClient*, automatic*> automatic::autos; std::map<MqttClient*, automatic*> automatic::autos;
@@ -181,35 +200,12 @@ bool compare(std::string s, const char* cmd)
return strncmp(cmd, s.c_str(), s.length())==0; return strncmp(cmd, s.c_str(), s.length())==0;
} }
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop() void loop()
{ {
broker.loop(); for(auto it: brokers)
it.second->loop();
for(auto it: clients) for(auto it: clients)
it.second->loop(); it.second->loop();
@@ -223,90 +219,209 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "------------------------------------------------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
while(cmd.length()) while(cmd.length())
{ {
std::string s = getword(cmd); MqttError retval = MqttOk;
if (compare(s,"connect"))
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos)
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) s=getword(cmd, nullptr, '.');
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
} }
else if (compare(s,"publish"))
s = getword(cmd);
if (compare(s, "delete"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) if (client==nullptr && broker==nullptr)
{ publish->publish(getword(cmd, topic.c_str())); }); {
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
clients.erase(s);
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue;
Serial << "deleted" << endl;
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
} }
else if (compare(s,"subscribe")) else if (broker)
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) if (compare(s,"connect"))
{ publish->subscribe(getword(cmd, topic.c_str())); }); {
} Serial << "NYI" << endl;
else if (compare(s, "view")) }
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) if (compare(s,"connect"))
{ publish->dump(); }); {
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
} }
else if (compare(s, "auto")) else if (compare(s, "auto"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) automatic::command(client, cmd);
{ automatic::command(publish, cmd); if (client == nullptr)
if (publish == nullptr) cmd.clear();
cmd.clear();
}, true);
} }
else if (compare(s, "new")) else if (compare(s, "broker"))
{ {
std::string id=getword(cmd); std::string id=getword(cmd);
if (id.length()) if (id.length() or brokers.find(id)!=brokers.end())
{ {
MqttClient* client = new MqttClient(&broker); int port=getint(cmd, 0);
client->id(id); if (port)
clients[id]=client; {
client->setCallback(onPublish); MqttBroker* broker = new MqttBroker(port);
client->subscribe(topic); broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
} }
else else
Serial << "missing id" << endl; Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls"; cmd+=" ls";
} }
else if (compare(s, "delete")) else if (compare(s, "client"))
{ {
s = getword(cmd); std::string id=getword(cmd);
auto it=clients.find(s); if (id.length() or clients.find(id)!=clients.end())
if (it != clients.end())
{ {
delete it->second; s=getword(cmd); // broker name
clients.erase(it); if (s=="" or brokers.find(s) != brokers.end())
cmd+=" ls"; {
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
} }
else else
Serial << "Unknown client (" << s.c_str() << ")" << endl; Serial << "Missing or existing client name" << endl;
cmd+=" ls";
} }
else if (compare(s, "ls")) else if (compare(s, "ls") or compare(s, "view"))
{ {
Serial << "main : " << clients.size() << " client/s." << endl; Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients) for(auto it: clients)
{ {
Serial << " "; it.second->dump(); Serial << " "; it.second->dump();
} }
broker.dump();
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
} }
else if (compare(s, "reset")) else if (compare(s, "reset"))
ESP.restart(); ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help")) else if (compare(s,"help"))
{ {
Serial << "syntax:" << endl; Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl; Serial << " MqttBroker:" << endl;
Serial << " connect $id [ip]" << endl; Serial << " broker {name} {port} : create a new broker" << endl;
Serial << " subscribe $id [topic]" << endl; Serial << endl;
Serial << " publish $id [topic]" << endl; Serial << " MqttClient:" << endl;
Serial << " view $id " << endl; Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help(); automatic::help();
Serial << endl; Serial << endl;
Serial << " help" << endl; Serial << " help" << endl;
Serial << " ls" << endl; Serial << " ls / ip / reset" << endl;
Serial << " reset" << endl; Serial << " ! repeat last command" << endl;
Serial << endl; Serial << endl;
Serial << " $id : name of the client." << endl; Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl; Serial << " default topic is '" << topic.c_str() << "'" << endl;
@@ -314,7 +429,14 @@ void loop()
} }
else else
{ {
Serial << "Unknown command (" << s.c_str() << ")" << endl; while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
} }
} }
} }

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.2", "version": "0.4.0",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt name=TinyMqtt
version=0.2.0 version=0.4.0
author=HSaturn <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*

412
src/MqttStreaming.h Normal file
View File

@@ -0,0 +1,412 @@
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
* (I have to speek to the author in order to include my changes to his library if possible)
**/
/*
Streaming.h - Arduino library for supporting the << streaming operator
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Version 6 library changes
Copyright (c) 2019 Gazoodle. All rights reserved.
1. _BASED moved to template to remove type conversion to long and
sign changes which break int8_t and int16_t negative numbers.
The print implementation still upscales to long for it's internal
print routine.
2. _PAD added to allow padding & filling of characters to the stream
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
and zero padding for numerics
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
and no internal buffers for replaceable stream printing
*/
#ifndef ARDUINO_STREAMING
#define ARDUINO_STREAMING
#if defined(ARDUINO) && ARDUINO >= 100
#include "Arduino.h"
#else
#ifndef STREAMING_CONSOLE
#include "WProgram.h"
#endif
#endif
#include <string>
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
// No stl library, so need trivial version of std::is_signed ...
namespace std {
template<typename T>
struct is_signed { static const bool value = false; };
template<>
struct is_signed<int8_t> { static const bool value = true; };
template<>
struct is_signed<int16_t> { static const bool value = true; };
template<>
struct is_signed<int32_t> { static const bool value = true; };
};
#else
#include <type_traits>
#endif
#define STREAMING_LIBRARY_VERSION 6
#if !defined(typeof)
#define typeof(x) __typeof__(x)
#endif
// PrintBuffer implementation of Print, a small buffer to print in
// see its use with pad_float()
template <size_t N>
class PrintBuffer : public Print
{
size_t pos = 0;
char str[N] {};
public:
inline const char *operator() ()
{ return str; };
// inline void clear()
// { pos = 0; str[0] = '\0'; };
inline size_t write(uint8_t c)
{ return write(&c, 1); };
inline size_t write(const uint8_t *buffer, size_t size)
{
size_t s = std::min(size, N-1 - pos); // need a /0 left
if (s)
{
memcpy(&str[pos], buffer, s);
pos += s;
}
return s;
};
};
// Generic template
template<class T>
inline Print &operator <<(Print &stream, const T &arg)
{ stream.print(arg); return stream; }
// TODO sfinae maybe could do the trick ?
inline Print &operator <<(Print &stream, const std::string &str)
{ stream.print(str.c_str()); return stream; }
template<typename T>
struct _BASED
{
T val;
int base;
_BASED(T v, int b): val(v), base(b)
{}
};
#if ARDUINO >= 100
struct _BYTE_CODE
{
byte val;
_BYTE_CODE(byte v) : val(v)
{}
};
#define _BYTE(a) _BYTE_CODE(a)
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
{ obj.write(arg.val); return obj; }
#else
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
#endif
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
// Specialization for class _BASED
// Thanks to Arduino forum user Ben Combee who suggested this
// clever technique to allow for expressions like
// Serial << _HEX(a);
template<typename T>
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
{ obj.print(arg.val, arg.base); return obj; }
#if ARDUINO >= 18
// Specialization for class _FLOAT
// Thanks to Michael Margolis for suggesting a way
// to accommodate Arduino 0018's floating point precision
// feature like this:
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
struct _FLOAT
{
double val; // only Print::print(double)
int digits;
_FLOAT(double v, int d): val(v), digits(d)
{}
};
inline Print &operator <<(Print &obj, const _FLOAT &arg)
{ obj.print(arg.val, arg.digits); return obj; }
#endif
// Specialization for enum _EndLineCode
// Thanks to Arduino forum user Paul V. who suggested this
// clever technique to allow for expressions like
// Serial << "Hello!" << endl;
enum _EndLineCode { endl };
inline Print &operator <<(Print &obj, _EndLineCode)
{ obj.println(); return obj; }
// Specialization for padding & filling, mainly utilized
// by the width printers
//
// Use like
// Serial << _PAD(10,' '); // Will output 10 spaces
// Serial << _PAD(4, '0'); // Will output 4 zeros
struct _PAD
{
int8_t width;
char chr;
_PAD(int8_t w, char c) : width(w), chr(c) {}
};
inline Print &operator <<(Print& stm, const _PAD &arg)
{
for(int8_t i = 0; i < arg.width; i++)
stm.print(arg.chr);
return stm;
}
// Specialization for width printing
//
// Use like Result
// -------- ------
// Serial << _WIDTH(1,5) " 1"
// Serial << _WIDTH(10,5) " 10"
// Serial << _WIDTH(100,5) " 100"
// Serial << _WIDTHZ(1,5) "00001"
//
// Great for times & dates, or hex dumps
//
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
//
// for(int index=0; index<byte_array_size; index++)
// Serial << _WIDTHZ(_HEX(byte_array[index]))
template<typename T>
struct __WIDTH
{
const T val;
int8_t width;
char pad;
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
};
// Count digits in an integer of specific base
template<typename T>
inline uint8_t digits(T v, int8_t base = 10)
{
uint8_t digits = 0;
if ( std::is_signed<T>::value )
{
if ( v < 0 )
{
digits++;
v = -v; // v needs to be postive for the digits counter to work
}
}
do
{
v /= base;
digits++;
} while( v > 0 );
return digits;
}
// Generic get the width of a value in base 10
template<typename T>
inline uint8_t get_value_width(T val)
{ return digits(val); }
inline uint8_t get_value_width(const char * val)
{ return strlen(val); }
#ifdef ARDUINO
inline uint8_t get_value_width(const __FlashStringHelper * val)
{ return strlen_P(reinterpret_cast<const char *>(val)); }
#endif
// _BASED<T> get the width of a value
template<typename T>
inline uint8_t get_value_width(_BASED<T> b)
{ return digits(b.val, b.base); }
// Constructor wrapper to allow automatic template parameter deduction
template<typename T>
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
template<typename T>
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
// Operator overload to handle width printing.
template<typename T>
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
// explicit Operator overload to handle width printing of _FLOAT, double and float
template<typename T>
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
{
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
size_t size = buf.print(val, digits); // print in buf
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
}
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
// a less verbose _FLOATW for _WIDTH(_FLOAT)
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
// Specialization for replacement formatting
//
// Designed to be similar to printf that everyone knows and loves/hates. But without
// the internal buffers and type agnosticism. This version only has placeholders in
// the format string, the actual values are supplied using the stream safe operators
// defined in this library.
//
// Use like this:
//
// Serial << FMT(F("Replace % with %"), 1, 2 )
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
inline char get_next_format_char(const char *& format_string)
{
char format_char = *format_string;
if ( format_char > 0 ) format_string++;
return format_char;
}
#ifdef ARDUINO
inline char get_next_format_char(const __FlashStringHelper*& format_string)
{
char format_char = pgm_read_byte(format_string);
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
return format_char;
}
#endif
template<typename Ft>
inline bool check_backslash(char& format_char, Ft& format_string)
{
if ( format_char == '\\')
{
format_char = get_next_format_char(format_string);
return true;
}
return false;
}
// The template tail printer helper
template<typename Ft, typename... Ts>
struct __FMT
{
Ft format_string;
__FMT(Ft f, Ts ... args) : format_string(f) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
check_backslash(c, format);
if ( c )
stm.print(c);
}
}
};
// The variadic template helper
template<typename Ft, typename T, typename... Ts>
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
{
T val;
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
if (!check_backslash(c, format))
{
if ( c == '%')
{
stm << val;
// Variadic recursion ... compiler rolls this out during
// template argument pack expansion
__FMT<Ft, Ts...>::tstreamf(stm, format);
return;
}
}
if (c)
stm.print(c);
}
}
};
// The actual operator should you only instanciate the FMT
// helper with a format string and no parameters
template<typename Ft, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// The variadic stream helper
template<typename Ft, typename T, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// As we don't have C++17, we can't get a constructor to use
// automatic argument deduction, but ... this little trick gets
// around that ...
template<typename Ft, typename... Ts>
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
#endif

View File

@@ -2,7 +2,6 @@
#include <map> #include <map>
#include <string> #include <string>
#include <string.h> #include <string.h>
#include <Streaming.h>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
/*** /***
@@ -36,7 +35,7 @@ class StringIndexer
{ {
strings[index].str = std::string(str, len); strings[index].str = std::string(str, len);
strings[index].used++; strings[index].used++;
Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; // Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
return index; return index;
} }
} }
@@ -66,7 +65,7 @@ class StringIndexer
if (it->second.used == 0) if (it->second.used == 0)
{ {
strings.erase(it); strings.erase(it);
Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; // Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
} }
} }
} }

View File

@@ -1,12 +1,5 @@
#include "TinyMqtt.h" #include "TinyMqtt.h"
#include <sstream> #include <sstream>
#include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len) void outstring(const char* prefix, const char*p, uint16_t len)
{ {
@@ -20,10 +13,19 @@ MqttBroker::MqttBroker(uint16_t port) : server(port)
{ {
} }
MqttBroker::~MqttBroker()
{
while(clients.size())
{
delete clients[0];
}
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent) : parent(parent)
{ {
client = new_client ? new WiFiClient(new_client) : nullptr; client = new WiFiClient(new_client);
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -32,7 +34,7 @@ MqttClient::MqttClient(MqttBroker* parent)
{ {
client = nullptr; client = nullptr;
parent->addClient(this); if (parent) parent->addClient(this);
} }
MqttClient::~MqttClient() MqttClient::~MqttClient()
@@ -57,13 +59,13 @@ void MqttClient::close()
} }
} }
void MqttClient::connect(std::string broker, uint16_t port) void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{ {
debug("cnx: closing"); debug("cnx: closing");
close(); close();
debug("cnx: closed");
if (client) delete client; if (client) delete client;
client = new WiFiClient; client = new WiFiClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
@@ -72,13 +74,15 @@ void MqttClient::connect(std::string broker, uint16_t port)
message.add(0x4); // Mqtt protocol version 3.1.1 message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name message.add(0x0); // Connect flags TODO user / name
keep_alive = 1; keep_alive = ka; // TODO not configurable
message.add(0x00); // keep_alive message.add(0x00); // keep_alive
message.add((char)keep_alive); message.add((char)keep_alive);
message.add(clientId); message.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); message.sendTo(this);
message.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
} }
} }
@@ -119,7 +123,7 @@ void MqttBroker::loop()
for(int i=0; i<clients.size(); i++) for(int i=0; i<clients.size(); i++)
{ {
auto client = clients[i]; auto client = clients[i];
if(client->connected()) if (client->connected())
{ {
client->loop(); client->loop();
} }
@@ -133,24 +137,29 @@ void MqttBroker::loop()
} }
} }
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
MqttError retval = MqttOk;
debug("publish "); debug("publish ");
int i=0; int i=0;
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6 if (broker && broker->connected()) // Broker is connected
{ {
// ext broker -> clients or // ext broker -> clients or
// or clients -> ext broker // or clients -> ext broker
if (source == broker) // broker -> clients if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // clients -> broker
broker->publish(topic, msg); {
MqttError ret = broker->publish(topic, msg);
if (ret != MqttOk) retval = ret;
}
} }
else // Disconnected: R7 else // Disconnected: R7
{ {
@@ -159,9 +168,10 @@ void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessa
} }
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
} }
return retval;
} }
bool MqttBroker::compareString( bool MqttBroker::compareString(
@@ -174,10 +184,10 @@ bool MqttBroker::compareString(
return *good==0; return *good==0;
} }
void MqttMessage::getString(char* &buffer, uint16_t& len) void MqttMessage::getString(const char* &buff, uint16_t& len)
{ {
len = (buffer[0]<<8)|(buffer[1]); len = (buff[0]<<8)|(buff[1]);
buffer+=2; buff+=2;
} }
void MqttClient::clientAlive(uint32_t more_seconds) void MqttClient::clientAlive(uint32_t more_seconds)
@@ -198,9 +208,11 @@ void MqttClient::loop()
{ {
debug("timeout client"); debug("timeout client");
close(); close();
debug("closed");
} }
else else if (client && client->connected())
{ {
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((uint8_t*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
@@ -220,13 +232,42 @@ void MqttClient::loop()
} }
} }
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker ?
{
debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic.str());
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
}
return ret;
}
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; std::string error;
std::string s; std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming");
}
auto header = message.getVHeader(); auto header = message.getVHeader();
char* payload; const char* payload;
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
@@ -305,6 +346,24 @@ void MqttClient::processMessage()
message.sendTo(this); message.sendTo(this);
break; break;
case MqttMessage::Type::Connack:
// TODO what more on connack ?
mqtt_connected = true;
bclose = false;
break;
case MqttMessage::Type::Suback:
case MqttMessage::Type::Puback:
if (!mqtt_connected) break;
// Ignore acks
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
if (!mqtt_connected) break; if (!mqtt_connected) break;
if (client) if (client)
@@ -320,14 +379,26 @@ void MqttClient::processMessage()
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
if (!mqtt_connected) break; {
payload = header+2; if (!mqtt_connected) break;
message.getString(payload, len); // Topic payload = header+2;
outstring("Subscribes", payload, len);
subscribe(Topic(payload, len)); debug("subscribe loop");
bclose = false; while(payload < message.end())
// TODO SUBACK {
message.getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len));
payload += len;
uint8_t qos = *payload++;
debug(" qos=" << qos);
}
debug("end loop");
bclose = false;
// TODO SUBACK
}
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
@@ -344,8 +415,15 @@ void MqttClient::processMessage()
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
debug("publishing to parent"); if (parent)
parent->publish(this, published, message); {
debug("publishing to parent");
parent->publish(this, published, message);
}
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
// TODO should send PUBACK // TODO should send PUBACK
bclose = false; bclose = false;
} }
@@ -369,7 +447,7 @@ void MqttClient::processMessage()
} }
else else
{ {
clientAlive(5); clientAlive(parent ? 5 : 0);
} }
message.reset(); message.reset();
} }
@@ -382,22 +460,25 @@ bool Topic::matches(const Topic& topic) const
} }
// publish from local client // publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
message.create(MqttMessage::Publish); MqttMessage msg;
message.add(topic); msg.create(MqttMessage::Publish);
message.add(payload, pay_length); msg.add(topic);
msg.add(payload, pay_length, false);
if (parent) if (parent)
parent->publish(this, topic, message); return parent->publish(this, topic, msg);
else if (client) else if (client)
publish(topic, message); return msg.sendTo(this);
else else
Serial << " Should not happen" << endl; return MqttNowhereToSend;
} }
// republish a received publish if it matches any in subscriptions // republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg) MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
{ {
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
@@ -407,28 +488,28 @@ void MqttClient::publish(const Topic& topic, MqttMessage& msg)
Serial << " match/send"; Serial << " match/send";
if (client) if (client)
{ {
msg.sendTo(this); retval = msg.sendTo(this);
} }
else if (callback) else if (callback)
{ {
callback(this, topic, nullptr, 0); // TODO callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
Serial << endl; Serial << endl;
} }
return retval;
} }
void MqttMessage::reset() void MqttMessage::reset()
{ {
curr=buffer; buffer.clear();
*curr=0; // Type Unknown
state=FixedHeader; state=FixedHeader;
size=0; size=0;
} }
void MqttMessage::incoming(char in_byte) void MqttMessage::incoming(char in_byte)
{ {
*curr++ = in_byte; buffer += in_byte;
switch(state) switch(state)
{ {
case FixedHeader: case FixedHeader:
@@ -443,7 +524,7 @@ void MqttMessage::incoming(char in_byte)
} }
else if ((in_byte & 0x80) == 0) else if ((in_byte & 0x80) == 0)
{ {
vheader = curr; vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else else
@@ -464,22 +545,24 @@ void MqttMessage::incoming(char in_byte)
break; break;
case Complete: case Complete:
default: default:
curr--;
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
state = Error; reset();
break; break;
} }
if (curr-buffer > 250) if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
{ {
debug("Spurious byte " << _HEX(in_byte)); debug("Too long " << state);
curr=buffer; reset();
} }
} }
void MqttMessage::add(const char* p, size_t len) void MqttMessage::add(const char* p, size_t len, bool addLength)
{ {
incoming(len>>8); if (addLength)
incoming(len & 0xFF); {
incoming(len>>8);
incoming(len & 0xFF);
}
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
@@ -494,31 +577,58 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttClient* client) MqttError MqttMessage::sendTo(MqttClient* client)
{ {
if (curr-buffer-2 >= 0) if (buffer.size()>2)
{ {
encodeLength(buffer+1, curr-buffer-2); debug("sending " << buffer.size() << " bytes");
// hexdump("snd"); encodeLength(&buffer[1], buffer.size()-2);
client->write(buffer, curr-buffer); hexdump("snd");
client->write(&buffer[0], buffer.size());
} }
else else
{ {
Serial << "??? Invalid send" << endl; debug("??? Invalid send");
Serial << (long)end() << "-" << (long)buffer << endl; return MqttInvalidMessage;
} }
return MqttOk;
} }
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
if (prefix) Serial << prefix << ' '; uint16_t addr=0;
Serial << (long)buffer << "-" << (long)curr << " : "; const int bytes_per_row = 8;
const char* p=buffer; const char* hex_to_str = " | ";
while(p!=curr) const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{
if ((addr % bytes_per_row) == 0)
{ {
if (*p<16) Serial << '0'; if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
Serial << _HEX(*p) << ' '; if (prefix) Serial << prefix << separator;
p++; ascii.clear();
} }
Serial << endl; addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl;
} }

View File

@@ -3,8 +3,22 @@
#include <set> #include <set>
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h>
#define MaxBufferLength 255 #define TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
enum MqttError
{
MqttOk = 0,
MqttNowhereToSend=1,
MqttInvalidMessage=2,
};
class Topic : public IndexedString class Topic : public IndexedString
{ {
@@ -21,6 +35,7 @@ class Topic : public IndexedString
class MqttClient; class MqttClient;
class MqttMessage class MqttMessage
{ {
const uint16_t MaxBufferLength = 255;
public: public:
enum Type enum Type
{ {
@@ -30,6 +45,7 @@ class MqttMessage
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
Suback = 0x90,
PingReq = 0xC0, PingReq = 0xC0,
PingResp = 0xD0, PingResp = 0xD0,
}; };
@@ -45,21 +61,21 @@ class MqttMessage
}; };
MqttMessage() { reset(); } MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); } MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len); void add(const char* p, size_t len, bool addLength=true );
void add(const std::string& s) { add(s.c_str(), s.length()); } void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); } void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; } const char* end() const { return &buffer[0]+buffer.size(); }
char* end() const { return curr; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return curr-buffer; } uint16_t length() const { return buffer.size(); }
void reset(); void reset();
// buff is MSB/LSB/STRING // buff is MSB/LSB/STRING
// output buff+=2, len=length(str) // output buff+=2, len=length(str)
void getString(char* &buff, uint16_t& len); static void getString(const char* &buff, uint16_t& len);
Type type() const Type type() const
@@ -69,21 +85,20 @@ class MqttMessage
void create(Type type) void create(Type type)
{ {
buffer[0]=type; buffer=(char)type;
curr=buffer+2; buffer+='\0';
vheader=curr; vheader=2;
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttClient*); MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length);
char buffer[256]; // TODO why 256 ? (should be replaced by a std::string) std::string buffer;
char* vheader; uint8_t vheader;
char* curr;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; State state;
}; };
@@ -104,13 +119,16 @@ class MqttClient
}; };
public: public:
MqttClient(MqttBroker*); MqttClient(MqttBroker*);
MqttClient() : MqttClient(nullptr) {};
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port); void connect(std::string broker, uint16_t port, uint16_t ka=10);
bool connected() { return client==nullptr || client->connected(); } bool connected() { return
(parent!=nullptr and client==nullptr) or
(client and client->connected()); }
void write(const char* buf, size_t length) void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); } { if (client) client->write(buf, length); }
@@ -122,37 +140,45 @@ class MqttClient
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world // Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length); MqttError publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
void publish(const Topic& t) { publish(t, nullptr, 0);}; MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); } MqttError subscribe(Topic topic, uint8_t qos=0);
void unsubscribe(Topic& topic); MqttError unsubscribe(Topic& topic);
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected());
Serial << " ["; Serial << " [";
message.hexdump("entrant msg");
bool c=false; bool c=false;
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
c=true; c=true;
} }
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif
private: private:
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg); MqttError publish(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage();
@@ -162,33 +188,18 @@ class MqttClient
uint32_t keep_alive; uint32_t keep_alive;
uint32_t alive; uint32_t alive;
MqttMessage message; MqttMessage message;
// TODO having a pointer on MqttBroker may produce larger binaries
// due to unecessary function linked if ever parent is not used
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
}; };
/***********************************************
* R1 - accept external cnx
* R2 - allows all clients pusblish to go outside
* R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
* R5 - tries to connect elsewhere (*)
* R6 - disconnect external clients
* R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker class MqttBroker
{ {
enum State enum State
@@ -200,6 +211,7 @@ class MqttBroker
public: public:
// TODO limit max number of clients // TODO limit max number of clients
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
@@ -209,15 +221,17 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883); void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
Serial << "broker: " << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
for(auto client: clients) for(auto client: clients)
{ {
Serial << " "; Serial << " ";
client->dump(); client->dump();
} }
} }
#endif
private: private:
friend class MqttClient; friend class MqttClient;
@@ -229,7 +243,7 @@ class MqttBroker
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself // For clients that are added not by the broker itself
void addClient(MqttClient* client); void addClient(MqttClient* client);