Compare commits

...

14 Commits
test ... 0.7.3

Author SHA1 Message Date
hsaturn
5834a278c7 Release 0.7.3 2021-04-04 06:36:47 +02:00
hsaturn
146d0de1d4 MqttClient: bug fix, connection lost at each publish received 2021-04-04 06:35:50 +02:00
hsaturn
297a22efb5 Big rewrite of MqttClient in order to avoid code duplicate 2021-04-04 05:57:48 +02:00
hsaturn
510ff514a9 [tests] Changed assertions 2021-04-04 01:07:12 +02:00
hsaturn
ad6f7155e5 Added test for StringIndexer 2021-04-03 21:11:54 +02:00
hsaturn
3ed5874373 Fix compilation 2021-04-02 19:57:04 +02:00
hsaturn
e1a936e081 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-04-02 19:23:53 +02:00
hsaturn
0757a95fbf Keywords updated, code clean 2021-04-02 19:22:59 +02:00
hsaturn
7c8d71262f Fix test (not yet finished) 2021-04-02 18:59:07 +02:00
hsaturn
138ce973f2 Typos in libraries 2021-04-02 18:48:37 +02:00
hsaturn
bf499117b7 Merge branch 'main' of github.com:hsaturn/TinyMqtt into main 2021-03-31 19:12:28 +02:00
hsaturn
4ed6f72602 Merge branch 'test' into main 2021-03-31 19:11:24 +02:00
hsaturn
549a23ffb7 Fix delete was not really deleting in tinytest 2021-03-31 10:40:06 +02:00
hsaturn
3a1af655d7 Fix compilation problem 2021-03-31 00:33:15 +02:00
11 changed files with 325 additions and 88 deletions

View File

@@ -40,6 +40,7 @@ std::map<std::string, MqttBroker*> brokers;
void setup() void setup()
{ {
WiFi.persistent(false); // https://github.com/esp8266/Arduino/issues/1054
Serial.begin(115200); Serial.begin(115200);
delay(500); delay(500);
Serial << endl << endl << endl Serial << endl << endl << endl
@@ -399,11 +400,11 @@ void loop()
} }
if (client) if (client)
{ {
clients.erase(s);
for (auto it: clients) for (auto it: clients)
{ {
if (it.second != client) continue; if (it.second != client) continue;
Serial << "deleted" << endl; Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first); clients.erase(it.first);
break; break;
} }
@@ -413,9 +414,9 @@ void loop()
{ {
for(auto it: brokers) for(auto it: brokers)
{ {
Serial << (int32_t)it.second << '/' << (int32_t)broker << endl;
if (broker != it.second) continue; if (broker != it.second) continue;
Serial << "deleted" << endl; Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first); brokers.erase(it.first);
break; break;
} }

View File

@@ -9,13 +9,19 @@
TinyMqtt KEYWORD1 TinyMqtt KEYWORD1
MqttBroker KEYWORD1 MqttBroker KEYWORD1
connect KEYWORD2
clientsCount KEYWORD2
begin KEYWORD2 begin KEYWORD2
loop KEYWORD2 loop KEYWORD2
port KEYWORD2
MqttClient KEYWORD1 MqttClient KEYWORD1
connect KEYWORD2
connected KEYWORD2
publish KEYWORD2 publish KEYWORD2
setCallback KEYWORD2 setCallback KEYWORD2
subscribe KEYWORD2 subscribe KEYWORD2
unsubscribe KEYWORD2
Topic KEYWORD1 Topic KEYWORD1
matches KEYWORD2 matches KEYWORD2

View File

@@ -1,12 +1,12 @@
{ {
"name": "TinyMqtt", "name": "TinyMqtt",
"keywords": "ethernet, mqtt, m2m, iot", "keywords": "ethernet, mqtt, m2m, iot",
"description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.", "description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages. It does support MQTT 3.1.1 without QOS=0.",
"repository": { "repository": {
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.7.1", "version": "0.7.3",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,10 +1,11 @@
name=TinyMqtt name=TinyMqtt
version=0.7.1 version=0.7.3
author=Francois BIOT, HSaturn, <hsaturn@gmail.com> author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com> maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages and to jhost a broker in your ESP. It does support MQTT 3.1.1 without any QOS. paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows to send and receive MQTT messages and to host a broker in your ESP. It does support MQTT 3.1.1 without QoS=0.
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*
includes=TinyMqtt.h includes=TinyMqtt.h
depends=

View File

@@ -39,7 +39,7 @@ class StringIndexer
return it->first; return it->first;
} }
} }
for(index_t index=0; index<255; index++) for(index_t index=1; index; index++)
{ {
if (strings.find(index)==strings.end()) if (strings.find(index)==strings.end())
{ {
@@ -80,6 +80,8 @@ class StringIndexer
} }
} }
static uint16_t count() { return strings.size(); }
private: private:
static std::map<index_t, StringCounter> strings; static std::map<index_t, StringCounter> strings;
}; };
@@ -98,6 +100,8 @@ class IndexedString
index=StringIndexer::strToIndex(str, len); index=StringIndexer::strToIndex(str, len);
} }
IndexedString(const std::string& str) : IndexedString(str.c_str(), str.length()) {};
~IndexedString() { StringIndexer::release(index); } ~IndexedString() { StringIndexer::release(index); }
IndexedString& operator=(const IndexedString& source) IndexedString& operator=(const IndexedString& source)
@@ -112,6 +116,11 @@ class IndexedString
return i1.index < i2.index; return i1.index < i2.index;
} }
friend bool operator==(const IndexedString& i1, const IndexedString& i2)
{
return i1.index == i2.index;
}
const std::string& str() const { return StringIndexer::str(index); } const std::string& str() const { return StringIndexer::str(index); }
const StringIndexer::index_t& getIndex() const { return index; } const StringIndexer::index_t& getIndex() const { return index; }

