Supports multiple subscriptions

This commit is contained in:
hsaturn
2021-03-23 23:51:33 +01:00
parent efe6a05bbd
commit 7b20e7deb5
6 changed files with 73 additions and 29 deletions

View File

@@ -24,9 +24,9 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed. * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* MqttClient auto reconnection
* MqttClient auto re-subscribe * MqttClient auto re-subscribe
* MqttClient auto reconnection
* MqttClient does not callback payload... * MqttClient does not callback payload...
* MqttClient user/password * MqttClient user/password

View File

@@ -1,5 +1,4 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <my_credentials.h> #include <my_credentials.h>

View File

@@ -225,6 +225,8 @@ void loop()
last_cmd=cmd; last_cmd=cmd;
while(cmd.length()) while(cmd.length())
{ {
MqttError retval = MqttOk;
std::string s; std::string s;
MqttBroker* broker = nullptr; MqttBroker* broker = nullptr;
MqttClient* client = nullptr; MqttClient* client = nullptr;
@@ -314,11 +316,7 @@ void loop()
} }
else if (compare(s,"publish")) else if (compare(s,"publish"))
{ {
auto ok=client->publish(getword(cmd, topic.c_str())); retval = client->publish(getword(cmd, topic.c_str()));
if (ok != MqttOk)
{
Serial << "## ERROR " << ok << endl;
}
} }
else if (compare(s,"subscribe")) else if (compare(s,"subscribe"))
{ {
@@ -431,6 +429,11 @@ void loop()
if (s.length()) if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl; Serial << "Unknown command (" << s.c_str() << ")" << endl;
} }
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
} }
} }
else else

View File

@@ -2,7 +2,6 @@
#include <map> #include <map>
#include <string> #include <string>
#include <string.h> #include <string.h>
// #include <Streaming.h>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
/*** /***

View File

@@ -146,16 +146,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
{ {
i++; i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6 if (broker && broker->connected()) // Broker is connected
{ {
// ext broker -> clients or // ext broker -> clients or
// or clients -> ext broker // or clients -> ext broker
if (source == broker) // broker -> clients if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // clients -> broker
retval=broker->publish(topic, msg); {
MqttError ret = broker->publish(topic, msg);
if (ret != MqttOk) retval = ret;
}
} }
else // Disconnected: R7 else // Disconnected: R7
{ {
@@ -164,7 +167,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
} }
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
} }
return retval; return retval;
@@ -227,6 +230,29 @@ void MqttClient::loop()
} }
} }
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker ?
{
debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO Qos > 0 needs a packet identifier
msg.add(topic.str());
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
}
return ret;
}
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; std::string error;
@@ -327,14 +353,26 @@ void MqttClient::processMessage()
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
{
if (!mqtt_connected) break; if (!mqtt_connected) break;
payload = header+2; payload = header+2;
message.getString(payload, len); // Topic
outstring("Subscribes", payload, len);
subscribe(Topic(payload, len)); debug("subscribe loop");
while(payload < message.end())
{
message.getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len));
payload += len;
uint8_t qos = *payload++;
debug(" qos=" << qos);
}
debug("end loop");
bclose = false; bclose = false;
// TODO SUBACK // TODO SUBACK
}
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
@@ -398,7 +436,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
if (parent) if (parent)
return parent->publish(this, topic, msg); return parent->publish(this, topic, msg);
else if (client) else if (client)
msg.sendTo(this); return msg.sendTo(this);
else else
return MqttNowhereToSend; return MqttNowhereToSend;
} }
@@ -417,7 +455,7 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
Serial << " match/send"; Serial << " match/send";
if (client) if (client)
{ {
msg.sendTo(this); retval = msg.sendTo(this);
} }
else if (callback) else if (callback)
{ {
@@ -506,7 +544,7 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttClient* client) MqttError MqttMessage::sendTo(MqttClient* client)
{ {
if (buffer.size()>2) if (buffer.size()>2)
{ {
@@ -517,7 +555,9 @@ void MqttMessage::sendTo(MqttClient* client)
else else
{ {
Serial << "??? Invalid send" << endl; Serial << "??? Invalid send" << endl;
return MqttInvalidMessage;
} }
return MqttOk;
} }
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const

View File

@@ -3,9 +3,11 @@
#include <set> #include <set>
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h>
#define TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
#include <Streaming.h>
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else #else
#define debug(what) {} #define debug(what) {}
@@ -15,6 +17,7 @@ enum MqttError
{ {
MqttOk = 0, MqttOk = 0,
MqttNowhereToSend=1, MqttNowhereToSend=1,
MqttInvalidMessage=2,
}; };
class Topic : public IndexedString class Topic : public IndexedString
@@ -57,7 +60,7 @@ class MqttMessage
}; };
MqttMessage() { reset(); } MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); } MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len, bool addLength=true ); void add(const char* p, size_t len, bool addLength=true );
@@ -87,7 +90,7 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttClient*); MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
@@ -141,8 +144,8 @@ class MqttClient
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); } MqttError subscribe(Topic topic, uint8_t qos=0);
void unsubscribe(Topic& topic); MqttError unsubscribe(Topic& topic);
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless