Compare commits

...

12 Commits

Author SHA1 Message Date
Francois BIOT
e41452edf0 [fixes] Timeout and broker to broker modifications 2023-03-23 13:42:09 +01:00
Francois BIOT
bf84e29831 retain is coming git status! 2023-03-22 00:29:55 +01:00
Francois BIOT
0c7c830a26 Release 0.9.19 2023-03-11 18:51:08 +01:00
hsaturn
6e601228e6 Added compatibility with me-no-dev/ESPAsyncTCP@^1.2.2 2023-03-11 18:49:51 +01:00
hsaturn
46798ff0de Fix bug with Async Tcp 2023-03-11 18:47:41 +01:00
Francois BIOT
45fedf84c9 tinymqtt-tests.ino fix bad color after underrun 2023-02-24 00:18:34 +01:00
Francois BIOT
f9c8dca1e5 tinymqtt-tests fix prompt bug 2023-02-23 23:56:22 +01:00
Francois BIOT
7e1586c0b5 platformio, make tiny-tests example compilation ok 2023-02-23 23:08:30 +01:00
hsaturn
123c5a8fa5 Release 0.9.18 2023-02-23 20:43:27 +01:00
hsaturn
21fb01848d Release 0.9.17 2023-02-23 20:42:26 +01:00
hsaturn
66b1e71ee2 Fix depends 2023-02-23 20:36:10 +01:00
hsaturn
e5115087ea Release 0.9.16 2023-02-23 20:15:38 +01:00
9 changed files with 385 additions and 257 deletions

View File

@@ -24,6 +24,7 @@ TinyMqtt is a small, fast and capable Mqtt Broker and Client for Esp8266 / Esp32
## Features ## Features
- Async Wifi compatible (me-no-dev/ESPAsyncTCP@^1.2.2)
- Very fast broker I saw it re-sent 1000 topics per second for two - Very fast broker I saw it re-sent 1000 topics per second for two
clients that had subscribed (payload ~15 bytes ESP8266). No topic lost. clients that had subscribed (payload ~15 bytes ESP8266). No topic lost.
The max I've seen was 2k msg/s (1 client 1 subscription) The max I've seen was 2k msg/s (1 client 1 subscription)

View File

@@ -1,16 +1,17 @@
#!/bin/bash #!/bin/bash
current_version=$(git describe --tags --abbrev=0) current_version=$(git describe --tags --abbrev=0)
cp library.json.skeleton library.json if [ "$1" == "-d" ]; then
while ifs= read -r line; do do=0
name=$(echo "$line" | sed "s/=.*//g") shift
value=$(echo "$line" | cut -d= -f 2 | sed 's/"//g') else
sed -i "s/#$name/$value/g" library.json do=1
done < library.properties fi
if [ "$1" == "" ]; then if [ "$1" == "" ]; then
echo echo
echo "Syntax: $0 {new_version}" echo "Syntax: $0 [-d] {new_version}"
echo echo
echo " -d : dry run, generate json and update properties but do not run git commands"
echo ""
echo " Current version: $current_version" echo " Current version: $current_version"
echo echo
else else
@@ -23,15 +24,35 @@ else
grep $current_version library.properties grep $current_version library.properties
if [ "$?" == "0" ]; then if [ "$?" == "0" ]; then
sed -i "s/$current_version/$1/" library.properties sed -i "s/$current_version/$1/" library.properties
if [ 0 == 1 ]; then
cp library.json.skeleton library.json
while ifs= read -r line; do
name=$(echo "$line" | sed "s/=.*//g")
value=$(echo "$line" | cut -d= -f 2 | sed 's/"//g')
echo " Replacing $name in json"
if [ "$name" == "depends" ]; then
depends=$(echo "$value" | sed "s/,/ /g")
echo " Depends=$depends"
fi
echo " " sed -i "s@#$name@$value@g" library.json
sed -i "s@#$name@$value@g" library.json
done < library.properties
deps=""
for depend in $depends; do
if [ "$deps" != "" ]; then
deps="$deps, "
fi
deps="$deps'$depend' : '*'"
done
sed -i "s@#dependencies@$deps@g" library.json
sed -i "s/'/\"/g" library.json
if [ "$do" == "1" ]; then
git tag $1 git tag $1
git add library.properties git add library.properties
git add library.json git add library.json
git commit -m "Release $1" git commit -m "Release $1"
git push git push
git push --tags git push --tags
else
echo "No git operation made"
fi fi
else else
echo "Current version does not match library.property version, aborting" echo "Current version does not match library.property version, aborting"

