diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index a9be1f9..1547c0b 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -1,6 +1,10 @@ #include "TinyMqtt.h" #include +#ifdef EPOXY_DUINO + std::map MqttClient::counters; +#endif + MqttBroker::MqttBroker(uint16_t port) { server = new TcpServer(port); @@ -30,7 +34,11 @@ MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) #else client = new WiFiClient(*new_client); #endif +#ifdef EPOXY_DUINO + alive = millis()+500000; +#else alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg +#endif } MqttClient::MqttClient(MqttBroker* parent, const std::string& id) @@ -242,7 +250,11 @@ void MqttClient::clientAlive(uint32_t more_seconds) { if (keep_alive) { +#ifdef EPOXY_DUINO + alive=millis()+500000; +#else alive=millis()+1000*(keep_alive+more_seconds); +#endif } else alive=0; @@ -387,11 +399,8 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint return msg.sendTo(this); } -long MqttClient::counter=0; - void MqttClient::processMessage(MqttMessage* mesg) { - counter++; #ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { @@ -409,7 +418,11 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T uint16_t len; bool bclose=true; - switch(mesg->type() & 0XF0) +#ifdef EPOXY_DUINO + counters[mesg->type()]++; +#endif + + switch(mesg->type()) { case MqttMessage::Type::Connect: if (mqtt_connected) @@ -510,17 +523,25 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T payload = header+2; debug("un/subscribe loop"); + std::string qoss; while(payload < mesg->end()) { mesg->getString(payload, len); // Topic debug( " topic (" << std::string(payload, len) << ')'); // subscribe(Topic(payload, len)); Topic topic(payload, len); + payload += len; - if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe) + if (mesg->type() == MqttMessage::Type::Subscribe) { uint8_t qos = *payload++; - if (qos != 0) debug("Unsupported QOS" << qos << endl); + if (qos != 0) + { + debug("Unsupported QOS" << qos << endl); + qoss.push_back(0x80); + } + else + qoss.push_back(qos); subscriptions.insert(topic); } else @@ -532,7 +553,12 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T } debug("end loop"); bclose = false; - // TODO SUBACK + + MqttMessage ack(mesg->type() == MqttMessage::Type::Subscribe ? MqttMessage::Type::SubAck : MqttMessage::Type::UnSuback); + ack.add(header[0]); + ack.add(header[1]); + ack.add(qoss.c_str(), qoss.size(), false); + ack.sendTo(this); } break; @@ -547,7 +573,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T #endif if (mqtt_connected or client == nullptr) { - uint8_t qos = mesg->type() & 0x6; + uint8_t qos = mesg->flags(); payload = header; mesg->getString(payload, len); Topic published(payload, len); diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 8a17500..bc36e7e 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -42,7 +42,7 @@ using TcpServer = WiFiServer; #endif -enum MqttError +enum __attribute__((packed)) MqttError { MqttOk = 0, MqttNowhereToSend=1, @@ -66,7 +66,7 @@ class MqttMessage { const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding public: - enum Type + enum __attribute__((packed)) Type { Unknown = 0, Connect = 0x10, @@ -76,12 +76,12 @@ class MqttMessage Subscribe = 0x80, SubAck = 0x90, UnSubscribe = 0xA0, - UnSuback = 0xB0, + UnSuback = 0xB0, PingReq = 0xC0, PingResp = 0xD0, Disconnect = 0xE0 }; - enum State + enum __attribute__((packed)) State { FixedHeader=0, Length=1, @@ -111,12 +111,14 @@ class MqttMessage Type type() const { - return state == Complete ? static_cast(buffer[0]) : Unknown; + return state == Complete ? static_cast(buffer[0] & 0xF0) : Unknown; } + uint8_t flags() const { return static_cast(buffer[0] & 0x0F); } + void create(Type type) { - buffer=(char)type; + buffer=(decltype(buffer)::value_type)type; buffer+='\0'; // reserved for msg length byte 1/2 buffer+='\0'; // reserved for msg length byte 2/2 (fixed) vheader=3; // Should never change @@ -139,7 +141,7 @@ class MqttBroker; class MqttClient { using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length); - enum Flags + enum __attribute__((packed)) Flags { FlagUserName = 128, FlagPassword = 64, @@ -223,7 +225,9 @@ class MqttClient #endif } - static long counter; // Number of processed messages +#ifdef EPOXY_DUINO + static std::map counters; // Number of processed messages +#endif private: @@ -262,7 +266,7 @@ class MqttClient class MqttBroker { - enum State + enum __attribute__((packed)) State { Disconnected, // Also the initial state Connecting, // connect and sends a fake publish to avoid circular cnx diff --git a/tests/howto b/tests/howto index cc4dda2..e629f0f 100644 --- a/tests/howto +++ b/tests/howto @@ -4,3 +4,4 @@ git clone https://github.com/bxparks/AUnit.git git clone https://github.com/bxparks/EpoxyDuino.git cd TinyMqtt/tests make +make runtests diff --git a/tests/local-tests/local-tests.ino b/tests/local-tests/local-tests.ino index f812905..e260b09 100644 --- a/tests/local-tests/local-tests.ino +++ b/tests/local-tests/local-tests.ino @@ -31,12 +31,10 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, test(local_client_should_unregister_when_destroyed) { -return; assertEqual(broker.clientsCount(), (size_t)0); { - MqttClient client; assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected - client.connect("127.0.0.1", 1883); + MqttClient client(&broker); assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected } assertEqual(broker.clientsCount(), (size_t)0); diff --git a/tests/network-tests/network-tests.ino b/tests/network-tests/network-tests.ino index f953114..b43e477 100644 --- a/tests/network-tests/network-tests.ino +++ b/tests/network-tests/network-tests.ino @@ -12,6 +12,11 @@ using namespace std; +String toString(const IPAddress& ip) +{ + return String(ip[0])+'.'+String(ip[1])+'.'+String(ip[2])+'.'+String(ip[3]); +} + MqttBroker broker(1883); std::map> published; // map[client_id] => map[topic] = count @@ -51,10 +56,36 @@ test(network_single_broker_begin) // TODO Nothing is tested here ! } +test(suback) +{ + start_servers(2, true); + assertEqual(WiFi.status(), WL_CONNECTED); + + MqttBroker broker(1883); + broker.begin(); + IPAddress broker_ip = WiFi.localIP(); + + ESP8266WiFiClass::selectInstance(2); + MqttClient client; + client.connect(broker_ip.toString().c_str(), 1883); + broker.loop(); + + assertTrue(broker.clientsCount() == 1); + assertTrue(client.connected()); + + MqttClient::counters[MqttMessage::Type::SubAck] = 0; + client.subscribe("a/b"); + + // TODO how to avoid these loops ??? + broker.loop(); + client.loop(); + + assertEqual(MqttClient::counters[MqttMessage::Type::SubAck], 1); +} + test(network_client_to_broker_connexion) { start_servers(2, true); - assertEqual(WiFi.status(), WL_CONNECTED); MqttBroker broker(1883); @@ -147,7 +178,6 @@ test(network_client_should_unregister_when_destroyed) assertEqual(broker.clientsCount(), (size_t)0); } -#if 0 // THESE TESTS ARE IN LOCAL MODE // WE HAVE TO CONVERT THEM TO WIFI MODE (pass through virtual TCP link) @@ -277,8 +307,8 @@ test(network_hudge_payload) // onPublish should have filled lastPayload and lastLength assertEqual(payload, lastPayload); assertEqual(lastLength, strlen(payload)); + assertEqual(strcmp(payload, lastPayload), 0); } -#endif //---------------------------------------------------------------------------- // setup() and loop() @@ -286,9 +316,10 @@ void setup() { /* delay(1000); Serial.begin(115200); while(!Serial); + */ Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================"); -*/ + WiFi.mode(WIFI_STA); WiFi.begin("network", "password"); }