Big rewrite of MqttClient in order to avoid code duplicate

This commit is contained in:
hsaturn
2021-04-04 05:57:48 +02:00
parent 510ff514a9
commit 297a22efb5
4 changed files with 158 additions and 68 deletions

View File

@@ -98,6 +98,13 @@ void MqttBroker::addClient(MqttClient* client)
clients.push_back(client);
}
void MqttBroker::connect(const std::string& host, uint16_t port)
{
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->parent = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove)
{
for(auto it=clients.begin(); it!=clients.end(); it++)
@@ -105,6 +112,11 @@ void MqttBroker::removeClient(MqttClient* remove)
auto client=*it;
if (client==remove)
{
// TODO if this broker is connected to an external broker
// we have to unsubscribe remove's topics.
// (but doing this, check that other clients are not subscribed...)
// Unless -> we could receive useless messages
// -> we are using (memory) one IndexedString plus its string for nothing.
debug("Remove " << clients.size());
clients.erase(it);
debug("Client removed " << clients.size());
@@ -118,6 +130,13 @@ void MqttBroker::loop()
{
WiFiClient client = server.available();
if (broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
}
if (client)
{
addClient(new MqttClient(this, client));
@@ -143,7 +162,15 @@ void MqttBroker::loop()
}
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
if (broker && broker->connected())
{
return broker->subscribe(topic, qos);
}
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
{
MqttError retval = MqttOk;
@@ -157,28 +184,27 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
bool doit = false;
if (broker && broker->connected()) // Broker is connected
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
{
// ext broker -> clients or
// or clients -> ext broker
if (source == broker) // broker -> clients
// ext_broker -> clients or clients -> ext_broker
if (source == broker) // external broker -> internal clients
doit = true;
else // clients -> broker
else // external clients -> this broker
{
MqttError ret = broker->publish(topic, msg);
// As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret;
}
}
else // Disconnected: R7
else // Disconnected
{
// All is allowed
doit = true;
}
#if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' ';
#endif
if (doit) retval = client->publish(topic, msg);
if (doit) retval = client->publishIfSubscribed(topic, msg);
debug("");
}
return retval;
@@ -237,7 +263,8 @@ void MqttClient::loop()
message.incoming(client->read());
if (message.type())
{
processMessage();
processMessage(&message);
message.reset();
}
}
}
@@ -273,6 +300,10 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
}
else
{
return parent->subscribe(topic, qos);
}
return ret;
}
@@ -307,22 +338,22 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
long MqttClient::counter=0;
void MqttClient::processMessage()
void MqttClient::processMessage(const MqttMessage* mesg)
{
counter++;
#if TINY_MQTT_DEBUG
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// message.hexdump("Incoming");
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming");
}
#endif
auto header = message.getVHeader();
auto header = mesg->getVHeader();
const char* payload;
uint16_t len;
bool bclose=true;
switch(message.type() & 0XF0)
switch(mesg->type() & 0XF0)
{
case MqttMessage::Type::Connect:
if (mqtt_connected)
@@ -345,30 +376,30 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
}
// ClientId
message.getString(payload, len);
mesg->getString(payload, len);
clientId = std::string(payload, len);
payload += len;
if (mqtt_flags & FlagWill) // Will topic
{
message.getString(payload, len); // Will Topic
mesg->getString(payload, len); // Will Topic
outstring("WillTopic", payload, len);
payload += len;
message.getString(payload, len); // Will Message
mesg->getString(payload, len); // Will Message
outstring("WillMessage", payload, len);
payload += len;
}
// FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName)
{
message.getString(payload, len);
mesg->getString(payload, len);
if (!parent->checkUser(payload, len)) break;
payload += len;
}
if (mqtt_flags & FlagPassword)
{
message.getString(payload, len);
mesg->getString(payload, len);
if (!parent->checkPassword(payload, len)) break;
payload += len;
}
@@ -423,14 +454,14 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
payload = header+2;
debug("subscribe loop");
while(payload < message.end())
while(payload < mesg->end())
{
message.getString(payload, len); // Topic
mesg->getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
Topic topic(payload, len);
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe)
if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic);
else
{
@@ -449,37 +480,43 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break;
case MqttMessage::Type::Publish:
if (!mqtt_connected) break;
if (mqtt_connected or client == nullptr)
{
uint8_t qos = message.type() & 0x6;
uint8_t qos = mesg->type() & 0x6;
payload = header;
message.getString(payload, len);
mesg->getString(payload, len);
Topic published(payload, len);
payload += len;
len=message.end()-payload;
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl;
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
if (qos) payload+=2; // ignore packet identifier if any
len=mesg->end()-payload;
// TODO reset DUP
// TODO reset RETAIN
if (parent)
if (client==nullptr) // internal MqttClient receives publish
{
if (callback and isSubscribedTo(published))
{
callback(this, published, payload, len); // TODO send the real payload
mesg->changeType(MqttMessage::Type::PubAck); // TODO constness design but saves memory & speed
// TODO re-add packet identifier if any
mesg->sendTo(this);
mesg->changeType(MqttMessage::Type::Publish); // mesg is const (...)
}
}
else if (parent) // from outside to inside
{
debug("publishing to parent");
parent->publish(this, published, message);
parent->publish(this, published, *mesg);
}
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
message.create(MqttMessage::Type::PubAck);
// TODO re-add packet identifier if any
message.sendTo(this);
bclose = false;
}
break;
case MqttMessage::Type::Disconnect:
// TODO should discard any will message
// TODO should discard any will msg
if (!mqtt_connected) break;
mqtt_connected = false;
close(false);
@@ -492,8 +529,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
};
if (bclose)
{
Serial << "*************** Error msg 0x" << _HEX(message.type());
message.hexdump("-------ERROR ------");
Serial << "*************** Error msg 0x" << _HEX(mesg->type());
mesg->hexdump("-------ERROR ------");
dump();
Serial << endl;
close();
}
@@ -501,7 +539,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
{
clientAlive(parent ? 5 : 0);
}
message.reset();
}
bool Topic::matches(const Topic& topic) const
@@ -517,8 +554,11 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
MqttMessage msg(MqttMessage::Publish);
msg.add(topic);
msg.add(payload, pay_length, false);
msg.complete();
if (parent)
{
return parent->publish(this, topic, msg);
}
else if (client)
return msg.sendTo(this);
else
@@ -526,29 +566,33 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
}
// republish a received publish if it matches any in subscriptions
MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
{
MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions)
if (isSubscribedTo(topic))
{
if (subscription.matches(topic))
{
debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ');
if (client)
{
retval = msg.sendTo(this);
}
else if (callback)
else
{
callback(this, topic, nullptr, 0); // TODO Payload
}
processMessage(&msg);
// callback(this, topic, nullptr, 0); // TODO Payload
}
}
return retval;
}
bool MqttClient::isSubscribedTo(const Topic& topic) const
{
for(const auto& subscription: subscriptions)
if (subscription.matches(topic))
return true;
return false;
}
void MqttMessage::reset()
{
buffer.clear();
@@ -623,7 +667,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
while(len--) incoming(*p++);
}
void MqttMessage::encodeLength(char* msb, int length)
void MqttMessage::encodeLength(char* msb, int length) const
{
do
{
@@ -634,7 +678,13 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length);
};
MqttError MqttMessage::sendTo(MqttClient* client)
void MqttMessage::complete()
{
encodeLength(&buffer[1], buffer.size()-2);
state = Complete;
}
MqttError MqttMessage::sendTo(MqttClient* client) const
{
if (buffer.size())
{

View File

@@ -72,6 +72,7 @@ class MqttMessage
const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); }
void complete();
void reset();
@@ -85,6 +86,13 @@ class MqttMessage
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown;
}
// shouldn't exist because it breaks constness :-(
// but this saves memory so ...
void changeType(Type type) const
{
buffer[0] = type;
}
void create(Type type)
{
buffer=(char)type;
@@ -93,13 +101,13 @@ class MqttMessage
size=0;
state=Create;
}
MqttError sendTo(MqttClient*);
MqttError sendTo(MqttClient*) const;
void hexdump(const char* prefix=nullptr) const;
private:
void encodeLength(char* msb, int length);
void encodeLength(char* msb, int length) const;
std::string buffer;
mutable std::string buffer; // mutable -> sendTo()
uint8_t vheader;
uint16_t size; // bytes left to receive
State state;
@@ -152,6 +160,7 @@ class MqttClient
MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic topic);
bool isSubscribedTo(const Topic& topic) const;
// connected to local broker
// TODO seems to be useless
@@ -186,10 +195,10 @@ class MqttClient
friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions
MqttError publish(const Topic& topic, MqttMessage& msg);
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
void clientAlive(uint32_t more_seconds);
void processMessage();
void processMessage(const MqttMessage* message);
bool mqtt_connected = false;
char mqtt_flags;
@@ -226,7 +235,7 @@ class MqttBroker
uint16_t port() const { return server.port(); }
void connect(std::string host, uint16_t port=1883);
void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; }
size_t clientsCount() const { return clients.size(); }
@@ -251,7 +260,9 @@ class MqttBroker
{ return compareString(auth_password, password, len); }
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself
void addClient(MqttClient* client);

