Compare commits

...

19 Commits
0.3.0 ... 0.4.0

Author SHA1 Message Date
hsaturn
84dbb80106 Release 0.4.0 2021-03-22 02:45:57 +01:00
hsaturn
47bc06f0ce removed need of Streaming.h if no debug 2021-03-22 02:44:30 +01:00
hsaturn
07c96c19a5 Better simple-client example 2021-03-22 02:35:34 +01:00
hsaturn
de8813f9f6 No more serial prints 2021-03-22 02:35:10 +01:00
hsaturn
fbc24c94e3 MqttClient::publish with String added 2021-03-22 02:34:45 +01:00
hsaturn
169abf8099 allow MqttClient to be constructed with nothing 2021-03-22 02:34:23 +01:00
hsaturn
5cee67095e Fix payload content 2021-03-22 02:33:54 +01:00
hsaturn
0cb2e99b4b Better debug defines 2021-03-22 02:32:45 +01:00
hsaturn
54c905a32f API Changed
Fix too long time
2021-03-22 02:10:54 +01:00
hsaturn
befab9dd6e More TODOs (happy betas) 2021-03-22 01:59:40 +01:00
hsaturn
bd2e7cc5f6 Fix crash on MqttClient timeout when not linked to a broker 2021-03-22 01:59:17 +01:00
hsaturn
18b5f0c27b Better client creation 2021-03-22 01:19:50 +01:00
hsaturn
e71a4d5e87 MqttClient was unable to publish in some cases 2021-03-22 01:19:26 +01:00
hsaturn
620dbf31af Rewrite interpreter, can handle brokers now 2021-03-22 00:28:05 +01:00
hsaturn
52690ec7e7 Fix some rare case crashes 2021-03-22 00:27:23 +01:00
hsaturn
9f28e7f92f Version 0.3.0 library files 2021-03-21 19:34:40 +01:00
hsaturn
ed9efbb5ce Removed buffer 256 thus less memory is needed for MqttClient instances 2021-03-21 17:21:36 +01:00
hsaturn
4fd34bfffa More commands, and dot notation added 2021-03-21 16:34:14 +01:00
hsaturn
7be4d86f46 TODO list changed 2021-03-21 16:33:53 +01:00
9 changed files with 329 additions and 186 deletions

View File

@@ -25,6 +25,10 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed. * MqttMessage uses a buffer 256 bytes which is usually far than needed.
* MqttClient auto reconnection
* MqttClient auto re-subscribe
* MqttClient does not callback payload...
* MqttClient user/password
## Quickstart ## Quickstart

View File