View File

@@ -19,6 +19,7 @@ MqttBroker::~MqttBroker()
{ {
delete clients[0]; delete clients[0];
} }
server.close();
} }
// private constructor used by broker only // private constructor used by broker only
@@ -97,6 +98,13 @@ void MqttBroker::addClient(MqttClient* client)
clients.push_back(client); clients.push_back(client);
} }
void MqttBroker::connect(const std::string& host, uint16_t port)
{
if (broker == nullptr) broker = new MqttClient;
broker->connect(host, port);
broker->parent = this; // Because connect removed the link
}
void MqttBroker::removeClient(MqttClient* remove) void MqttBroker::removeClient(MqttClient* remove)
{ {
for(auto it=clients.begin(); it!=clients.end(); it++) for(auto it=clients.begin(); it!=clients.end(); it++)
@@ -104,6 +112,11 @@ void MqttBroker::removeClient(MqttClient* remove)
auto client=*it; auto client=*it;
if (client==remove) if (client==remove)
{ {
// TODO if this broker is connected to an external broker
// we have to unsubscribe remove's topics.
// (but doing this, check that other clients are not subscribed...)
// Unless -> we could receive useless messages
// -> we are using (memory) one IndexedString plus its string for nothing.
debug("Remove " << clients.size()); debug("Remove " << clients.size());
clients.erase(it); clients.erase(it);
debug("Client removed " << clients.size()); debug("Client removed " << clients.size());
@@ -117,6 +130,13 @@ void MqttBroker::loop()
{ {
WiFiClient client = server.available(); WiFiClient client = server.available();
if (broker)
{
// TODO should monitor broker's activity.
// 1 When broker disconnect and reconnect we have to re-subscribe
broker->loop();
}
if (client) if (client)
{ {
addClient(new MqttClient(this, client)); addClient(new MqttClient(this, client));
@@ -142,7 +162,15 @@ void MqttBroker::loop()
} }
} }
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
if (broker && broker->connected())
{
return broker->subscribe(topic, qos);
}
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
{ {
MqttError retval = MqttOk; MqttError retval = MqttOk;
@@ -156,28 +184,27 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif #endif
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Broker is connected if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
{ {
// ext broker -> clients or // ext_broker -> clients or clients -> ext_broker
// or clients -> ext broker if (source == broker) // external broker -> internal clients
if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // external clients -> this broker
{ {
MqttError ret = broker->publish(topic, msg); // As this broker is connected to another broker, simply forward the msg
MqttError ret = broker->publishIfSubscribed(topic, msg);
if (ret != MqttOk) retval = ret; if (ret != MqttOk) retval = ret;
} }
} }
else // Disconnected: R7 else // Disconnected
{ {
// All is allowed
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif #endif
if (doit) retval = client->publish(topic, msg); if (doit) retval = client->publishIfSubscribed(topic, msg);
debug(""); debug("");
} }
return retval; return retval;
@@ -236,7 +263,8 @@ void MqttClient::loop()
message.incoming(client->read()); message.incoming(client->read());
if (message.type()) if (message.type())
{ {
processMessage(); processMessage(&message);
message.reset();
} }
} }
} }
@@ -272,6 +300,10 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{ {
return sendTopic(topic, MqttMessage::Type::Subscribe, qos); return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
} }
else
{
return parent->subscribe(topic, qos);
}
return ret; return ret;
} }
@@ -306,22 +338,22 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
long MqttClient::counter=0; long MqttClient::counter=0;
void MqttClient::processMessage() void MqttClient::processMessage(const MqttMessage* mesg)
{ {
counter++; counter++;
#if TINY_MQTT_DEBUG #if TINY_MQTT_DEBUG
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// message.hexdump("Incoming"); // mesg->hexdump("Incoming");
} }
#endif #endif
auto header = message.getVHeader(); auto header = mesg->getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
switch(message.type() & 0XF0) switch(mesg->type() & 0XF0)
{ {
case MqttMessage::Type::Connect: case MqttMessage::Type::Connect:
if (mqtt_connected) if (mqtt_connected)
@@ -344,30 +376,30 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
} }
// ClientId // ClientId
message.getString(payload, len); mesg->getString(payload, len);
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
if (mqtt_flags & FlagWill) // Will topic if (mqtt_flags & FlagWill) // Will topic
{ {
message.getString(payload, len); // Will Topic mesg->getString(payload, len); // Will Topic
outstring("WillTopic", payload, len); outstring("WillTopic", payload, len);
payload += len; payload += len;
message.getString(payload, len); // Will Message mesg->getString(payload, len); // Will Message
outstring("WillMessage", payload, len); outstring("WillMessage", payload, len);
payload += len; payload += len;
} }
// FIXME forgetting credential is allowed (security hole) // FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName) if (mqtt_flags & FlagUserName)
{ {
message.getString(payload, len); mesg->getString(payload, len);
if (!parent->checkUser(payload, len)) break; if (!parent->checkUser(payload, len)) break;
payload += len; payload += len;
} }
if (mqtt_flags & FlagPassword) if (mqtt_flags & FlagPassword)
{ {
message.getString(payload, len); mesg->getString(payload, len);
if (!parent->checkPassword(payload, len)) break; if (!parent->checkPassword(payload, len)) break;
payload += len; payload += len;
} }
@@ -422,14 +454,14 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
payload = header+2; payload = header+2;
debug("subscribe loop"); debug("subscribe loop");
while(payload < message.end()) while(payload < mesg->end())
{ {
message.getString(payload, len); // Topic mesg->getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')'); debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len)); // subscribe(Topic(payload, len));
Topic topic(payload, len); Topic topic(payload, len);
if ((message.type() & 0XF0) == MqttMessage::Type::Subscribe) if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe)
subscriptions.insert(topic); subscriptions.insert(topic);
else else
{ {
@@ -448,37 +480,38 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
if (!mqtt_connected) break; if (mqtt_connected or client == nullptr)
{ {
uint8_t qos = message.type() & 0x6; uint8_t qos = mesg->type() & 0x6;
payload = header; payload = header;
message.getString(payload, len); mesg->getString(payload, len);
Topic published(payload, len); Topic published(payload, len);
payload += len; payload += len;
len=message.end()-payload;
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl; // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
len=mesg->end()-payload;
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
if (parent)
if (client==nullptr) // internal MqttClient receives publish
{
if (callback and isSubscribedTo(published))
{
callback(this, published, payload, len); // TODO send the real payload
}
}
else if (parent) // from outside to inside
{ {
debug("publishing to parent"); debug("publishing to parent");
parent->publish(this, published, message); parent->publish(this, published, *mesg);
} }
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
message.create(MqttMessage::Type::PubAck);
// TODO re-add packet identifier if any
message.sendTo(this);
bclose = false; bclose = false;
} }
break; break;
case MqttMessage::Type::Disconnect: case MqttMessage::Type::Disconnect:
// TODO should discard any will message // TODO should discard any will msg
if (!mqtt_connected) break; if (!mqtt_connected) break;
mqtt_connected = false; mqtt_connected = false;
close(false); close(false);
@@ -491,8 +524,9 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
}; };
if (bclose) if (bclose)
{ {
Serial << "*************** Error msg 0x" << _HEX(message.type()); Serial << "*************** Error msg 0x" << _HEX(mesg->type());
message.hexdump("-------ERROR ------"); mesg->hexdump("-------ERROR ------");
dump();
Serial << endl; Serial << endl;
close(); close();
} }
@@ -500,7 +534,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag
{ {
clientAlive(parent ? 5 : 0); clientAlive(parent ? 5 : 0);
} }
message.reset();
} }
bool Topic::matches(const Topic& topic) const bool Topic::matches(const Topic& topic) const
@@ -516,8 +549,11 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
MqttMessage msg(MqttMessage::Publish); MqttMessage msg(MqttMessage::Publish);
msg.add(topic); msg.add(topic);
msg.add(payload, pay_length, false); msg.add(payload, pay_length, false);
msg.complete();
if (parent) if (parent)
{
return parent->publish(this, topic, msg); return parent->publish(this, topic, msg);
}
else if (client) else if (client)
return msg.sendTo(this); return msg.sendTo(this);
else else
@@ -525,29 +561,33 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
} }
// republish a received publish if it matches any in subscriptions // republish a received publish if it matches any in subscriptions
MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
{ {
MqttError retval=MqttOk; MqttError retval=MqttOk;
debug("mqttclient publish " << subscriptions.size()); debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) if (isSubscribedTo(topic))
{ {
if (subscription.matches(topic)) if (client)
retval = msg.sendTo(this);
else
{ {
debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '); processMessage(&msg);
if (client) // callback(this, topic, nullptr, 0); // TODO Payload
{
retval = msg.sendTo(this);
}
else if (callback)
{
callback(this, topic, nullptr, 0); // TODO Payload
}
} }
} }
return retval; return retval;
} }
bool MqttClient::isSubscribedTo(const Topic& topic) const
{
for(const auto& subscription: subscriptions)
if (subscription.matches(topic))
return true;
return false;
}
void MqttMessage::reset() void MqttMessage::reset()
{ {
buffer.clear(); buffer.clear();
@@ -622,7 +662,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
void MqttMessage::encodeLength(char* msb, int length) void MqttMessage::encodeLength(char* msb, int length) const
{ {
do do
{ {
@@ -633,7 +673,13 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
MqttError MqttMessage::sendTo(MqttClient* client) void MqttMessage::complete()
{
encodeLength(&buffer[1], buffer.size()-2);
state = Complete;
}
MqttError MqttMessage::sendTo(MqttClient* client) const
{ {
if (buffer.size()) if (buffer.size())
{ {

View File

@@ -72,6 +72,7 @@ class MqttMessage
const char* end() const { return &buffer[0]+buffer.size(); } const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); } uint16_t length() const { return buffer.size(); }
void complete();
void reset(); void reset();
@@ -85,6 +86,13 @@ class MqttMessage
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown; return state == Complete ? static_cast<Type>(buffer[0]) : Unknown;
} }
// shouldn't exist because it breaks constness :-(
// but this saves memory so ...
void changeType(Type type) const
{
buffer[0] = type;
}
void create(Type type) void create(Type type)
{ {
buffer=(char)type; buffer=(char)type;
@@ -93,13 +101,13 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
MqttError sendTo(MqttClient*); MqttError sendTo(MqttClient*) const;
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length) const;
std::string buffer; mutable std::string buffer; // mutable -> sendTo()
uint8_t vheader; uint8_t vheader;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; State state;
@@ -120,13 +128,15 @@ class MqttClient
FlagReserved = 1 FlagReserved = 1
}; };
public: public:
MqttClient(MqttBroker* brk = nullptr, const std::string& id=""); /** Constructor. If broker is not null, this is the adress of a local broker.
If you want to connect elsewhere, leave broker null and use connect() **/
MqttClient(MqttBroker* broker = nullptr, const std::string& id="");
MqttClient(const std::string& id) : MqttClient(nullptr, id){} MqttClient(const std::string& id) : MqttClient(nullptr, id){}
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port, uint16_t ka=10); void connect(std::string broker, uint16_t port, uint16_t keep_alive = 10);
bool connected() { return bool connected() { return
(parent!=nullptr and client==nullptr) or (parent!=nullptr and client==nullptr) or
@@ -137,6 +147,7 @@ class MqttClient
const std::string& id() const { return clientId; } const std::string& id() const { return clientId; }
void id(std::string& new_id) { clientId = new_id; } void id(std::string& new_id) { clientId = new_id; }
/** Should be called in main loop() */
void loop(); void loop();
void close(bool bSendDisconnect=true); void close(bool bSendDisconnect=true);
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun) {callback=fun; };
@@ -149,6 +160,7 @@ class MqttClient
MqttError subscribe(Topic topic, uint8_t qos=0); MqttError subscribe(Topic topic, uint8_t qos=0);
MqttError unsubscribe(Topic topic); MqttError unsubscribe(Topic topic);
bool isSubscribedTo(const Topic& topic) const;
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless
@@ -160,7 +172,7 @@ class MqttClient
Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF"); Serial << "MqttClient (" << clientId.c_str() << ") " << (connected() ? " ON " : " OFF");
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive; Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive;
Serial << (client && client->connected() ? "" : "dis") << "connected"; Serial << (client && client->connected() ? "" : "dis") << "connected";
message.hexdump("entrant msg"); message.hexdump("entrant msg");
bool c=false; bool c=false;
Serial << " ["; Serial << " [";
for(auto s: subscriptions) for(auto s: subscriptions)
@@ -173,7 +185,8 @@ class MqttClient
Serial << "]" << endl; Serial << "]" << endl;
} }
static long counter; // Number of messages sent /** Count the number of messages that have been sent **/
static long counter;
private: private:
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
@@ -182,10 +195,10 @@ class MqttClient
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publish(const Topic& topic, MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
void clientAlive(uint32_t more_seconds); void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage(const MqttMessage* message);
bool mqtt_connected = false; bool mqtt_connected = false;
char mqtt_flags; char mqtt_flags;
@@ -222,7 +235,7 @@ class MqttBroker
uint16_t port() const { return server.port(); } uint16_t port() const { return server.port(); }
void connect(std::string host, uint16_t port=1883); void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
size_t clientsCount() const { return clients.size(); } size_t clientsCount() const { return clients.size(); }
@@ -247,7 +260,9 @@ class MqttBroker
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg); MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
MqttError subscribe(const Topic& topic, uint8_t qos);
// For clients that are added not by the broker itself // For clients that are added not by the broker itself
void addClient(MqttClient* client); void addClient(MqttClient* client);

View File

@@ -17,14 +17,20 @@ MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ {
if (srce) if (srce)
published[srce->id()][topic]++; published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
} }
test(local_client_should_unregister_when_destroyed) test(local_client_should_unregister_when_destroyed)
{ {
return;
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
{ {
MqttClient client; MqttClient client;

View File

@@ -6,7 +6,7 @@
* TinyMqtt nowifi unit tests. * TinyMqtt nowifi unit tests.
* *
* No wifi connection unit tests. * No wifi connection unit tests.
* Checks with a local broker. Clients must connect to the local client * Checks with a local broker. Clients must connect to the local broker
**/ **/
using namespace std; using namespace std;
@@ -15,10 +15,15 @@ MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
const char* lastPayload;
size_t lastLength;
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length) void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ {
if (srce) if (srce)
published[srce->id()][topic]++; published[srce->id()][topic]++;
lastPayload = payload;
lastLength = length;
} }
test(nowifi_client_should_unregister_when_destroyed) test(nowifi_client_should_unregister_when_destroyed)
@@ -56,11 +61,11 @@ test(nowifi_publish_should_be_dispatched)
publisher.publish("a/c"); publisher.publish("a/c");
assertEqual(published.size(), (size_t)1); // 1 client has received something assertEqual(published.size(), (size_t)1); // 1 client has received something
assertTrue(published[""]["a/b"] == 1); assertEqual(published[""]["a/b"], 1);
assertTrue(published[""]["a/c"] == 2); assertEqual(published[""]["a/c"], 2);
} }
test(nowifi_publish_should_be_dispatched_to_nowifi_clients) test(nowifi_publish_should_be_dispatched_to_clients)
{ {
published.clear(); published.clear();
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
@@ -75,14 +80,14 @@ test(nowifi_publish_should_be_dispatched_to_nowifi_clients)
subscriber_b.subscribe("a/b"); subscriber_b.subscribe("a/b");
MqttClient publisher(&broker); MqttClient publisher(&broker);
publisher.publish("a/b"); publisher.publish("a/b"); // A and B should receive this
publisher.publish("a/c"); publisher.publish("a/c"); // A should receive this
assertEqual(published.size(), (size_t)2); // 2 clients have received something assertEqual(published.size(), (size_t)2); // 2 clients have received something
assertTrue(published["A"]["a/b"] == 1); assertEqual(published["A"]["a/b"], 1);
assertTrue(published["A"]["a/c"] == 1); assertEqual(published["A"]["a/c"], 1);
assertTrue(published["B"]["a/b"] == 1); assertEqual(published["B"]["a/b"], 1);
assertTrue(published["B"]["a/c"] == 0); assertEqual(published["B"]["a/c"], 0);
} }
test(nowifi_unsubscribe) test(nowifi_unsubscribe)
@@ -95,14 +100,14 @@ test(nowifi_unsubscribe)
subscriber.subscribe("a/b"); subscriber.subscribe("a/b");
MqttClient publisher(&broker); MqttClient publisher(&broker);
publisher.publish("a/b"); publisher.publish("a/b"); // This publish is received
subscriber.unsubscribe("a/b"); subscriber.unsubscribe("a/b");
publisher.publish("a/b"); publisher.publish("a/b"); // Those one, no (unsubscribed)
publisher.publish("a/b"); publisher.publish("a/b");
assertTrue(published[""]["a/b"] == 1); // Only one publish has been received assertEqual(published[""]["a/b"], 1); // Only one publish has been received
} }
test(nowifi_nocallback_when_destroyed) test(nowifi_nocallback_when_destroyed)
@@ -124,6 +129,25 @@ test(nowifi_nocallback_when_destroyed)
assertEqual(published.size(), (size_t)1); // Only one publish has been received assertEqual(published.size(), (size_t)1); // Only one publish has been received
} }
test(nowifi_payload_nullptr)
{
return; // FIXME
published.clear();
const char* payload="abcd";
MqttClient subscriber(&broker);
subscriber.setCallback(onPublish);
subscriber.subscribe("a/b");
MqttClient publisher(&broker);
publisher.publish("a/b", payload, strlen(payload)); // This publish is received
// coming from MqttClient::publish(...)
assertEqual(payload, lastPayload);
assertEqual(lastLength, (size_t)4);
}
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
// setup() and loop() // setup() and loop()
void setup() { void setup() {

View File

@@ -0,0 +1,6 @@
# See https://github.com/bxparks/EpoxyDuino for documentation about this
# Makefile to compile and run Arduino programs natively on Linux or MacOS.
APP_NAME := string-indexer-tests
ARDUINO_LIBS := AUnit AceCommon AceTime TinyMqtt EspMock
include ../../../EpoxyDuino/EpoxyDuino.mk

View File

@@ -0,0 +1,123 @@
#include <AUnit.h>
#include <TinyMqtt.h>
#include <map>
/**
* TinyMqtt local unit tests.
*
* Clients are connected to pseudo remote broker
* The remote will be 127.0.0.1:1883
* We are using 127.0.0.1 because this is simpler to test with a single ESP
* Also, this will allow to mock and thus run Action on github
**/
using namespace std;
MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{
if (srce)
published[srce->id()][topic]++;
}
test(indexer_empty)
{
assertEqual(StringIndexer::count(), 0);
}
test(indexer_strings_deleted_should_empty_indexer)
{
assertTrue(StringIndexer::count()==0);
{
IndexedString one("one");
assertEqual(StringIndexer::count(), 1);
IndexedString two("two");
assertEqual(StringIndexer::count(), 2);
IndexedString three("three");
assertEqual(StringIndexer::count(), 3);
IndexedString four("four");
assertEqual(StringIndexer::count(), 4);
}
assertEqual(StringIndexer::count(), 0);
}
test(indexer_same_strings_count_as_one)
{
IndexedString one ("one");
IndexedString two ("one");
IndexedString three("one");
IndexedString fourt("one");
assertEqual(StringIndexer::count(), 1);
}
test(indexer_size_of_indexed_string)
{
assertEqual(sizeof(IndexedString), (size_t)1);
}
test(indexer_different_strings_are_different)
{
IndexedString one("one");
IndexedString two("two");
assertFalse(one == two);
}
test(indexer_same_strings_should_equal)
{
IndexedString one("one");
IndexedString two("one");
assertTrue(one == two);
}
test(indexer_indexed_operator_eq)
{
IndexedString one("one");
{
IndexedString same = one;
assertTrue(one == same);
assertEqual(StringIndexer::count(), 1);
}
assertEqual(StringIndexer::count(), 1);
}
test(indexer_get_string)
{
std::string sone("one");
IndexedString one(sone);
assertTrue(sone==one.str());
}
test(indexer_get_index)
{
IndexedString one1("one");
IndexedString one2("one");
IndexedString two1("two");
IndexedString two2("two");
assertTrue(one1.getIndex() == one2.getIndex());
assertTrue(two1.getIndex() == two2.getIndex());
assertTrue(one1.getIndex() != two1.getIndex());
}
//----------------------------------------------------------------------------
// setup() and loop()
void setup() {
delay(1000);
Serial.begin(115200);
while(!Serial);
Serial.println("=============[ TinyMqtt StringIndexer TESTS ]========================");
}
void loop() {
aunit::TestRunner::run();
if (Serial.available()) ESP.reset();
}