Compare commits

...

11 Commits
0.5.1 ... 0.6.0

Author SHA1 Message Date
hsaturn
470cde62da Version 0.6.0 2021-03-27 10:26:41 +01:00
hsaturn
3fb9b6317d README.md modified (typo) 2021-03-27 02:00:49 +01:00
hsaturn
ee9ad93bfd README.md modified 2021-03-27 02:00:00 +01:00
hsaturn
0b1a932244 MqttClient resubscribe on reconnect 2021-03-27 01:55:03 +01:00
hsaturn
972759237c Speed and stability improved 2021-03-27 01:40:49 +01:00
hsaturn
cb00d7f82a MqttClient / UnSubscribe message implemented 2021-03-27 01:39:37 +01:00
hsaturn
0b735d22a5 Less debug as the code is more stable now 2021-03-27 01:38:32 +01:00
hsaturn
9178aac02c MqttClient client length augmented to 60 (was not passing MqttBox tests 2021-03-26 01:59:08 +01:00
hsaturn
c706fbcff2 Update readme 2021-03-26 01:52:31 +01:00
hsaturn
a0c41a0ccb Smart ip feature for connect 2021-03-26 01:51:30 +01:00
hsaturn
b780dcf99c test fix, was unable to use set when replacements occurs 2021-03-26 01:10:33 +01:00
7 changed files with 139 additions and 54 deletions

View File

@@ -2,14 +2,17 @@
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt) ![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt) ![](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green) ![](https://img.shields.io/badge/platform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) ![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client ESP 8266 is a small, fast and capable Mqtt Broker and Client
## Features ## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client - Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported - Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode) - Standalone (can work without WiFi) (degraded/local mode)
@@ -26,10 +29,12 @@ no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~ * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~ * ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe * ~~MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io)~~
* MqttClient auto reconnection * MqttClient auto reconnection
* MqttClient does not callback payload... * ~~MqttClient unsubscribe~~
* MqttClient does not sent payload to callback...
* MqttClient user/password * MqttClient user/password
* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching)
## Quickstart ## Quickstart

View File

@@ -1,9 +1,9 @@
// vim: ts=30 // vim: ts=40
Exemple of commands that can be sent via the serial monitor to tinymqtt-test Exemple of commands that can be sent via the serial monitor to tinymqtt-test
---------------------------------------------------------------------------- ----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters. Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish. ex: cl for client, a / a.con / a.sub / a.p for publish.
--------
set name value set variable name to value (later replaced) set name value set variable name to value (later replaced)
set name if no value, then var is erased set name if no value, then var is erased
@@ -17,9 +17,12 @@ a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic a.subscribe topic subscribes to a topic
delete a destroy the client delete a destroy the client
---------------------------------------------------- * note, if 'server' is a number, then it replaces the end of the local ip.
i.e. if local ip is 192.168.1.10, connect 2.35 becomes 192.168.2.35
--------
example: example:
--------
client c client c
c.connect broker.emqx.io c.connect broker.emqx.io
@@ -30,5 +33,7 @@ c.publish topic 15
c.publish topic 20 c.publish topic 20
macro exansion example macro exansion example
----------------------
set temp publish sensor/temperature set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20 c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -1,6 +1,7 @@
#define TINY_MQTT_DEBUG #define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h> #include <MqttStreaming.h>
#include <sstream>
#include <map> #include <map>
/** /**
@@ -78,6 +79,46 @@ std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' '
return sword; return sword;
} }
bool isaddr(std::string s)
{
if (s.length()==0 or s.length()>3) return false;
for(char c: s)
if (c<'0' or c>'9') return false;
return true;
}
std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string addr=getword(str, if_empty, sep);
std::string ip=addr;
std::vector<std::string> build;
bool ok=true;
while(ip.length())
{
std::string b=getword(ip,nullptr,'.');
if (isaddr(b) && build.size()<4)
{
build.push_back(b);
}
else
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
while(build.size()!=4)
{
std::stringstream b;
b << (int)local[3-build.size()];
build.insert(build.begin(), b.str());
}
for(std::string s: build)
{
if (addr.length()) addr += '.';
addr += s;
}
return addr;
}
std::map<std::string, std::string> vars; std::map<std::string, std::string> vars;
std::set<std::string> commands = { std::set<std::string> commands = {
@@ -270,6 +311,12 @@ using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void loop() void loop()
{ {
static long count;
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers) for(auto it: brokers)
it.second->loop(); it.second->loop();
@@ -293,8 +340,7 @@ void loop()
else else
last_cmd=cmd; last_cmd=cmd;
replaceVars(cmd); if (cmd.substr(0,3)!="set") replaceVars(cmd);
Serial << "---------------@[ " << cmd.c_str() << " ]--------------" << endl;
while(cmd.length()) while(cmd.length())
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
@@ -390,7 +436,7 @@ void loop()
{ {
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl; Serial << (client->connected() ? "connected." : "not connected") << endl;
} }
else if (compare(s,"publish")) else if (compare(s,"publish"))

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.5.1", "version": "0.6.0",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,5 +1,5 @@
name=TinyMqtt name=TinyMqtt
version=0.5.1 version=0.6.0
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.

View File

@@ -146,8 +146,10 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
#if TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Broker is connected if (broker && broker->connected()) // Broker is connected
{ {
@@ -166,7 +168,9 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
// All is allowed // All is allowed
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
@@ -232,6 +236,26 @@ void MqttClient::loop()
} }
} }
void MqttClient::resubscribe()
{
// TODO resubscription limited to 256 bytes
if (subscriptions.size())
{
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
for(auto topic: subscriptions)
{
msg.add(topic);
msg.add(0); // TODO qos
}
msg.sendTo(this); // TODO return value
}
}
MqttError MqttClient::subscribe(Topic topic, uint8_t qos) MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{ {
debug("subsribe(" << topic.c_str() << ")"); debug("subsribe(" << topic.c_str() << ")");
@@ -248,7 +272,7 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
msg.add(0); msg.add(0);
msg.add(0); msg.add(0);
msg.add(topic.str()); msg.add(topic);
msg.add(qos); msg.add(qos);
ret = msg.sendTo(this); ret = msg.sendTo(this);
@@ -257,15 +281,18 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
return ret; return ret;
} }
long MqttClient::counter=0;
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; counter++;
std::string s; #if TINY_MQTT_DEBUG
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming"); // message.hexdump("Incoming");
} }
#endif
auto header = message.getVHeader(); auto header = message.getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
@@ -295,20 +322,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
// ClientId // ClientId
message.getString(payload, len); message.getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
@@ -348,9 +361,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::ConnAck: case MqttMessage::Type::ConnAck:
// TODO what more on connack ?
mqtt_connected = true; mqtt_connected = true;
bclose = false; bclose = false;
resubscribe();
break; break;
case MqttMessage::Type::SubAck: case MqttMessage::Type::SubAck:
@@ -380,6 +393,7 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
case MqttMessage::Type::UnSubscribe:
{ {
if (!mqtt_connected) break; if (!mqtt_connected) break;
payload = header+2; payload = header+2;
@@ -391,7 +405,15 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
debug( " topic (" << std::string(payload, len) << ')'); debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len)); // subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len)); Topic topic(payload, len);
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic);
else
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
subscriptions.erase(it);
}
payload += len; payload += len;
uint8_t qos = *payload++; uint8_t qos = *payload++;
debug(" qos=" << qos); debug(" qos=" << qos);
@@ -437,7 +459,7 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
if (bclose) if (bclose)
{ {
Serial << "*************** Error msg 0x" << _HEX(message.type()); Serial << "*************** Error msg 0x" << _HEX(message.type());
if (error.length()) Serial << ':' << error.c_str(); message.hexdump("-------ERROR ------");
Serial << endl; Serial << endl;
close(); close();
} }
@@ -477,10 +499,9 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ';
if (subscription.matches(topic)) if (subscription.matches(topic))
{ {
Serial << " match/send"; debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ');
if (client) if (client)
{ {
retval = msg.sendTo(this); retval = msg.sendTo(this);
@@ -490,7 +511,6 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
callback(this, topic, nullptr, 0); // TODO Payload callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
Serial << endl;
} }
return retval; return retval;
} }
@@ -522,8 +542,15 @@ void MqttMessage::incoming(char in_byte)
vheader = buffer.length(); vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else else
{
buffer.reserve(size);
state = VariableHeader; state = VariableHeader;
}
} }
break; break;
case VariableHeader: case VariableHeader:
@@ -541,6 +568,7 @@ void MqttMessage::incoming(char in_byte)
case Complete: case Complete:
default: default:
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
hexdump("spurious");
reset(); reset();
break; break;
} }
@@ -578,7 +606,7 @@ MqttError MqttMessage::sendTo(MqttClient* client)
{ {
debug("sending " << buffer.size() << " bytes"); debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength(&buffer[1], buffer.size()-2);
hexdump("snd"); // hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }
else else

View File

@@ -5,11 +5,11 @@
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h> #include <MqttStreaming.h>
#define TINY_MQTT_DEBUG #if 0
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else #else
#define TINY_MQTT_DEBUG 0
#define debug(what) {} #define debug(what) {}
#endif #endif
@@ -39,15 +39,16 @@ class MqttMessage
public: public:
enum Type enum Type
{ {
Unknown = 0, Unknown = 0,
Connect = 0x10, Connect = 0x10,
ConnAck = 0x20, ConnAck = 0x20,
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
SubAck = 0x90, SubAck = 0x90,
PingReq = 0xC0, UnSubscribe = 0xA0,
PingResp = 0xD0, PingReq = 0xC0,
PingResp = 0xD0,
}; };
enum State enum State
{ {
@@ -152,7 +153,6 @@ class MqttClient
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis(); uint32_t ms=millis();
@@ -160,9 +160,9 @@ class MqttClient
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive; Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected()); Serial << " cnx " << (client && client->connected());
Serial << " [";
message.hexdump("entrant msg"); message.hexdump("entrant msg");
bool c=false; bool c=false;
Serial << " [";
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
@@ -172,9 +172,12 @@ class MqttClient
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif
static long counter; // Number of messages sent
private: private:
void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
@@ -221,7 +224,6 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883); void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
Serial << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
@@ -231,7 +233,6 @@ class MqttBroker
client->dump(); client->dump();
} }
} }
#endif
private: private:
friend class MqttClient; friend class MqttClient;