Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5834a278c7 | ||
|
|
146d0de1d4 | ||
|
|
297a22efb5 | ||
|
|
510ff514a9 | ||
|
|
ad6f7155e5 | ||
|
|
3ed5874373 | ||
|
|
e1a936e081 | ||
|
|
0757a95fbf |
@@ -9,13 +9,19 @@
|
|||||||
TinyMqtt KEYWORD1
|
TinyMqtt KEYWORD1
|
||||||
|
|
||||||
MqttBroker KEYWORD1
|
MqttBroker KEYWORD1
|
||||||
|
connect KEYWORD2
|
||||||
|
clientsCount KEYWORD2
|
||||||
begin KEYWORD2
|
begin KEYWORD2
|
||||||
loop KEYWORD2
|
loop KEYWORD2
|
||||||
|
port KEYWORD2
|
||||||
|
|
||||||
MqttClient KEYWORD1
|
MqttClient KEYWORD1
|
||||||
|
connect KEYWORD2
|
||||||
|
connected KEYWORD2
|
||||||
publish KEYWORD2
|
publish KEYWORD2
|
||||||
setCallback KEYWORD2
|
setCallback KEYWORD2
|
||||||
subscribe KEYWORD2
|
subscribe KEYWORD2
|
||||||
|
unsubscribe KEYWORD2
|
||||||
|
|
||||||
Topic KEYWORD1
|
Topic KEYWORD1
|
||||||
matches KEYWORD2
|
matches KEYWORD2
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "https://github.com/hsaturn/TinyMqtt.git"
|
"url": "https://github.com/hsaturn/TinyMqtt.git"
|
||||||
},
|
},
|
||||||
"version": "0.7.2",
|
"version": "0.7.3",
|
||||||
"exclude": "",
|
"exclude": "",
|
||||||
"examples": "examples/*/*.ino",
|
"examples": "examples/*/*.ino",
|
||||||
"frameworks": "arduino",
|
"frameworks": "arduino",
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
name=TinyMqtt
|
name=TinyMqtt
|
||||||
version=0.7.2
|
version=0.7.3
|
||||||
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
sentence=A tiny broker and client library for MQTT messaging.
|
sentence=A tiny broker and client library for MQTT messaging.
|
||||||
|
|||||||
@@ -39,7 +39,7 @@ class StringIndexer
|
|||||||
return it->first;
|
return it->first;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for(index_t index=0; index<255; index++)
|
for(index_t index=1; index; index++)
|
||||||
{
|
{
|
||||||
if (strings.find(index)==strings.end())
|
if (strings.find(index)==strings.end())
|
||||||
{
|
{
|
||||||
@@ -80,6 +80,8 @@ class StringIndexer
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static uint16_t count() { return strings.size(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static std::map<index_t, StringCounter> strings;
|
static std::map<index_t, StringCounter> strings;
|
||||||
};
|
};
|
||||||
@@ -98,6 +100,8 @@ class IndexedString
|
|||||||
index=StringIndexer::strToIndex(str, len);
|
index=StringIndexer::strToIndex(str, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {};
|
||||||
|
|
||||||
~IndexedString() { StringIndexer::release(index); }
|
~IndexedString() { StringIndexer::release(index); }
|
||||||
|
|
||||||
IndexedString& operator=(const IndexedString& source)
|
IndexedString& operator=(const IndexedString& source)
|
||||||
@@ -112,6 +116,11 @@ class IndexedString
|
|||||||
return i1.index < i2.index;
|
return i1.index < i2.index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
friend bool operator==(const IndexedString& i1, const IndexedString& i2)
|
||||||
|
{
|
||||||
|
return i1.index == i2.index;
|
||||||
|
}
|
||||||
|
|
||||||
const std::string& str() const { return StringIndexer::str(index); }
|
const std::string& str() const { return StringIndexer::str(index); }
|
||||||
|
|
||||||
const StringIndexer::index_t& getIndex() const { return index; }
|
const StringIndexer::index_t& getIndex() const { return index; }
|
||||||
|
|||||||
155
src/TinyMqtt.cpp
155
src/TinyMqtt.cpp
@@ -98,6 +98,13 @@ void MqttBroker::addClient(MqttClient* client)
|
|||||||
clients.push_back(client);
|
clients.push_back(client);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MqttBroker::connect(const std::string& host, uint16_t port)
|
||||||
|
{
|
||||||
|
if (broker == nullptr) broker = new MqttClient;
|
||||||
|
broker->connect(host, port);
|
||||||
|
broker->parent = this; // Because connect removed the link
|
||||||
|
}
|
||||||
|
|
||||||
void MqttBroker::removeClient(MqttClient* remove)
|
void MqttBroker::removeClient(MqttClient* remove)
|
||||||
{
|
{
|
||||||
for(auto it=clients.begin(); it!=clients.end(); it++)
|
for(auto it=clients.begin(); it!=clients.end(); it++)
|
||||||
@@ -105,6 +112,11 @@ void MqttBroker::removeClient(MqttClient* remove)
|
|||||||
auto client=*it;
|
auto client=*it;
|
||||||
if (client==remove)
|
if (client==remove)
|
||||||
{
|
{
|
||||||
|
// TODO if this broker is connected to an external broker
|
||||||
|
// we have to unsubscribe remove's topics.
|
||||||
|
// (but doing this, check that other clients are not subscribed...)
|
||||||
|
// Unless -> we could receive useless messages
|
||||||
|
// -> we are using (memory) one IndexedString plus its string for nothing.
|
||||||
debug("Remove " << clients.size());
|
debug("Remove " << clients.size());
|
||||||
clients.erase(it);
|
clients.erase(it);
|
||||||
debug("Client removed " << clients.size());
|
debug("Client removed " << clients.size());
|
||||||
@@ -118,6 +130,13 @@ void MqttBroker::loop()
|
|||||||
{
|
{
|
||||||
WiFiClient client = server.available();
|
WiFiClient client = server.available();
|
||||||
|
|
||||||
|
if (broker)
|
||||||
|
{
|
||||||
|
// TODO should monitor broker's activity.
|
||||||
|
// 1 When broker disconnect and reconnect we have to re-subscribe
|
||||||
|
broker->loop();
|
||||||
|
}
|
||||||
|
|
||||||
if (client)
|
if (client)
|
||||||
{
|
{
|
||||||
addClient(new MqttClient(this, client));
|
addClient(new MqttClient(this, client));
|
||||||
@@ -143,7 +162,15 @@ void MqttBroker::loop()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
|
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
|
||||||
|
{
|
||||||
|
if (broker && broker->connected())
|
||||||
|
{
|
||||||
|
return broker->subscribe(topic, qos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
|
||||||
{
|
{
|
||||||
MqttError retval = MqttOk;
|
MqttError retval = MqttOk;
|
||||||
|
|
||||||
@@ -157,28 +184,27 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
|
|||||||
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
||||||
#endif
|
#endif
|
||||||
bool doit = false;
|
bool doit = false;
|
||||||
if (broker && broker->connected()) // Broker is connected
|
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
|
||||||
{
|
{
|
||||||
// ext broker -> clients or
|
// ext_broker -> clients or clients -> ext_broker
|
||||||
// or clients -> ext broker
|
if (source == broker) // external broker -> internal clients
|
||||||
if (source == broker) // broker -> clients
|
|
||||||
doit = true;
|
doit = true;
|
||||||
else // clients -> broker
|
else // external clients -> this broker
|
||||||
{
|
{
|
||||||
MqttError ret = broker->publish(topic, msg);
|
// As this broker is connected to another broker, simply forward the msg
|
||||||
|
MqttError ret = broker->publishIfSubscribed(topic, msg);
|
||||||
if (ret != MqttOk) retval = ret;
|
if (ret != MqttOk) retval = ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else // Disconnected: R7
|
else // Disconnected
|
||||||
{
|
{
|
||||||
// All is allowed
|
|
||||||
doit = true;
|
doit = true;
|
||||||
}
|
}
|
||||||
#if TINY_MQTT_DEBUG
|
#if TINY_MQTT_DEBUG
|
||||||
Serial << ", doit=" << doit << ' ';
|
Serial << ", doit=" << doit << ' ';
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (doit) retval = client->publish(topic, msg);
|
if (doit) retval = client->publishIfSubscribed(topic, msg);
|
||||||
debug("");
|
debug("");
|
||||||
}
|
}
|
||||||
return retval;
|
return retval;
|
||||||
@@ -237,7 +263,8 @@ void MqttClient::loop()
|
|||||||
message.incoming(client->read());
|
message.incoming(client->read());
|
||||||
if (message.type())
|
if (message.type())
|
||||||
{
|
{
|
||||||
processMessage();
|
processMessage(&message);
|
||||||
|
message.reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -273,6 +300,10 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
|
|||||||
{
|
{
|
||||||
return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
|
return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
return parent->subscribe(topic, qos);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -307,22 +338,22 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
|
|||||||
|
|
||||||
long MqttClient::counter=0;
|
long MqttClient::counter=0;
|
||||||
|
|
||||||
void MqttClient::processMessage()
|
void MqttClient::processMessage(const MqttMessage* mesg)
|
||||||
{
|
{
|
||||||
counter++;
|
counter++;
|
||||||
#if TINY_MQTT_DEBUG
|
#if TINY_MQTT_DEBUG
|
||||||
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
|
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
|
||||||
{
|
{
|
||||||
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
||||||
// message.hexdump("Incoming");
|
// mesg->hexdump("Incoming");
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
auto header = message.getVHeader();
|
auto header = mesg->getVHeader();
|
||||||
const char* payload;
|
const char* payload;
|
||||||
uint16_t len;
|
uint16_t len;
|
||||||
bool bclose=true;
|
bool bclose=true;
|
||||||
|
|
||||||
switch(message.type() & 0XF0)
|
switch(mesg->type() & 0XF0)
|
||||||
{
|
{
|
||||||
case MqttMessage::Type::Connect:
|
case MqttMessage::Type::Connect:
|
||||||
if (mqtt_connected)
|
if (mqtt_connected)
|
||||||
@@ -345,30 +376,30 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ClientId
|
// ClientId
|
||||||
message.getString(payload, len);
|
mesg->getString(payload, len);
|
||||||
clientId = std::string(payload, len);
|
clientId = std::string(payload, len);
|
||||||
payload += len;
|
payload += len;
|
||||||
|
|
||||||
if (mqtt_flags & FlagWill) // Will topic
|
if (mqtt_flags & FlagWill) // Will topic
|
||||||
{
|
{
|
||||||
message.getString(payload, len); // Will Topic
|
mesg->getString(payload, len); // Will Topic
|
||||||
outstring("WillTopic", payload, len);
|
outstring("WillTopic", payload, len);
|
||||||
payload += len;
|
payload += len;
|
||||||
|
|
||||||
message.getString(payload, len); // Will Message
|
mesg->getString(payload, len); // Will Message
|
||||||
outstring("WillMessage", payload, len);
|
outstring("WillMessage", payload, len);
|
||||||
payload += len;
|
payload += len;
|
||||||
}
|
}
|
||||||
// FIXME forgetting credential is allowed (security hole)
|
// FIXME forgetting credential is allowed (security hole)
|
||||||
if (mqtt_flags & FlagUserName)
|
if (mqtt_flags & FlagUserName)
|
||||||
{
|
{
|
||||||
message.getString(payload, len);
|
mesg->getString(payload, len);
|
||||||
if (!parent->checkUser(payload, len)) break;
|
if (!parent->checkUser(payload, len)) break;
|
||||||
payload += len;
|
payload += len;
|
||||||
}
|
}
|
||||||
if (mqtt_flags & FlagPassword)
|
if (mqtt_flags & FlagPassword)
|
||||||
{
|
{
|
||||||
message.getString(payload, len);
|
mesg->getString(payload, len);
|
||||||
if (!parent->checkPassword(payload, len)) break;
|
if (!parent->checkPassword(payload, len)) break;
|
||||||
payload += len;
|
payload += len;
|
||||||
}
|
}
|
||||||
@@ -423,14 +454,14 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
|
|||||||
payload = header+2;
|
payload = header+2;
|
||||||
|
|
||||||
debug("subscribe loop");
|
debug("subscribe loop");
|
||||||
while(payload < message.end())
|
while(payload < mesg->end())
|
||||||
{
|
{
|
||||||
message.getString(payload, len); // Topic
|
mesg->getString(payload, len); // Topic
|
||||||
debug( " topic (" << std::string(payload, len) << ')');
|
debug( " topic (" << std::string(payload, len) << ')');
|
||||||
outstring("Subscribes", payload, len);
|
outstring("Subscribes", payload, len);
|
||||||
// subscribe(Topic(payload, len));
|
// subscribe(Topic(payload, len));
|
||||||
Topic topic(payload, len);
|
Topic topic(payload, len);
|
||||||
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe)
|
if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe)
|
||||||
subscriptions.insert(topic);
|
subscriptions.insert(topic);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@@ -449,37 +480,38 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::Publish:
|
case MqttMessage::Type::Publish:
|
||||||
if (!mqtt_connected) break;
|
if (mqtt_connected or client == nullptr)
|
||||||
{
|
{
|
||||||
uint8_t qos = message.type() & 0x6;
|
uint8_t qos = mesg->type() & 0x6;
|
||||||
payload = header;
|
payload = header;
|
||||||
message.getString(payload, len);
|
mesg->getString(payload, len);
|
||||||
Topic published(payload, len);
|
Topic published(payload, len);
|
||||||
payload += len;
|
payload += len;
|
||||||
len=message.end()-payload;
|
|
||||||
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
|
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
|
||||||
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl;
|
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
|
||||||
if (qos) payload+=2; // ignore packet identifier if any
|
if (qos) payload+=2; // ignore packet identifier if any
|
||||||
|
len=mesg->end()-payload;
|
||||||
// TODO reset DUP
|
// TODO reset DUP
|
||||||
// TODO reset RETAIN
|
// TODO reset RETAIN
|
||||||
if (parent)
|
|
||||||
|
if (client==nullptr) // internal MqttClient receives publish
|
||||||
|
{
|
||||||
|
if (callback and isSubscribedTo(published))
|
||||||
|
{
|
||||||
|
callback(this, published, payload, len); // TODO send the real payload
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else if (parent) // from outside to inside
|
||||||
{
|
{
|
||||||
debug("publishing to parent");
|
debug("publishing to parent");
|
||||||
parent->publish(this, published, message);
|
parent->publish(this, published, *mesg);
|
||||||
}
|
}
|
||||||
else if (callback && subscriptions.find(published)!=subscriptions.end())
|
|
||||||
{
|
|
||||||
callback(this, published, nullptr, 0); // TODO send the real payload
|
|
||||||
}
|
|
||||||
message.create(MqttMessage::Type::PubAck);
|
|
||||||
// TODO re-add packet identifier if any
|
|
||||||
message.sendTo(this);
|
|
||||||
bclose = false;
|
bclose = false;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::Disconnect:
|
case MqttMessage::Type::Disconnect:
|
||||||
// TODO should discard any will message
|
// TODO should discard any will msg
|
||||||
if (!mqtt_connected) break;
|
if (!mqtt_connected) break;
|
||||||
mqtt_connected = false;
|
mqtt_connected = false;
|
||||||
close(false);
|
close(false);
|
||||||
@@ -492,8 +524,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
|
|||||||
};
|
};
|
||||||
if (bclose)
|
if (bclose)
|
||||||
{
|
{
|
||||||
Serial << "*************** Error msg 0x" << _HEX(message.type());
|
Serial << "*************** Error msg 0x" << _HEX(mesg->type());
|
||||||
message.hexdump("-------ERROR ------");
|
mesg->hexdump("-------ERROR ------");
|
||||||
|
dump();
|
||||||
Serial << endl;
|
Serial << endl;
|
||||||
close();
|
close();
|
||||||
}
|
}
|
||||||
@@ -501,7 +534,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
|
|||||||
{
|
{
|
||||||
clientAlive(parent ? 5 : 0);
|
clientAlive(parent ? 5 : 0);
|
||||||
}
|
}
|
||||||
message.reset();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Topic::matches(const Topic& topic) const
|
bool Topic::matches(const Topic& topic) const
|
||||||
@@ -517,8 +549,11 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
|
|||||||
MqttMessage msg(MqttMessage::Publish);
|
MqttMessage msg(MqttMessage::Publish);
|
||||||
msg.add(topic);
|
msg.add(topic);
|
||||||
msg.add(payload, pay_length, false);
|
msg.add(payload, pay_length, false);
|
||||||
|
msg.complete();
|
||||||
if (parent)
|
if (parent)
|
||||||
|
{
|
||||||
return parent->publish(this, topic, msg);
|
return parent->publish(this, topic, msg);
|
||||||
|
}
|
||||||
else if (client)
|
else if (client)
|
||||||
return msg.sendTo(this);
|
return msg.sendTo(this);
|
||||||
else
|
else
|
||||||
@@ -526,29 +561,33 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
|
|||||||
}
|
}
|
||||||
|
|
||||||
// republish a received publish if it matches any in subscriptions
|
// republish a received publish if it matches any in subscriptions
|
||||||
MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
|
MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
|
||||||
{
|
{
|
||||||
MqttError retval=MqttOk;
|
MqttError retval=MqttOk;
|
||||||
|
|
||||||
debug("mqttclient publish " << subscriptions.size());
|
debug("mqttclient publish " << subscriptions.size());
|
||||||
for(const auto& subscription: subscriptions)
|
if (isSubscribedTo(topic))
|
||||||
{
|
{
|
||||||
if (subscription.matches(topic))
|
|
||||||
{
|
|
||||||
debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ');
|
|
||||||
if (client)
|
if (client)
|
||||||
{
|
|
||||||
retval = msg.sendTo(this);
|
retval = msg.sendTo(this);
|
||||||
}
|
else
|
||||||
else if (callback)
|
|
||||||
{
|
{
|
||||||
callback(this, topic, nullptr, 0); // TODO Payload
|
processMessage(&msg);
|
||||||
}
|
// callback(this, topic, nullptr, 0); // TODO Payload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return retval;
|
return retval;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool MqttClient::isSubscribedTo(const Topic& topic) const
|
||||||
|
{
|
||||||
|
for(const auto& subscription: subscriptions)
|
||||||
|
if (subscription.matches(topic))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
void MqttMessage::reset()
|
void MqttMessage::reset()
|
||||||
{
|
{
|
||||||
buffer.clear();
|
buffer.clear();
|
||||||
@@ -623,7 +662,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
|
|||||||
while(len--) incoming(*p++);
|
while(len--) incoming(*p++);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttMessage::encodeLength(char* msb, int length)
|
void MqttMessage::encodeLength(char* msb, int length) const
|
||||||
{
|
{
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
@@ -634,7 +673,13 @@ void MqttMessage::encodeLength(char* msb, int length)
|
|||||||
} while (length);
|
} while (length);
|
||||||
};
|
};
|
||||||
|
|
||||||
MqttError MqttMessage::sendTo(MqttClient* client)
|
void MqttMessage::complete()
|
||||||
|
{
|
||||||
|
encodeLength(&buffer[1], buffer.size()-2);
|
||||||
|
state = Complete;
|
||||||
|
}
|
||||||
|
|
||||||
|
MqttError MqttMessage::sendTo(MqttClient* client) const
|
||||||
{
|
{
|
||||||
if (buffer.size())
|
if (buffer.size())
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -72,6 +72,7 @@ class MqttMessage
|
|||||||
const char* end() const { return &buffer[0]+buffer.size(); }
|
const char* end() const { return &buffer[0]+buffer.size(); }
|
||||||
const char* getVHeader() const { return &buffer[vheader]; }
|
const char* getVHeader() const { return &buffer[vheader]; }
|
||||||
uint16_t length() const { return buffer.size(); }
|
uint16_t length() const { return buffer.size(); }
|
||||||
|
void complete();
|
||||||
|
|
||||||
void reset();
|
void reset();
|
||||||
|
|
||||||
@@ -85,6 +86,13 @@ class MqttMessage
|
|||||||
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown;
|
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// shouldn't exist because it breaks constness :-(
|
||||||
|
// but this saves memory so ...
|
||||||
|
void changeType(Type type) const
|
||||||
|
{
|
||||||
|
buffer[0] = type;
|
||||||
|
}
|
||||||
|
|
||||||
void create(Type type)
|
void create(Type type)
|
||||||
{
|
{
|
||||||
buffer=(char)type;
|
buffer=(char)type;
|
||||||
@@ -93,13 +101,13 @@ class MqttMessage
|
|||||||
size=0;
|
size=0;
|
||||||
state=Create;
|
state=Create;
|
||||||
}
|
}
|
||||||
MqttError sendTo(MqttClient*);
|
MqttError sendTo(MqttClient*) const;
|
||||||
void hexdump(const char* prefix=nullptr) const;
|
void hexdump(const char* prefix=nullptr) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void encodeLength(char* msb, int length);
|
void encodeLength(char* msb, int length) const;
|
||||||
|
|
||||||
std::string buffer;
|
mutable std::string buffer; // mutable -> sendTo()
|
||||||
uint8_t vheader;
|
uint8_t vheader;
|
||||||
uint16_t size; // bytes left to receive
|
uint16_t size; // bytes left to receive
|
||||||
State state;
|
State state;
|
||||||
@@ -120,13 +128,15 @@ class MqttClient
|
|||||||
FlagReserved = 1
|
FlagReserved = 1
|
||||||
};
|
};
|
||||||
public:
|
public:
|
||||||
MqttClient(MqttBroker* brk = nullptr, const std::string& id="");
|
/** Constructor. If broker is not null, this is the adress of a local broker.
|
||||||
|
If you want to connect elsewhere, leave broker null and use connect() **/
|
||||||
|
MqttClient(MqttBroker* broker = nullptr, const std::string& id="");
|
||||||
MqttClient(const std::string& id) : MqttClient(nullptr, id){}
|
MqttClient(const std::string& id) : MqttClient(nullptr, id){}
|
||||||
|
|
||||||
~MqttClient();
|
~MqttClient();
|
||||||
|
|
||||||
void connect(MqttBroker* parent);
|
void connect(MqttBroker* parent);
|
||||||
void connect(std::string broker, uint16_t port, uint16_t ka=10);
|
void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10);
|
||||||
|
|
||||||
bool connected() { return
|
bool connected() { return
|
||||||
(parent!=nullptr and client==nullptr) or
|
(parent!=nullptr and client==nullptr) or
|
||||||
@@ -137,6 +147,7 @@ class MqttClient
|
|||||||
const std::string& id() const { return clientId; }
|
const std::string& id() const { return clientId; }
|
||||||
void id(std::string& new_id) { clientId = new_id; }
|
void id(std::string& new_id) { clientId = new_id; }
|
||||||
|
|
||||||
|
/** Should be called in main loop() */
|
||||||
void loop();
|
void loop();
|
||||||
void close(bool bSendDisconnect=true);
|
void close(bool bSendDisconnect=true);
|
||||||
void setCallback(CallBack fun) {callback=fun; };
|
void setCallback(CallBack fun) {callback=fun; };
|
||||||
@@ -149,6 +160,7 @@ class MqttClient
|
|||||||
|
|
||||||
MqttError subscribe(Topic topic, uint8_t qos=0);
|
MqttError subscribe(Topic topic, uint8_t qos=0);
|
||||||
MqttError unsubscribe(Topic topic);
|
MqttError unsubscribe(Topic topic);
|
||||||
|
bool isSubscribedTo(const Topic& topic) const;
|
||||||
|
|
||||||
// connected to local broker
|
// connected to local broker
|
||||||
// TODO seems to be useless
|
// TODO seems to be useless
|
||||||
@@ -173,7 +185,8 @@ class MqttClient
|
|||||||
Serial << "]" << endl;
|
Serial << "]" << endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
static long counter; // Number of messages sent
|
/** Count the number of messages that have been sent **/
|
||||||
|
static long counter;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
|
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
|
||||||
@@ -182,10 +195,10 @@ class MqttClient
|
|||||||
friend class MqttBroker;
|
friend class MqttBroker;
|
||||||
MqttClient(MqttBroker* parent, WiFiClient& client);
|
MqttClient(MqttBroker* parent, WiFiClient& client);
|
||||||
// republish a received publish if topic matches any in subscriptions
|
// republish a received publish if topic matches any in subscriptions
|
||||||
MqttError publish(const Topic& topic, MqttMessage& msg);
|
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
|
||||||
|
|
||||||
void clientAlive(uint32_t more_seconds);
|
void clientAlive(uint32_t more_seconds);
|
||||||
void processMessage();
|
void processMessage(const MqttMessage* message);
|
||||||
|
|
||||||
bool mqtt_connected = false;
|
bool mqtt_connected = false;
|
||||||
char mqtt_flags;
|
char mqtt_flags;
|
||||||
@@ -222,7 +235,7 @@ class MqttBroker
|
|||||||
|
|
||||||
uint16_t port() const { return server.port(); }
|
uint16_t port() const { return server.port(); }
|
||||||
|
|
||||||
void connect(std::string host, uint16_t port=1883);
|
void connect(const std::string& host, uint16_t port=1883);
|
||||||
bool connected() const { return state == Connected; }
|
bool connected() const { return state == Connected; }
|
||||||
|
|
||||||
size_t clientsCount() const { return clients.size(); }
|
size_t clientsCount() const { return clients.size(); }
|
||||||
@@ -247,7 +260,9 @@ class MqttBroker
|
|||||||
{ return compareString(auth_password, password, len); }
|
{ return compareString(auth_password, password, len); }
|
||||||
|
|
||||||
|
|
||||||
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
|
MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
|
||||||
|
|
||||||
|
MqttError subscribe(const Topic& topic, uint8_t qos);
|
||||||
|
|
||||||
// For clients that are added not by the broker itself
|
// For clients that are added not by the broker itself
|
||||||
void addClient(MqttClient* client);
|
void addClient(MqttClient* client);
|
||||||
|
|||||||
@@ -17,10 +17,15 @@ MqttBroker broker(1883);
|
|||||||
|
|
||||||
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
||||||
|
|
||||||
|
const char* lastPayload;
|
||||||
|
size_t lastLength;
|
||||||
|
|
||||||
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
||||||
{
|
{
|
||||||
if (srce)
|
if (srce)
|
||||||
published[srce->id()][topic]++;
|
published[srce->id()][topic]++;
|
||||||
|
lastPayload = payload;
|
||||||
|
lastLength = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
test(local_client_should_unregister_when_destroyed)
|
test(local_client_should_unregister_when_destroyed)
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
* TinyMqtt nowifi unit tests.
|
* TinyMqtt nowifi unit tests.
|
||||||
*
|
*
|
||||||
* No wifi connection unit tests.
|
* No wifi connection unit tests.
|
||||||
* Checks with a local broker. Clients must connect to the local client
|
* Checks with a local broker. Clients must connect to the local broker
|
||||||
**/
|
**/
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
@@ -15,10 +15,15 @@ MqttBroker broker(1883);
|
|||||||
|
|
||||||
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
||||||
|
|
||||||
|
const char* lastPayload;
|
||||||
|
size_t lastLength;
|
||||||
|
|
||||||
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
||||||
{
|
{
|
||||||
if (srce)
|
if (srce)
|
||||||
published[srce->id()][topic]++;
|
published[srce->id()][topic]++;
|
||||||
|
lastPayload = payload;
|
||||||
|
lastLength = length;
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_client_should_unregister_when_destroyed)
|
test(nowifi_client_should_unregister_when_destroyed)
|
||||||
@@ -56,11 +61,11 @@ test(nowifi_publish_should_be_dispatched)
|
|||||||
publisher.publish("a/c");
|
publisher.publish("a/c");
|
||||||
|
|
||||||
assertEqual(published.size(), (size_t)1); // 1 client has received something
|
assertEqual(published.size(), (size_t)1); // 1 client has received something
|
||||||
assertTrue(published[""]["a/b"] == 1);
|
assertEqual(published[""]["a/b"], 1);
|
||||||
assertTrue(published[""]["a/c"] == 2);
|
assertEqual(published[""]["a/c"], 2);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_publish_should_be_dispatched_to_nowifi_clients)
|
test(nowifi_publish_should_be_dispatched_to_clients)
|
||||||
{
|
{
|
||||||
published.clear();
|
published.clear();
|
||||||
assertEqual(broker.clientsCount(), (size_t)0);
|
assertEqual(broker.clientsCount(), (size_t)0);
|
||||||
@@ -75,14 +80,14 @@ test(nowifi_publish_should_be_dispatched_to_nowifi_clients)
|
|||||||
subscriber_b.subscribe("a/b");
|
subscriber_b.subscribe("a/b");
|
||||||
|
|
||||||
MqttClient publisher(&broker);
|
MqttClient publisher(&broker);
|
||||||
publisher.publish("a/b");
|
publisher.publish("a/b"); // A and B should receive this
|
||||||
publisher.publish("a/c");
|
publisher.publish("a/c"); // A should receive this
|
||||||
|
|
||||||
assertEqual(published.size(), (size_t)2); // 2 clients have received something
|
assertEqual(published.size(), (size_t)2); // 2 clients have received something
|
||||||
assertTrue(published["A"]["a/b"] == 1);
|
assertEqual(published["A"]["a/b"], 1);
|
||||||
assertTrue(published["A"]["a/c"] == 1);
|
assertEqual(published["A"]["a/c"], 1);
|
||||||
assertTrue(published["B"]["a/b"] == 1);
|
assertEqual(published["B"]["a/b"], 1);
|
||||||
assertTrue(published["B"]["a/c"] == 0);
|
assertEqual(published["B"]["a/c"], 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_unsubscribe)
|
test(nowifi_unsubscribe)
|
||||||
@@ -95,14 +100,14 @@ test(nowifi_unsubscribe)
|
|||||||
subscriber.subscribe("a/b");
|
subscriber.subscribe("a/b");
|
||||||
|
|
||||||
MqttClient publisher(&broker);
|
MqttClient publisher(&broker);
|
||||||
publisher.publish("a/b");
|
publisher.publish("a/b"); // This publish is received
|
||||||
|
|
||||||
subscriber.unsubscribe("a/b");
|
subscriber.unsubscribe("a/b");
|
||||||
|
|
||||||
publisher.publish("a/b");
|
publisher.publish("a/b"); // Those one, no (unsubscribed)
|
||||||
publisher.publish("a/b");
|
publisher.publish("a/b");
|
||||||
|
|
||||||
assertTrue(published[""]["a/b"] == 1); // Only one publish has been received
|
assertEqual(published[""]["a/b"], 1); // Only one publish has been received
|
||||||
}
|
}
|
||||||
|
|
||||||
test(nowifi_nocallback_when_destroyed)
|
test(nowifi_nocallback_when_destroyed)
|
||||||
@@ -124,6 +129,25 @@ test(nowifi_nocallback_when_destroyed)
|
|||||||
assertEqual(published.size(), (size_t)1); // Only one publish has been received
|
assertEqual(published.size(), (size_t)1); // Only one publish has been received
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test(nowifi_payload_nullptr)
|
||||||
|
{
|
||||||
|
return; // FIXME
|
||||||
|
published.clear();
|
||||||
|
|
||||||
|
const char* payload="abcd";
|
||||||
|
|
||||||
|
MqttClient subscriber(&broker);
|
||||||
|
subscriber.setCallback(onPublish);
|
||||||
|
subscriber.subscribe("a/b");
|
||||||
|
|
||||||
|
MqttClient publisher(&broker);
|
||||||
|
publisher.publish("a/b", payload, strlen(payload)); // This publish is received
|
||||||
|
|
||||||
|
// coming from MqttClient::publish(...)
|
||||||
|
assertEqual(payload, lastPayload);
|
||||||
|
assertEqual(lastLength, (size_t)4);
|
||||||
|
}
|
||||||
|
|
||||||
//----------------------------------------------------------------------------
|
//----------------------------------------------------------------------------
|
||||||
// setup() and loop()
|
// setup() and loop()
|
||||||
void setup() {
|
void setup() {
|
||||||
|
|||||||
6
tests/string-indexer-tests/Makefile
Normal file
6
tests/string-indexer-tests/Makefile
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
# See https://github.com/bxparks/EpoxyDuino for documentation about this
|
||||||
|
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
|
||||||
|
|
||||||
|
APP_NAME := string-indexer-tests
|
||||||
|
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
|
||||||
|
include ../../../EpoxyDuino/EpoxyDuino.mk
|
||||||
123
tests/string-indexer-tests/string-indexer-tests.ino
Normal file
123
tests/string-indexer-tests/string-indexer-tests.ino
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
#include <AUnit.h>
|
||||||
|
#include <TinyMqtt.h>
|
||||||
|
#include <map>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* TinyMqtt local unit tests.
|
||||||
|
*
|
||||||
|
* Clients are connected to pseudo remote broker
|
||||||
|
* The remote will be 127.0.0.1:1883
|
||||||
|
* We are using 127.0.0.1 because this is simpler to test with a single ESP
|
||||||
|
* Also, this will allow to mock and thus run Action on github
|
||||||
|
**/
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
MqttBroker broker(1883);
|
||||||
|
|
||||||
|
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
|
||||||
|
|
||||||
|
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
||||||
|
{
|
||||||
|
if (srce)
|
||||||
|
published[srce->id()][topic]++;
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_empty)
|
||||||
|
{
|
||||||
|
assertEqual(StringIndexer::count(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_strings_deleted_should_empty_indexer)
|
||||||
|
{
|
||||||
|
assertTrue(StringIndexer::count()==0);
|
||||||
|
{
|
||||||
|
IndexedString one("one");
|
||||||
|
assertEqual(StringIndexer::count(), 1);
|
||||||
|
IndexedString two("two");
|
||||||
|
assertEqual(StringIndexer::count(), 2);
|
||||||
|
IndexedString three("three");
|
||||||
|
assertEqual(StringIndexer::count(), 3);
|
||||||
|
IndexedString four("four");
|
||||||
|
assertEqual(StringIndexer::count(), 4);
|
||||||
|
}
|
||||||
|
assertEqual(StringIndexer::count(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_same_strings_count_as_one)
|
||||||
|
{
|
||||||
|
IndexedString one ("one");
|
||||||
|
IndexedString two ("one");
|
||||||
|
IndexedString three("one");
|
||||||
|
IndexedString fourt("one");
|
||||||
|
|
||||||
|
assertEqual(StringIndexer::count(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_size_of_indexed_string)
|
||||||
|
{
|
||||||
|
assertEqual(sizeof(IndexedString), (size_t)1);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_different_strings_are_different)
|
||||||
|
{
|
||||||
|
IndexedString one("one");
|
||||||
|
IndexedString two("two");
|
||||||
|
|
||||||
|
assertFalse(one == two);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_same_strings_should_equal)
|
||||||
|
{
|
||||||
|
IndexedString one("one");
|
||||||
|
IndexedString two("one");
|
||||||
|
|
||||||
|
assertTrue(one == two);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_indexed_operator_eq)
|
||||||
|
{
|
||||||
|
IndexedString one("one");
|
||||||
|
{
|
||||||
|
IndexedString same = one;
|
||||||
|
assertTrue(one == same);
|
||||||
|
assertEqual(StringIndexer::count(), 1);
|
||||||
|
}
|
||||||
|
assertEqual(StringIndexer::count(), 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_get_string)
|
||||||
|
{
|
||||||
|
std::string sone("one");
|
||||||
|
IndexedString one(sone);
|
||||||
|
|
||||||
|
assertTrue(sone==one.str());
|
||||||
|
}
|
||||||
|
|
||||||
|
test(indexer_get_index)
|
||||||
|
{
|
||||||
|
IndexedString one1("one");
|
||||||
|
IndexedString one2("one");
|
||||||
|
IndexedString two1("two");
|
||||||
|
IndexedString two2("two");
|
||||||
|
|
||||||
|
assertTrue(one1.getIndex() == one2.getIndex());
|
||||||
|
assertTrue(two1.getIndex() == two2.getIndex());
|
||||||
|
assertTrue(one1.getIndex() != two1.getIndex());
|
||||||
|
}
|
||||||
|
|
||||||
|
//----------------------------------------------------------------------------
|
||||||
|
// setup() and loop()
|
||||||
|
void setup() {
|
||||||
|
delay(1000);
|
||||||
|
Serial.begin(115200);
|
||||||
|
while(!Serial);
|
||||||
|
|
||||||
|
Serial.println("=============[ TinyMqtt StringIndexer TESTS ]========================");
|
||||||
|
}
|
||||||
|
|
||||||
|
void loop() {
|
||||||
|
aunit::TestRunner::run();
|
||||||
|
|
||||||
|
if (Serial.available()) ESP.reset();
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user