View File

@@ -15,8 +15,6 @@
#include <string> #include <string>
#include <map> #include <map>
using string = TinyString;
bool echo_on = true; bool echo_on = true;
auto green = TinyConsole::green; auto green = TinyConsole::green;
auto red = TinyConsole::red; auto red = TinyConsole::red;
@@ -28,8 +26,11 @@ auto save_cursor = TinyConsole::save_cursor;
auto restore_cursor = TinyConsole::restore_cursor; auto restore_cursor = TinyConsole::restore_cursor;
auto erase_to_end = TinyConsole::erase_to_end; auto erase_to_end = TinyConsole::erase_to_end;
const char* ssid = ""; const char *ssid = "Freebox-786A2F";
const char* password = ""; const char *password = "usurpavi8dalum64lumine?";
void onCommand(const string &command);
void eval(string &cmd);
struct free_broker struct free_broker
{ {
@@ -43,8 +44,7 @@ struct free_broker
const std::map<string, free_broker> list = const std::map<string, free_broker> list =
{ {
{"mqtthq", {"public.mqtthq.com", 8083, "publish/subscribe"}}, {"mqtthq", {"public.mqtthq.com", 8083, "publish/subscribe"}},
{ "hivemq", { "broker.hivemq.com", 1883, "" }} {"hivemq", {"broker.hivemq.com", 1883, ""}}};
};
/** Very complex example /** Very complex example
* Console allowing to make any kind of test, * Console allowing to make any kind of test,
@@ -88,7 +88,8 @@ void setup()
delay(500); delay(500);
Console.cls(); Console.cls();
Console << endl << endl; Console << endl
<< endl;
Console << yellow Console << yellow
<< "***************************************************************" << endl; << "***************************************************************" << endl;
Console << "* Welcome to the TinyMqtt console" << endl; Console << "* Welcome to the TinyMqtt console" << endl;
@@ -99,7 +100,8 @@ void setup()
if (strlen(ssid) == 0) if (strlen(ssid) == 0)
Console << red << "* ERROR: You must modify ssid/password in order" << endl Console << red << "* ERROR: You must modify ssid/password in order" << endl
<< " to be able to connect to your Wifi network." << endl; << " to be able to connect to your Wifi network." << endl;
Console << endl << white; Console << endl
<< white;
Console << "Connecting to '" << ssid << "' "; Console << "Connecting to '" << ssid << "' ";
@@ -109,9 +111,13 @@ void setup()
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) while (WiFi.status() != WL_CONNECTED)
{ Console << '-'; delay(500); } {
Console << '-';
delay(500);
}
Console << endl << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Console << endl
<< "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
const char *name = "tinytest"; const char *name = "tinytest";
Console << "Starting MDNS, name= " << name; Console << "Starting MDNS, name= " << name;
@@ -120,12 +126,14 @@ void setup()
else else
Console << " ok." << endl; Console << " ok." << endl;
MqttBroker *broker = new MqttBroker(1883); MqttBroker *broker = new MqttBroker(1883);
broker->begin(); broker->begin();
brokers["broker"] = broker; brokers["broker"] = broker;
if (Console.isTerm()) onCommand("every 333 view"); if (Console.isTerm())
onCommand("every 333 view");
Console.prompt();
} }
string getword(string &str, const char *if_empty = nullptr, char sep = ' '); string getword(string &str, const char *if_empty = nullptr, char sep = ' ');
@@ -137,8 +145,10 @@ int getint(string& str, const int if_empty=0)
if (sword[0] and isdigit(sword[0])) if (sword[0] and isdigit(sword[0]))
{ {
int ret = atoi(sword.c_str()); int ret = atoi(sword.c_str());
while(isdigit(sword[0]) or sword[0]==' ') sword.erase(0,1); while (isdigit(sword[0]) or sword[0] == ' ')
if (sword.length()) str = sword+' '+str; sword.erase(0, 1);
if (sword.length())
str = sword + ' ' + str;
return ret; return ret;
} }
str = str2; str = str2;
@@ -148,7 +158,8 @@ int getint(string& str, const int if_empty=0)
string getword(string &str, const char *if_empty /*=nullptr*/, char sep /*=' '*/) string getword(string &str, const char *if_empty /*=nullptr*/, char sep /*=' '*/)
{ {
char quote = (str[0] == '"' or str[0] == '\'' ? str[0] : 0); char quote = (str[0] == '"' or str[0] == '\'' ? str[0] : 0);
if (quote) str.erase(0,1); if (quote)
str.erase(0, 1);
string sword; string sword;
while (str.length() and (str[0] != sep or quote)) while (str.length() and (str[0] != sep or quote))
{ {
@@ -160,8 +171,10 @@ string getword(string& str, const char* if_empty/*=nullptr*/, char sep/*=' '*/)
sword += str[0]; sword += str[0];
str.erase(0, 1); str.erase(0, 1);
} }
while(str[0]==sep) str.erase(0,1); while (str[0] == sep)
if (if_empty and sword.length()==0) return if_empty; str.erase(0, 1);
if (if_empty and sword.length() == 0)
return if_empty;
if (quote == false and sword.length() >= 4 and sword.substr(0, 3) == "rnd") if (quote == false and sword.length() >= 4 and sword.substr(0, 3) == "rnd")
{ {
sword.erase(0, 3); sword.erase(0, 3);
@@ -174,7 +187,8 @@ string getword(string& str, const char* if_empty/*=nullptr*/, char sep/*=' '*/)
{ {
sword.erase(0, 1); sword.erase(0, 1);
to = getint(sword); to = getint(sword);
if (sword[0]!=')') Console << "Missing ')'" << endl; if (sword[0] != ')')
Console << "Missing ')'" << endl;
} }
else else
{ {
@@ -188,15 +202,18 @@ string getword(string& str, const char* if_empty/*=nullptr*/, char sep/*=' '*/)
Console << "Missing '('" << endl; Console << "Missing '('" << endl;
} }
} }
while(str[0]==' ') str.erase(0,1); while (str[0] == ' ')
str.erase(0, 1);
return sword; return sword;
} }
bool isaddr(string s) bool isaddr(string s)
{ {
if (s.length()==0 or s.length()>3) return false; if (s.length() == 0 or s.length() > 3)
return false;
for (char c : s) for (char c : s)
if (c<'0' or c>'9') return false; if (c < '0' or c > '9')
return false;
return true; return true;
} }
@@ -225,7 +242,8 @@ string getip(string& str, const char* if_empty=nullptr, char sep=' ')
} }
for (string s : build) for (string s : build)
{ {
if (addr.length()) addr += '.'; if (addr.length())
addr += '.';
addr += s; addr += s;
} }
Console << "connect address: " << addr << endl; Console << "connect address: " << addr << endl;
@@ -238,20 +256,22 @@ std::set<string> commands = {
"broker", "blink", "client", "connect", "broker", "blink", "client", "connect",
"create", "delete", "debug", "help", "interval", "create", "delete", "debug", "help", "interval",
"list", "ls", "ip", "off", "on", "set", "list", "ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view", "echo", "every" "publish", "reset", "subscribe", "unsubscribe", "view", "echo", "every"};
};
void convertToCommand(string &search) void convertToCommand(string &search)
{ {
while(search[0]==' ') search.erase(0,1); while (search[0] == ' ')
if (search.length()==0) return; search.erase(0, 1);
if (search.length() == 0)
return;
string matches; string matches;
int count = 0; int count = 0;
for (string cmd : commands) for (string cmd : commands)
{ {
if (cmd.substr(0, search.length()) == search) if (cmd.substr(0, search.length()) == search)
{ {
if (count) matches +=", "; if (count)
matches += ", ";
count++; count++;
matches += cmd; matches += cmd;
} }
@@ -295,7 +315,6 @@ void replaceVars(string& cmd)
} }
cmd.erase(0, cmd.find_first_not_of(' ')); cmd.erase(0, cmd.find_first_not_of(' '));
cmd.erase(cmd.find_last_not_of(' ') + 1); cmd.erase(cmd.find_last_not_of(' ') + 1);
} }
bool compare(string s, const char *cmd) bool compare(string s, const char *cmd)
@@ -303,8 +322,10 @@ bool compare(string s, const char* cmd)
uint8_t p = 0; uint8_t p = 0;
while (s[p++] == *cmd++) while (s[p++] == *cmd++)
{ {
if (*cmd==0 or s[p]==0) return true; if (*cmd == 0 or s[p] == 0)
if (s[p]==' ') return true; return true;
if (s[p] == ' ')
return true;
} }
return false; return false;
} }
@@ -347,10 +368,10 @@ void onCommand(const string& command)
{ {
Console << endl; Console << endl;
string cmd = command; string cmd = command;
if (cmd.substr(0,3)!="set") replaceVars(cmd); if (cmd.substr(0, 3) != "set")
replaceVars(cmd);
eval(cmd); eval(cmd);
Console << endl; Console << endl;
Console.prompt();
} }
void clientConnect(MqttClient *client, string &cmd) void clientConnect(MqttClient *client, string &cmd)
@@ -406,9 +427,11 @@ void eval(string& cmd)
} }
s = getword(cmd); s = getword(cmd);
if (s.length()) convertToCommand(s); if (s.length())
convertToCommand(s);
if (s.length() == 0) if (s.length() == 0)
{} {
}
else if (compare(s, "debug")) else if (compare(s, "debug"))
{ {
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
@@ -445,7 +468,8 @@ void eval(string& cmd)
{ {
for (auto it : clients) for (auto it : clients)
{ {
if (it.second != client) continue; if (it.second != client)
continue;
Console << "deleted" << endl; Console << "deleted" << endl;
delete (it.second); delete (it.second);
clients.erase(it.first); clients.erase(it.first);
@@ -457,7 +481,8 @@ void eval(string& cmd)
{ {
for (auto it : brokers) for (auto it : brokers)
{ {
if (broker != it.second) continue; if (broker != it.second)
continue;
Console << "deleted" << endl; Console << "deleted" << endl;
delete (it.second); delete (it.second);
brokers.erase(it.first); brokers.erase(it.first);
@@ -677,7 +702,8 @@ void eval(string& cmd)
if (s == "" or brokers.find(s) != brokers.end() or list.find(s) != list.end()) if (s == "" or brokers.find(s) != brokers.end() or list.find(s) != list.end())
{ {
MqttBroker *broker = nullptr; MqttBroker *broker = nullptr;
if (s.length()) broker = brokers[s]; if (s.length())
broker = brokers[s];
MqttClient *client = new MqttClient(broker, id); MqttClient *client = new MqttClient(broker, id);
clients[id] = client; clients[id] = client;
client->setCallback(onPublish); client->setCallback(onPublish);
@@ -790,7 +816,8 @@ void eval(string& cmd)
} }
else else
{ {
while(s[0]==' ') s.erase(0,1); while (s[0] == ' ')
s.erase(0, 1);
if (s.length()) if (s.length())
Console << "Unknown command (" << s.c_str() << ")" << endl; Console << "Unknown command (" << s.c_str() << ")" << endl;
} }
@@ -811,7 +838,8 @@ void loop()
for (auto &every : everies) for (auto &every : everies)
{ {
if (not every.active) continue; if (not every.active)
continue;
if (every.ms && every.cmd.length() && ms > every.next) if (every.ms && every.cmd.length() && ms > every.next)
{ {
string cmd(every.cmd); string cmd(every.cmd);
@@ -821,6 +849,7 @@ void loop()
{ {
every.next += every.ms; every.next += every.ms;
Console << yellow << "Underrun every #" << e << ", " << (ms - every.next) << "ms late" << endl; Console << yellow << "Underrun every #" << e << ", " << (ms - every.next) << "ms late" << endl;
Console.fg(white);
every.underrun = ms + 5000; every.underrun = ms + 5000;
} }
} }

View File

@@ -6,7 +6,9 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.9.15", "dependencies":
{ "hsaturn/TinyConsole" : "*" },
"version": "0.9.19",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -6,6 +6,8 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"dependencies":
{ #dependencies },
"version": "#version", "version": "#version",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",

View File

@@ -1,5 +1,5 @@
name=TinyMqtt name=TinyMqtt
version=0.9.16 version=0.9.19
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT <hsaturn@gmail.com> maintainer=Francois BIOT <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
@@ -7,5 +7,5 @@ paragraph=MQTT is a lightweight messaging protocol. This library allows to host
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*
depends=TinyConsole depends=hsaturn/TinyConsole
includes=TinyMqtt.h includes=TinyMqtt.h

View File

@@ -107,6 +107,8 @@ class IndexedString
index = source.index; index = source.index;
} }
IndexedString(IndexedString&& i) : index(i.index) {}
IndexedString(const char* str, uint8_t len) IndexedString(const char* str, uint8_t len)
{ {
index=StringIndexer::strToIndex(str, len); index=StringIndexer::strToIndex(str, len);

View File

@@ -15,8 +15,10 @@ int TinyMqtt::debug=2;
std::map<MqttMessage::Type, int> MqttClient::counters; std::map<MqttMessage::Type, int> MqttClient::counters;
#endif #endif
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port, uint8_t max_retain_size)
{ {
debug("New broker" << port);
retain_size = max_retain_size;
server = new TcpServer(port); server = new TcpServer(port);
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
server->onClient(onClient, this); server->onClient(onClient, this);
@@ -116,7 +118,7 @@ void MqttClient::connect(string broker, uint16_t port, uint16_t ka)
#ifdef TINY_MQTT_ASYNC #ifdef TINY_MQTT_ASYNC
tcp_client->onData(onData, this); tcp_client->onData(onData, this);
tcp_client->onConnect(onConnect, this); tcp_client->onConnect(onConnect, this);
tcp_client->connect(broker.c_str(), port, ka); tcp_client->connect(broker.c_str(), port);
#else #else
if (tcp_client->connect(broker.c_str(), port)) if (tcp_client->connect(broker.c_str(), port))
{ {
@@ -142,6 +144,7 @@ void MqttBroker::connect(const string& host, uint16_t port)
if (remote_broker == nullptr) remote_broker = new MqttClient; if (remote_broker == nullptr) remote_broker = new MqttClient;
remote_broker->connect(host, port); remote_broker->connect(host, port);
remote_broker->local_broker = this; // Because connect removed the link remote_broker->local_broker = this; // Because connect removed the link
// TODO shouldn't we resubscribe to all client subscriptions ?
} }
void MqttBroker::removeClient(MqttClient* remove) void MqttBroker::removeClient(MqttClient* remove)
@@ -211,9 +214,19 @@ void MqttBroker::loop()
} }
} }
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos) // Obvioulsy called when the broker is connected to another broker.
MqttError MqttBroker::subscribe(MqttClient* client, const Topic& topic, uint8_t qos)
{ {
debug("MqttBroker::subscribe"); debug("MqttBroker::subscribe to " << topic.str() << ", retained=" << retained.size() );
for(auto& [retained_topic, retain]: retained)
{
debug(" retained: " << retained_topic.str());
if (topic.matches(retained_topic))
{
debug(" -> sending");
client->publishIfSubscribed(retained_topic, retain.msg);
}
}
if (remote_broker && remote_broker->connected()) if (remote_broker && remote_broker->connected())
{ {
return remote_broker->subscribe(topic, qos); return remote_broker->subscribe(topic, qos);
@@ -221,10 +234,12 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
return MqttNowhereToSend; return MqttNowhereToSend;
} }
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
retain(topic, msg);
debug("MqttBroker::publish"); debug("MqttBroker::publish");
int i=0; int i=0;
for(auto client: clients) for(auto client: clients)
@@ -294,25 +309,26 @@ void MqttClient::clientAlive(uint32_t more_seconds)
void MqttClient::loop() void MqttClient::loop()
{ {
if (keep_alive && (millis() >= alive)) if (keep_alive && (millis() >= alive - 5000))
{ {
if (local_broker) if (tcp_client && tcp_client->connected())
{
debug(red << "timeout client");
close();
debug(red << "closed");
}
else if (tcp_client && tcp_client->connected())
{ {
debug("pingreq"); debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; static MqttMessage pingreq(MqttMessage::Type::PingReq);
tcp_client->write((const char*)(&pingreq), 2); pingreq.sendTo(this);
clientAlive(0); clientAlive(0);
// TODO when many MqttClient passes through a local broker // TODO when many MqttClient passes through a local broker
// there is no need to send one PingReq per instance. // there is no need to send one PingReq per instance.
} }
else if (local_broker)
{
debug(red << "timeout client");
close();
debug(red << "closed");
} }
}
#ifndef TINY_MQTT_ASYNC #ifndef TINY_MQTT_ASYNC
while(tcp_client && tcp_client->available()>0) while(tcp_client && tcp_client->available()>0)
{ {
@@ -391,13 +407,13 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
subscriptions.insert(topic); subscriptions.insert(topic);
if (local_broker==nullptr) // remote broker if (local_broker==nullptr) // connected to a remote broker
{ {
return sendTopic(topic, MqttMessage::Type::Subscribe, qos); return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
} }
else else
{ {
return local_broker->subscribe(topic, qos); return local_broker->subscribe(this, topic, qos);
} }
return ret; return ret;
} }
@@ -568,7 +584,7 @@ void MqttClient::processMessage(MqttMessage* mesg)
} }
else else
qoss.push_back(qos); qoss.push_back(qos);
subscriptions.insert(topic); subscribe(topic);
} }
else else
{ {
@@ -618,8 +634,8 @@ void MqttClient::processMessage(MqttMessage* mesg)
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
if (TinyMqtt::debug >= 2) if (TinyMqtt::debug >= 2)
{ {
Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n"; Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\r\n";
Console << "has " << (callback ? "" : "no ") << " callback.\n"; Console << "has " << (callback ? "" : "no ") << " callback.\r\n";
} }
#endif #endif
if (callback and isSubscribedTo(published)) if (callback and isSubscribedTo(published))
@@ -728,9 +744,9 @@ bool Topic::matches(const Topic& topic) const
// publish from local client // publish from local client
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length) MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length, bool retain)
{ {
MqttMessage msg(MqttMessage::Publish); MqttMessage msg(MqttMessage::Publish, retain ? 1 : 0);
msg.add(topic); msg.add(topic);
msg.add(payload, pay_length, false); msg.add(payload, pay_length, false);
msg.complete(); msg.complete();
@@ -894,6 +910,35 @@ MqttError MqttMessage::sendTo(MqttClient* client)
return MqttOk; return MqttOk;
} }
void MqttBroker::retainDrop()
{
if (retained.size() >= retain_size)
{
std::map<Topic, Retain>::iterator oldest = retained.begin();
auto it = oldest;
while(++it != retained.end())
{
if (oldest->second.timestamp > it->second.timestamp)
oldest = it;
}
retained.erase(oldest);
}
}
void MqttBroker::retain(const Topic& topic, const MqttMessage& msg)
{
debug("MqttBroker::retain msg_type=" << _HEX(msg.type()));
if (retain_size==0 or msg.type() != MqttMessage::Publish) return;
if (msg.flags() & 1) // flag RETAIN
{
debug(" retaining " << topic.str());
if (retained.find(topic) == retained.end()) retainDrop();
// FIXME if payload size == 0 remove message from retained
Retain r(micros(), msg);
retained.insert({ topic, std::move(r)});
}
}
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
(void)prefix; (void)prefix;

View File

@@ -121,7 +121,10 @@ class MqttMessage
return (*bun << 8) | bun[1]; } return (*bun << 8) | bun[1]; }
MqttMessage() { reset(); } MqttMessage() { reset(); }
MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; } MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= (bits_d3_d0 & 0xF); }
MqttMessage(const MqttMessage& m)
: buffer(m.buffer), vheader(m.vheader), size(m.size), state(m.state) {}
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len, bool addLength=true ); void add(const char* p, size_t len, bool addLength=true );
@@ -156,6 +159,15 @@ class MqttMessage
MqttError sendTo(MqttClient*); MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
MqttMessage& operator = (MqttMessage&& m)
{
buffer = std::move(m.buffer);
vheader = m.vheader;
size = m.size;
state = m.state;
return *this;
}
private: private:
void encodeLength(); void encodeLength();
@@ -228,11 +240,11 @@ class MqttClient
}; };
// Publish from client to the world // Publish from client to the world
MqttError publish(const Topic&, const char* payload, size_t pay_length); MqttError publish(const Topic&, const char* payload, size_t pay_length, bool retain=false);
MqttError publish(const Topic& t, const char* payload) { return publish(t, payload, strlen(payload)); } MqttError publish(const Topic& t, const char* payload, bool retain=false) { return publish(t, payload, strlen(payload), retain); }
MqttError publish(const Topic& t, const String& s) { return publish(t, s.c_str(), s.length()); } MqttError publish(const Topic& t, const String& s, bool retain=false) { return publish(t, s.c_str(), s.length(), retain); }
MqttError publish(const Topic& t, const string& s) { return publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const string& s, bool retain=false) { return publish(t,s.c_str(),s.length(), retain);}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; MqttError publish(const Topic& t, bool retain=false) { return publish(t, nullptr, 0, retain);};
MqttError subscribe(Topic topic, uint8_t qos=0); MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic topic); MqttError unsubscribe(Topic topic);
@@ -317,15 +329,9 @@ class MqttClient
class MqttBroker class MqttBroker
{ {
enum __attribute__((packed)) State
{
Disconnected, // Also the initial state
Connecting, // connect and sends a fake publish to avoid circular cnx
Connected, // this->broker is connected and circular cnx avoided
};
public: public:
// TODO limit max number of clients // TODO limit max number of clients
MqttBroker(uint16_t port); MqttBroker(uint16_t port, uint8_t retain_size=0);
~MqttBroker(); ~MqttBroker();
void begin() { server->begin(); } void begin() { server->begin(); }
@@ -334,9 +340,10 @@ class MqttBroker
/** Connect the broker to a parent broker */ /** Connect the broker to a parent broker */
void connect(const string& host, uint16_t port=1883); void connect(const string& host, uint16_t port=1883);
/** returns true if connected to another broker */ /** returns true if connected to another broker */
bool connected() const { return state == Connected; } bool connected() const { return remote_broker ? remote_broker->connected() : false; }
size_t clientsCount() const { return clients.size(); } size_t clientsCount() const { return clients.size(); }
void retain(uint8_t size) { retain_size = size; }
void dump(string indent="") void dump(string indent="")
{ {
@@ -356,10 +363,9 @@ class MqttBroker
bool checkPassword(const char* password, uint8_t len) const bool checkPassword(const char* password, uint8_t len) const
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const; MqttError subscribe(MqttClient*, const Topic& topic, uint8_t qos);
MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself (local clients) // For clients that are added not by the broker itself (local clients)
void addClient(MqttClient* client); void addClient(MqttClient* client);
@@ -375,5 +381,25 @@ class MqttBroker
const char* auth_password = "guest"; const char* auth_password = "guest";
MqttClient* remote_broker = nullptr; MqttClient* remote_broker = nullptr;
State state = Disconnected; void retain(const Topic& topic, const MqttMessage& msg);
void retainDrop();
struct Retain
{
Retain(unsigned long ts, const MqttMessage& m) : timestamp(ts), msg(m) {}
Retain(Retain&& r) : timestamp(r.timestamp), msg(std::move(r.msg)) {}
Retain& operator=(Retain&& r)
{
timestamp = r.timestamp;
msg = std::move(r.msg);
return *this;
}
unsigned long timestamp;
MqttMessage msg;
};
std::map<Topic, Retain> retained;
uint8_t retain_size;
}; };