@@ -1,10 +1,8 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/** /**
* Local broker that accept connections * Local broker that accept connections and two local clients
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -31,11 +29,11 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << endl << "---------> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* source, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << endl << "---------> B Received " << topic.c_str() << endl; }
void setup() void setup()
{ {
@@ -67,7 +65,7 @@ void loop()
mqtt_b.loop(); mqtt_b.loop();
// ============= client A publish ================ // ============= client A publish ================
static const int intervalA = 50000; static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA; static uint32_t timerA = millis() + intervalA;
if (millis() > timerA) if (millis() > timerA)
@@ -78,7 +76,7 @@ void loop()
} }
// ============= client B publish ================ // ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB; static uint32_t timerB = millis() + intervalB;
if (millis() > timerB) if (millis() > timerB)

View File

@@ -16,6 +16,9 @@
#include <my_credentials.h> #include <my_credentials.h>
static float temp=19;
static MqttClient client;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
@@ -29,8 +32,18 @@ void setup()
{ delay(500); Serial << '.'; } { delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
client.connect("192.168.1.40", 1883); // Put here your broker ip / port
} }
void loop() void loop()
{ {
client.loop();
delay(1000);
temp += (random(100)>50 ? 0.1 : -0.1);
client.publish("sensor/temperature", String(temp));
} }

View File

@@ -1,9 +1,10 @@
#define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <map> #include <map>
/** /**
* Local broker that accept connections * Console allowing to make any kind of test.
* *
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
@@ -25,11 +26,13 @@
std::string topic="sensor/temperature"; std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; } { Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; }
std::map<std::string, MqttClient*> clients;
std::map<std::string, MqttBroker*> brokers;
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
@@ -46,17 +49,31 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin(); MqttBroker* broker = new MqttBroker(1883);
broker->begin();
brokers["broker"] = broker;
} }
std::string getword(std::string& str, const char* if_empty=nullptr) int getint(std::string& str, const int if_empty=0, char sep=' ')
{ {
std::string sword; std::string sword;
while(str.length() && str[0]!=' ') while(str.length() && str[0]!=sep)
{ {
sword += str[0]; str.erase(0,1); sword += str[0]; str.erase(0,1);
} }
while(str[0]==' ') str.erase(0,1); while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) sword=if_empty;
return atoi(sword.c_str());
}
std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string sword;
while(str.length() && str[0]!=sep)
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==sep) str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty; if (if_empty and sword.length()==0) return if_empty;
return sword; return sword;
} }
@@ -128,7 +145,7 @@ class automatic
autop->bon=false; autop->bon=false;
else if (s=="interval") else if (s=="interval")
{ {
int32_t i=atol(getword(cmd).c_str()); int32_t i=getint(cmd);
if (i) if (i)
autop->interval(atol(s.c_str())); autop->interval(atol(s.c_str()));
else else
@@ -181,35 +198,12 @@ bool compare(std::string s, const char* cmd)
return strncmp(cmd, s.c_str(), s.length())==0; return strncmp(cmd, s.c_str(), s.length())==0;
} }
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop() void loop()
{ {
broker.loop(); for(auto it: brokers)
it.second->loop();
for(auto it: clients) for(auto it: clients)
it.second->loop(); it.second->loop();
@@ -223,90 +217,209 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "------------------------------------------------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd;
if (cmd=="!")
cmd=last_cmd;
else
last_cmd=cmd;
while(cmd.length()) while(cmd.length())
{ {
std::string s = getword(cmd); std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos)
{
s=getword(cmd, nullptr, '.');
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
s = getword(cmd);
if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
clients.erase(s);
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue;
Serial << "deleted" << endl;
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) Serial << "NYI" << endl;
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
}
else if (compare(s,"publish"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->publish(getword(cmd, topic.c_str())); });
}
else if (compare(s,"subscribe"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->subscribe(getword(cmd, topic.c_str())); });
} }
else if (compare(s, "view")) else if (compare(s, "view"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) broker->dump();
{ publish->dump(); }); }
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getword(cmd,"192.168.1.40").c_str(), 1883);
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
auto ok=client->publish(getword(cmd, topic.c_str()));
if (ok != MqttOk)
{
Serial << "## ERROR " << ok << endl;
}
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
} }
else if (compare(s, "auto")) else if (compare(s, "auto"))
{ {
clientCommand(cmd, [](std::string& cmd, MqttClient* publish) automatic::command(client, cmd);
{ automatic::command(publish, cmd); if (client == nullptr)
if (publish == nullptr)
cmd.clear(); cmd.clear();
}, true);
} }
else if (compare(s, "new")) else if (compare(s, "broker"))
{ {
std::string id=getword(cmd); std::string id=getword(cmd);
if (id.length()) if (id.length() or brokers.find(id)!=brokers.end())
{ {
MqttClient* client = new MqttClient(&broker); int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id); client->id(id);
clients[id]=client; clients[id]=client;
client->setCallback(onPublish); client->setCallback(onPublish);
client->subscribe(topic); client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
} }
else else
Serial << "missing id" << endl; Serial << "Missing or existing client name" << endl;
cmd+=" ls"; cmd+=" ls";
} }
else if (compare(s, "delete")) else if (compare(s, "ls") or compare(s, "view"))
{ {
s = getword(cmd); Serial << "--< " << clients.size() << " client/s. >--" << endl;
auto it=clients.find(s);
if (it != clients.end())
{
delete it->second;
clients.erase(it);
cmd+=" ls";
}
else
Serial << "Unknown client (" << s.c_str() << ")" << endl;
}
else if (compare(s, "ls"))
{
Serial << "main : " << clients.size() << " client/s." << endl;
for(auto it: clients) for(auto it: clients)
{ {
Serial << " "; it.second->dump(); Serial << " "; it.second->dump();
} }
broker.dump();
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
} }
else if (compare(s, "reset")) else if (compare(s, "reset"))
ESP.restart(); ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help")) else if (compare(s,"help"))
{ {
Serial << "syntax:" << endl; Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl; Serial << " MqttBroker:" << endl;
Serial << " connect $id [ip]" << endl; Serial << " broker {name} {port} : create a new broker" << endl;
Serial << " subscribe $id [topic]" << endl; Serial << endl;
Serial << " publish $id [topic]" << endl; Serial << " MqttClient:" << endl;
Serial << " view $id " << endl; Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip]" << endl;
Serial << " name.subscribe [topic]" << endl;
Serial << " name.publish [topic]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help(); automatic::help();
Serial << endl; Serial << endl;
Serial << " help" << endl; Serial << " help" << endl;
Serial << " ls" << endl; Serial << " ls / ip / reset" << endl;
Serial << " reset" << endl; Serial << " ! repeat last command" << endl;
Serial << endl; Serial << endl;
Serial << " $id : name of the client." << endl; Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl; Serial << " default topic is '" << topic.c_str() << "'" << endl;
@@ -314,6 +427,8 @@ void loop()
} }
else else
{ {
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl; Serial << "Unknown command (" << s.c_str() << ")" << endl;
} }
} }

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.2", "version": "0.4.0",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,9 +1,9 @@
name=TinyMqtt name=TinyMqtt
version=0.2.0 version=0.4.0
author=HSaturn <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*

View File

@@ -2,7 +2,7 @@
#include <map> #include <map>
#include <string> #include <string>
#include <string.h> #include <string.h>
#include <Streaming.h> // #include <Streaming.h>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
/*** /***
@@ -36,7 +36,7 @@ class StringIndexer
{ {
strings[index].str = std::string(str, len); strings[index].str = std::string(str, len);
strings[index].used++; strings[index].used++;
Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; // Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl;
return index; return index;
} }
} }
@@ -66,7 +66,7 @@ class StringIndexer
if (it->second.used == 0) if (it->second.used == 0)
{ {
strings.erase(it); strings.erase(it);
Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; // Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl;
} }
} }
} }

View File

@@ -2,12 +2,6 @@
#include <sstream> #include <sstream>
#include <Streaming.h> #include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len) void outstring(const char* prefix, const char*p, uint16_t len)
{ {
return; return;
@@ -20,10 +14,19 @@ MqttBroker::MqttBroker(uint16_t port) : server(port)
{ {
} }
MqttBroker::~MqttBroker()
{
while(clients.size())
{
delete clients[0];
}
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent) : parent(parent)
{ {
client = new_client ? new WiFiClient(new_client) : nullptr; client = new WiFiClient(new_client);
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -32,7 +35,7 @@ MqttClient::MqttClient(MqttBroker* parent)
{ {
client = nullptr; client = nullptr;
parent->addClient(this); if (parent) parent->addClient(this);
} }
MqttClient::~MqttClient() MqttClient::~MqttClient()
@@ -61,9 +64,9 @@ void MqttClient::connect(std::string broker, uint16_t port)
{ {
debug("cnx: closing"); debug("cnx: closing");
close(); close();
debug("cnx: closed");
if (client) delete client; if (client) delete client;
client = new WiFiClient; client = new WiFiClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
@@ -72,7 +75,7 @@ void MqttClient::connect(std::string broker, uint16_t port)
message.add(0x4); // Mqtt protocol version 3.1.1 message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name message.add(0x0); // Connect flags TODO user / name
keep_alive = 1; keep_alive = 1; // TODO not configurable
message.add(0x00); // keep_alive message.add(0x00); // keep_alive
message.add((char)keep_alive); message.add((char)keep_alive);
message.add(clientId); message.add(clientId);
@@ -133,8 +136,10 @@ void MqttBroker::loop()
} }
} }
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
MqttError retval = MqttOk;
debug("publish "); debug("publish ");
int i=0; int i=0;
for(auto client: clients) for(auto client: clients)
@@ -150,7 +155,7 @@ void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessa
if (source == broker) // broker -> clients if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // clients -> broker
broker->publish(topic, msg); retval=broker->publish(topic, msg);
} }
else // Disconnected: R7 else // Disconnected: R7
{ {
@@ -162,6 +167,7 @@ void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessa
if (doit) client->publish(topic, msg); if (doit) client->publish(topic, msg);
debug(""); debug("");
} }
return retval;
} }
bool MqttBroker::compareString( bool MqttBroker::compareString(
@@ -174,10 +180,10 @@ bool MqttBroker::compareString(
return *good==0; return *good==0;
} }
void MqttMessage::getString(char* &buffer, uint16_t& len) void MqttMessage::getString(const char* &buff, uint16_t& len)
{ {
len = (buffer[0]<<8)|(buffer[1]); len = (buff[0]<<8)|(buff[1]);
buffer+=2; buff+=2;
} }
void MqttClient::clientAlive(uint32_t more_seconds) void MqttClient::clientAlive(uint32_t more_seconds)
@@ -198,8 +204,9 @@ void MqttClient::loop()
{ {
debug("timeout client"); debug("timeout client");
close(); close();
debug("closed");
} }
else else if (client && client->connected())
{ {
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((uint8_t*)(&pingreq), 2);
@@ -226,7 +233,7 @@ void MqttClient::processMessage()
std::string s; std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; // Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl;
auto header = message.getVHeader(); auto header = message.getVHeader();
char* payload; const char* payload;
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
@@ -382,22 +389,25 @@ bool Topic::matches(const Topic& topic) const
} }
// publish from local client // publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
message.create(MqttMessage::Publish); MqttMessage msg;
message.add(topic); msg.create(MqttMessage::Publish);
message.add(payload, pay_length); msg.add(topic);
msg.add(payload, pay_length, false);
if (parent) if (parent)
parent->publish(this, topic, message); return parent->publish(this, topic, msg);
else if (client) else if (client)
publish(topic, message); msg.sendTo(this);
else else
Serial << " Should not happen" << endl; return MqttNowhereToSend;
} }
// republish a received publish if it matches any in subscriptions // republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg) MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
{ {
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
@@ -411,24 +421,24 @@ void MqttClient::publish(const Topic& topic, MqttMessage& msg)
} }
else if (callback) else if (callback)
{ {
callback(this, topic, nullptr, 0); // TODO callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
Serial << endl; Serial << endl;
} }
return retval;
} }
void MqttMessage::reset() void MqttMessage::reset()
{ {
curr=buffer; buffer.clear();
*curr=0; // Type Unknown
state=FixedHeader; state=FixedHeader;
size=0; size=0;
} }
void MqttMessage::incoming(char in_byte) void MqttMessage::incoming(char in_byte)
{ {
*curr++ = in_byte; buffer += in_byte;
switch(state) switch(state)
{ {
case FixedHeader: case FixedHeader:
@@ -443,7 +453,7 @@ void MqttMessage::incoming(char in_byte)
} }
else if ((in_byte & 0x80) == 0) else if ((in_byte & 0x80) == 0)
{ {
vheader = curr; vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else else
@@ -464,22 +474,24 @@ void MqttMessage::incoming(char in_byte)
break; break;
case Complete: case Complete:
default: default:
curr--;
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
state = Error; reset();
break; break;
} }
if (curr-buffer > 250) if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
{ {
debug("Spurious byte " << _HEX(in_byte)); debug("Too long " << state);
curr=buffer; reset();
} }
} }
void MqttMessage::add(const char* p, size_t len) void MqttMessage::add(const char* p, size_t len, bool addLength)
{
if (addLength)
{ {
incoming(len>>8); incoming(len>>8);
incoming(len & 0xFF); incoming(len & 0xFF);
}
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
@@ -496,29 +508,26 @@ void MqttMessage::encodeLength(char* msb, int length)
void MqttMessage::sendTo(MqttClient* client) void MqttMessage::sendTo(MqttClient* client)
{ {
if (curr-buffer-2 >= 0) if (buffer.size()>2)
{ {
encodeLength(buffer+1, curr-buffer-2); encodeLength(&buffer[1], buffer.size()-2);
// hexdump("snd"); // hexdump("snd");
client->write(buffer, curr-buffer); client->write(&buffer[0], buffer.size());
} }
else else
{ {
Serial << "??? Invalid send" << endl; Serial << "??? Invalid send" << endl;
Serial << (long)end() << "-" << (long)buffer << endl;
} }
} }
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
if (prefix) Serial << prefix << ' '; if (prefix) Serial << prefix << ' ';
Serial << (long)buffer << "-" << (long)curr << " : "; Serial << "size(" << buffer.size() << ") : ";
const char* p=buffer; for(const char chr: buffer)
while(p!=curr)
{ {
if (*p<16) Serial << '0'; if (chr<16) Serial << '0';
Serial << _HEX(*p) << ' '; Serial << _HEX(chr) << ' ';
p++;
} }
Serial << endl; Serial << endl;
} }

View File

@@ -4,7 +4,18 @@
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#define MaxBufferLength 255 #ifdef TINY_MQTT_DEBUG
#include <Streaming.h>
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
enum MqttError
{
MqttOk = 0,
MqttNowhereToSend=1,
};
class Topic : public IndexedString class Topic : public IndexedString
{ {
@@ -21,6 +32,7 @@ class Topic : public IndexedString
class MqttClient; class MqttClient;
class MqttMessage class MqttMessage
{ {
const uint16_t MaxBufferLength = 255;
public: public:
enum Type enum Type
{ {
@@ -48,18 +60,18 @@ class MqttMessage
MqttMessage(Type t) { create(t); } MqttMessage(Type t) { create(t); }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len); void add(const char* p, size_t len, bool addLength=true );
void add(const std::string& s) { add(s.c_str(), s.length()); } void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); } void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; } const char* end() const { return &buffer[0]+buffer.size(); }
char* end() const { return curr; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return curr-buffer; } uint16_t length() const { return buffer.size(); }
void reset(); void reset();
// buff is MSB/LSB/STRING // buff is MSB/LSB/STRING
// output buff+=2, len=length(str) // output buff+=2, len=length(str)
void getString(char* &buff, uint16_t& len); static void getString(const char* &buff, uint16_t& len);
Type type() const Type type() const
@@ -69,9 +81,9 @@ class MqttMessage
void create(Type type) void create(Type type)
{ {
buffer[0]=type; buffer=(char)type;
curr=buffer+2; buffer+='\0';
vheader=curr; vheader=2;
size=0; size=0;
state=Create; state=Create;
} }
@@ -81,9 +93,8 @@ class MqttMessage
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length);
char buffer[256]; // TODO why 256 ? (should be replaced by a std::string) std::string buffer;
char* vheader; uint8_t vheader;
char* curr;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; State state;
}; };
@@ -104,13 +115,16 @@ class MqttClient
}; };
public: public:
MqttClient(MqttBroker*); MqttClient(MqttBroker*);
MqttClient() : MqttClient(nullptr) {};
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port); void connect(std::string broker, uint16_t port);
bool connected() { return client==nullptr || client->connected(); } bool connected() { return
(parent!=nullptr and client==nullptr) or
(client and client->connected()); }
void write(const char* buf, size_t length) void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); } { if (client) client->write(buf, length); }
@@ -122,9 +136,10 @@ class MqttClient
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world // Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length); MqttError publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); }
void publish(const Topic& t) { publish(t, nullptr, 0);}; MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); } void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic); void unsubscribe(Topic& topic);
@@ -133,6 +148,7 @@ class MqttClient
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
@@ -146,13 +162,13 @@ class MqttClient
} }
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif
private: private:
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg); MqttError publish(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage();
@@ -162,33 +178,18 @@ class MqttClient
uint32_t keep_alive; uint32_t keep_alive;
uint32_t alive; uint32_t alive;
MqttMessage message; MqttMessage message;
// TODO having a pointer on MqttBroker may produce larger binaries
// due to unecessary function linked if ever parent is not used
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
}; };
/***********************************************
* R1 - accept external cnx
* R2 - allows all clients pusblish to go outside
* R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
* R5 - tries to connect elsewhere (*)
* R6 - disconnect external clients
* R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker class MqttBroker
{ {
enum State enum State
@@ -200,6 +201,7 @@ class MqttBroker
public: public:
// TODO limit max number of clients // TODO limit max number of clients
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
@@ -209,15 +211,17 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883); void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
Serial << "broker: " << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
for(auto client: clients) for(auto client: clients)
{ {
Serial << " "; Serial << " ";
client->dump(); client->dump();
} }
} }
#endif
private: private:
friend class MqttClient; friend class MqttClient;
@@ -229,7 +233,7 @@ class MqttBroker
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself // For clients that are added not by the broker itself
void addClient(MqttClient* client); void addClient(MqttClient* client);