View File

@@ -17,10 +17,15 @@ MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
}
test(local_client_should_unregister_when_destroyed)

View File

@@ -6,7 +6,7 @@
* TinyMqtt nowifi unit tests.
*
* No wifi connection unit tests.
* Checks with a local broker. Clients must connect to the local client
* Checks with a local broker. Clients must connect to the local broker
**/
using namespace std;
@@ -15,10 +15,15 @@ MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
}
test(nowifi_client_should_unregister_when_destroyed)
@@ -60,7 +65,7 @@ test(nowifi_publish_should_be_dispatched)
assertEqual(published[""]["a/c"], 2);
}
test(nowifi_publish_should_be_dispatched_to_nowifi_clients)
test(nowifi_publish_should_be_dispatched_to_clients)
{
published.clear();
assertEqual(broker.clientsCount(), (size_t)0);
@@ -75,8 +80,8 @@ test(nowifi_publish_should_be_dispatched_to_nowifi_clients)
subscriber_b.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b");
publisher.publish("a/c");
publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertEqual(published["A"]["a/b"], 1);
@@ -124,6 +129,25 @@ test(nowifi_nocallback_when_destroyed)
assertEqual(published.size(), (size_t)1); // Only one publish has been received
}
test(nowifi_payload_nullptr)
{
return; // FIXME
published.clear();
const char* payload="abcd";
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b", payload, strlen(payload)); // This publish is received
// coming from MqttClient::publish(...)
assertEqual(payload, lastPayload);
assertEqual(lastLength, (size_t)4);
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {