Compare commits

...

15 Commits
0.5.0 ... 0.6.0

Author SHA1 Message Date
hsaturn
470cde62da Version 0.6.0 2021-03-27 10:26:41 +01:00
hsaturn
3fb9b6317d README.md modified (typo) 2021-03-27 02:00:49 +01:00
hsaturn
ee9ad93bfd README.md modified 2021-03-27 02:00:00 +01:00
hsaturn
0b1a932244 MqttClient resubscribe on reconnect 2021-03-27 01:55:03 +01:00
hsaturn
972759237c Speed and stability improved 2021-03-27 01:40:49 +01:00
hsaturn
cb00d7f82a MqttClient / UnSubscribe message implemented 2021-03-27 01:39:37 +01:00
hsaturn
0b735d22a5 Less debug as the code is more stable now 2021-03-27 01:38:32 +01:00
hsaturn
9178aac02c MqttClient client length augmented to 60 (was not passing MqttBox tests 2021-03-26 01:59:08 +01:00
hsaturn
c706fbcff2 Update readme 2021-03-26 01:52:31 +01:00
hsaturn
a0c41a0ccb Smart ip feature for connect 2021-03-26 01:51:30 +01:00
hsaturn
b780dcf99c test fix, was unable to use set when replacements occurs 2021-03-26 01:10:33 +01:00
hsaturn
f122d5e902 relase 0.5.1 2021-03-25 01:26:27 +01:00
hsaturn
d63793cf77 Avoid to use message member, minor changes 2021-03-25 01:26:03 +01:00
hsaturn
8386779e92 tinytest great enhancements 2021-03-25 01:24:46 +01:00
hsaturn
1b988a06a2 Relase 0.5.0 2021-03-24 21:21:46 +01:00
7 changed files with 295 additions and 88 deletions

View File

@@ -2,14 +2,17 @@
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt) ![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt) ![](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green) ![](https://img.shields.io/badge/platform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) ![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client ESP 8266 is a small, fast and capable Mqtt Broker and Client
## Features ## Features
- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription)
- Act as as a mqtt broker and/or a mqtt client - Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported - Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode) - Standalone (can work without WiFi) (degraded/local mode)
@@ -26,10 +29,12 @@ no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~ * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* ~~MqttClient does not support more than one subscription at time~~ * ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe * ~~MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io)~~
* MqttClient auto reconnection * MqttClient auto reconnection
* MqttClient does not callback payload... * ~~MqttClient unsubscribe~~
* MqttClient does not sent payload to callback...
* MqttClient user/password * MqttClient user/password
* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching)
## Quickstart ## Quickstart

View File

@@ -1,6 +1,15 @@
// vim: ts=30 // vim: ts=40
Exemple of commands that can be sent via the serial monitor to tinymqtt-test Exemple of commands that can be sent via the serial monitor to tinymqtt-test
---------------------------------------------------------------------------- ----------------------------------------------------------------------------
Commands can usually be abbreviated to their first letters.
ex: cl for client, a / a.con / a.sub / a.p for publish.
--------
set name value set variable name to value (later replaced)
set name if no value, then var is erased
set view all vars
reserved keywords are forbidden
client a starts a client (not connected no internal broker) client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883 a.connect [server][port][alive] connects the client, default port=1883
@@ -8,4 +17,23 @@ a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic a.subscribe topic subscribes to a topic
delete a destroy the client delete a destroy the client
* note, if 'server' is a number, then it replaces the end of the local ip.
i.e. if local ip is 192.168.1.10, connect 2.35 becomes 192.168.2.35
--------
example:
--------
client c
c.connect broker.emqx.io
set topic sensor/temperature
c.subscribe topic
c.publish topic 15
c.publish topic 20
macro exansion example
----------------------
set temp publish sensor/temperature
c.temp 20 -> c.publish sensor/temperature 20

View File

@@ -1,6 +1,7 @@
#define TINY_MQTT_DEBUG #define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <MqttStreaming.h> #include <MqttStreaming.h>
#include <sstream>
#include <map> #include <map>
/** /**
@@ -78,6 +79,111 @@ std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' '
return sword; return sword;
} }
bool isaddr(std::string s)
{
if (s.length()==0 or s.length()>3) return false;
for(char c: s)
if (c<'0' or c>'9') return false;
return true;
}
std::string getip(std::string& str, const char* if_empty=nullptr, char sep=' ')
{
std::string addr=getword(str, if_empty, sep);
std::string ip=addr;
std::vector<std::string> build;
bool ok=true;
while(ip.length())
{
std::string b=getword(ip,nullptr,'.');
if (isaddr(b) && build.size()<4)
{
build.push_back(b);
}
else
return addr;
}
IPAddress local=WiFi.localIP();
addr="";
while(build.size()!=4)
{
std::stringstream b;
b << (int)local[3-build.size()];
build.insert(build.begin(), b.str());
}
for(std::string s: build)
{
if (addr.length()) addr += '.';
addr += s;
}
return addr;
}
std::map<std::string, std::string> vars;
std::set<std::string> commands = {
"auto", "broker", "client", "connect",
"create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "view"
};
void getCommand(std::string& search)
{
while(search[0]==' ') search.erase(0,1);
if (search.length()==0) return;
std::string matches;
int count=0;
for(std::string cmd: commands)
{
if (cmd.substr(0, search.length()) == search)
{
if (count) matches +=", ";
count++;
matches += cmd;
}
}
if (count==1)
search = matches;
else if (count>1)
{
Serial << "Ambiguous command: " << matches << endl;
search="";
}
}
void replace(const char* d, std::string& str, std::string srch, std::string to)
{
if (d[0] && d[1])
{
srch=d[0]+srch+d[1];
to=d[0]+to+d[1];
size_t pos = 0;
while((pos=str.find(srch, pos)) != std::string::npos)
{
str.erase(pos, srch.length());
str.insert(pos, to);
pos += to.length();
}
}
}
void replaceVars(std::string& cmd)
{
cmd = ' '+cmd+' ';
for(auto it: vars)
{
replace("..", cmd, it.first, it.second);
replace(". ", cmd, it.first, it.second);
replace(" .", cmd, it.first, it.second);
replace(" ", cmd, it.first, it.second);
}
cmd.erase(0, cmd.find_first_not_of(" "));
cmd.erase(cmd.find_last_not_of(" ")+1);
}
// publish at regular interval // publish at regular interval
class automatic class automatic
{ {
@@ -180,6 +286,7 @@ class automatic
{ {
Serial << " auto [$id] on/off" << endl; Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl; Serial << " auto [$id] view" << endl;
Serial << " auto [$id] interval [s]" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl; Serial << " auto [$id] create [millis] [topic]" << endl;
} }
@@ -204,6 +311,12 @@ using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void loop() void loop()
{ {
static long count;
if (MqttClient::counter != count)
{
Serial << "# " << MqttClient::counter << endl;
count = MqttClient::counter;
}
for(auto it: brokers) for(auto it: brokers)
it.second->loop(); it.second->loop();
@@ -219,12 +332,15 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd; static std::string last_cmd;
if (cmd=="!") if (cmd=="!")
cmd=last_cmd; cmd=last_cmd;
else else
last_cmd=cmd; last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd);
while(cmd.length()) while(cmd.length())
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
@@ -235,27 +351,34 @@ void loop()
// client.function notation // client.function notation
// ("a.fun " becomes "fun a ") // ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos) if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{ {
s=getword(cmd, nullptr, '.'); s=getword(cmd, nullptr, '.');
if (clients.find(s) != clients.end()) if (s.length())
{ {
client = clients[s]; if (clients.find(s) != clients.end())
} {
else if (brokers.find(s) != brokers.end()) client = clients[s];
{ }
broker = brokers[s]; else if (brokers.find(s) != brokers.end())
} {
else broker = brokers[s];
{ }
Serial << "Unknown class (" << s.c_str() << ")" << endl; else
cmd=""; {
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
} }
} }
s = getword(cmd); s = getword(cmd);
if (compare(s, "delete")) if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{ {
if (client==nullptr && broker==nullptr) if (client==nullptr && broker==nullptr)
{ {
@@ -313,7 +436,7 @@ void loop()
{ {
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl; Serial << (client->connected() ? "connected." : "not connected") << endl;
} }
else if (compare(s,"publish")) else if (compare(s,"publish"))
@@ -384,6 +507,32 @@ void loop()
Serial << "Missing or existing client name" << endl; Serial << "Missing or existing client name" << endl;
cmd+=" ls"; cmd+=" ls";
} }
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view")) else if (compare(s, "ls") or compare(s, "view"))
{ {
Serial << "--< " << clients.size() << " client/s. >--" << endl; Serial << "--< " << clients.size() << " client/s. >--" << endl;
@@ -421,6 +570,7 @@ void loop()
Serial << endl; Serial << endl;
Serial << " help" << endl; Serial << " help" << endl;
Serial << " ls / ip / reset" << endl; Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl; Serial << " ! repeat last command" << endl;
Serial << endl; Serial << endl;
Serial << " $id : name of the client." << endl; Serial << " $id : name of the client." << endl;

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.4.0", "version": "0.6.0",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,5 +1,5 @@
name=TinyMqtt name=TinyMqtt
version=0.4.0 version=0.6.0
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.

View File

@@ -69,18 +69,18 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
message.create(MqttMessage::Type::Connect); MqttMessage msg(MqttMessage::Type::Connect);
message.add("MQTT",4); msg.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1 msg.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name msg.add(0x0); // Connect flags TODO user / name
keep_alive = ka; // TODO not configurable keep_alive = ka;
message.add(0x00); // keep_alive msg.add(0x00); // keep_alive
message.add((char)keep_alive); msg.add((char)keep_alive);
message.add(clientId); msg.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); msg.sendTo(this);
message.reset(); msg.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
@@ -146,8 +146,10 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
#if TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Broker is connected if (broker && broker->connected()) // Broker is connected
{ {
@@ -166,7 +168,9 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
// All is allowed // All is allowed
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
@@ -232,6 +236,26 @@ void MqttClient::loop()
} }
} }
void MqttClient::resubscribe()
{
// TODO resubscription limited to 256 bytes
if (subscriptions.size())
{
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
for(auto topic: subscriptions)
{
msg.add(topic);
msg.add(0); // TODO qos
}
msg.sendTo(this); // TODO return value
}
}
MqttError MqttClient::subscribe(Topic topic, uint8_t qos) MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{ {
debug("subsribe(" << topic.c_str() << ")"); debug("subsribe(" << topic.c_str() << ")");
@@ -248,7 +272,7 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
msg.add(0); msg.add(0);
msg.add(0); msg.add(0);
msg.add(topic.str()); msg.add(topic);
msg.add(qos); msg.add(qos);
ret = msg.sendTo(this); ret = msg.sendTo(this);
@@ -257,15 +281,18 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
return ret; return ret;
} }
long MqttClient::counter=0;
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; counter++;
std::string s; #if TINY_MQTT_DEBUG
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming"); // message.hexdump("Incoming");
} }
#endif
auto header = message.getVHeader(); auto header = message.getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
@@ -295,20 +322,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
// ClientId // ClientId
message.getString(payload, len); message.getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
@@ -339,21 +352,22 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
bclose = false; bclose = false;
mqtt_connected=true; mqtt_connected=true;
// Reuse received msg {
message.create(MqttMessage::Type::Connack); MqttMessage msg(MqttMessage::Type::ConnAck);
message.add(0); // Session present (not implemented) msg.add(0); // Session present (not implemented)
message.add(0); // Connection accepted msg.add(0); // Connection accepted
message.sendTo(this); msg.sendTo(this);
}
break; break;
case MqttMessage::Type::Connack: case MqttMessage::Type::ConnAck:
// TODO what more on connack ?
mqtt_connected = true; mqtt_connected = true;
bclose = false; bclose = false;
resubscribe();
break; break;
case MqttMessage::Type::Suback: case MqttMessage::Type::SubAck:
case MqttMessage::Type::Puback: case MqttMessage::Type::PubAck:
if (!mqtt_connected) break; if (!mqtt_connected) break;
// Ignore acks // Ignore acks
bclose = false; bclose = false;
@@ -379,6 +393,7 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
case MqttMessage::Type::UnSubscribe:
{ {
if (!mqtt_connected) break; if (!mqtt_connected) break;
payload = header+2; payload = header+2;
@@ -390,7 +405,15 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
debug( " topic (" << std::string(payload, len) << ')'); debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len)); // subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len)); Topic topic(payload, len);
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic);
else
{
auto it=subscriptions.find(topic);
if (it != subscriptions.end())
subscriptions.erase(it);
}
payload += len; payload += len;
uint8_t qos = *payload++; uint8_t qos = *payload++;
debug(" qos=" << qos); debug(" qos=" << qos);
@@ -429,11 +452,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
} }
break; break;
case MqttMessage::Type::PubAck:
if (!mqtt_connected) break;
bclose = false;
break;
default: default:
bclose=true; bclose=true;
break; break;
@@ -441,7 +459,7 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
if (bclose) if (bclose)
{ {
Serial << "*************** Error msg 0x" << _HEX(message.type()); Serial << "*************** Error msg 0x" << _HEX(message.type());
if (error.length()) Serial << ':' << error.c_str(); message.hexdump("-------ERROR ------");
Serial << endl; Serial << endl;
close(); close();
} }
@@ -462,8 +480,7 @@ bool Topic::matches(const Topic& topic) const
// publish from local client // publish from local client
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
MqttMessage msg; MqttMessage msg(MqttMessage::Publish);
msg.create(MqttMessage::Publish);
msg.add(topic); msg.add(topic);
msg.add(payload, pay_length, false); msg.add(payload, pay_length, false);
if (parent) if (parent)
@@ -482,10 +499,9 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ';
if (subscription.matches(topic)) if (subscription.matches(topic))
{ {
Serial << " match/send"; debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ');
if (client) if (client)
{ {
retval = msg.sendTo(this); retval = msg.sendTo(this);
@@ -495,7 +511,6 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
callback(this, topic, nullptr, 0); // TODO Payload callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
Serial << endl;
} }
return retval; return retval;
} }
@@ -527,8 +542,15 @@ void MqttMessage::incoming(char in_byte)
vheader = buffer.length(); vheader = buffer.length();
if (size==0) if (size==0)
state = Complete; state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else else
{
buffer.reserve(size);
state = VariableHeader; state = VariableHeader;
}
} }
break; break;
case VariableHeader: case VariableHeader:
@@ -546,6 +568,7 @@ void MqttMessage::incoming(char in_byte)
case Complete: case Complete:
default: default:
Serial << "Spurious " << _HEX(in_byte) << endl; Serial << "Spurious " << _HEX(in_byte) << endl;
hexdump("spurious");
reset(); reset();
break; break;
} }
@@ -583,7 +606,7 @@ MqttError MqttMessage::sendTo(MqttClient* client)
{ {
debug("sending " << buffer.size() << " bytes"); debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength(&buffer[1], buffer.size()-2);
hexdump("snd"); // hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }
else else

View File

@@ -5,11 +5,11 @@
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h> #include <MqttStreaming.h>
#define TINY_MQTT_DEBUG #if 0
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else #else
#define TINY_MQTT_DEBUG 0
#define debug(what) {} #define debug(what) {}
#endif #endif
@@ -39,15 +39,16 @@ class MqttMessage
public: public:
enum Type enum Type
{ {
Unknown = 0, Unknown = 0,
Connect = 0x10, Connect = 0x10,
Connack = 0x20, ConnAck = 0x20,
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
Suback = 0x90, SubAck = 0x90,
PingReq = 0xC0, UnSubscribe = 0xA0,
PingResp = 0xD0, PingReq = 0xC0,
PingResp = 0xD0,
}; };
enum State enum State
{ {
@@ -152,7 +153,6 @@ class MqttClient
// TODO seems to be useless // TODO seems to be useless
bool isLocal() const { return client == nullptr; } bool isLocal() const { return client == nullptr; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis(); uint32_t ms=millis();
@@ -160,9 +160,9 @@ class MqttClient
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive; Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected()); Serial << " cnx " << (client && client->connected());
Serial << " [";
message.hexdump("entrant msg"); message.hexdump("entrant msg");
bool c=false; bool c=false;
Serial << " [";
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
@@ -172,9 +172,12 @@ class MqttClient
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif
static long counter; // Number of messages sent
private: private:
void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
@@ -221,7 +224,6 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883); void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
#ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
Serial << clients.size() << " client/s" << endl; Serial << clients.size() << " client/s" << endl;
@@ -231,7 +233,6 @@ class MqttBroker
client->dump(); client->dump();
} }
} }
#endif
private: private:
friend class MqttClient; friend class MqttClient;