diff --git a/README.md b/README.md index e6d7064..828e2b1 100644 --- a/README.md +++ b/README.md @@ -6,10 +6,13 @@ ![](https://img.shields.io/github/license/hsaturn/TinyMqtt) ![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow) -ESP 8266 is a small and very capable Mqtt Broker and Client +ESP 8266 is a small, fast and capable Mqtt Broker and Client ## Features +- Very (very !!) fast broker I saw it re-sent 1000 topics per second for two + clients that had subscribed (payload ~15 bytes). No topic lost. + The max I've seen was 2k msg/s (1 client 1 subscription) - Act as as a mqtt broker and/or a mqtt client - Mqtt 3.1.1 / Qos 0 supported - Standalone (can work without WiFi) (degraded/local mode) @@ -28,9 +31,10 @@ no need for having tons of clients (also RAM is the problem with many clients) * ~~MqttClient does not support more than one subscription at time~~ * MqttClient auto re-subscribe (::resubscribe works bad on broker.emqx.io) * MqttClient auto reconnection -* MqttClient unsubscribe -* MqttClient does not callback payload... +* ~~MqttClient unsubscribe~~ +* MqttClient does not sent payload to callback... * MqttClient user/password +* Wildcards (I may implement only # as I'm not interrested by a clever and cpu consuming matching) ## Quickstart diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index b36dd4a..b08af3e 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -146,8 +146,10 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt for(auto client: clients) { i++; +#if TINY_MQTT_DEBUG Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; +#endif bool doit = false; if (broker && broker->connected()) // Broker is connected { @@ -166,7 +168,9 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt // All is allowed doit = true; } +#if TINY_MQTT_DEBUG Serial << ", doit=" << doit << ' '; +#endif if (doit) retval = client->publish(topic, msg); debug(""); @@ -248,7 +252,7 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos) msg.add(0); msg.add(0); - msg.add(topic.str()); + msg.add(topic); msg.add(qos); ret = msg.sendTo(this); @@ -257,15 +261,18 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos) return ret; } +long MqttClient::counter=0; + void MqttClient::processMessage() { - std::string error; - std::string s; + counter++; +#if TINY_MQTT_DEBUG if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) { Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; - message.hexdump("Incoming"); + // message.hexdump("Incoming"); } +#endif auto header = message.getVHeader(); const char* payload; uint16_t len; @@ -295,20 +302,6 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag // ClientId message.getString(payload, len); - debug("client id len=" << len); - if (len>60) - { - Serial << '('; - for(int i=0; i<30; i++) - { - if (i%5==0) Serial << ' '; - char c=*(header+i); - Serial << (c < 32 ? '.' : c); - } - Serial << " )" << endl; - debug("Bad client id length"); - break; - } clientId = std::string(payload, len); payload += len; @@ -437,7 +430,7 @@ if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessag if (bclose) { Serial << "*************** Error msg 0x" << _HEX(message.type()); - if (error.length()) Serial << ':' << error.c_str(); + message.hexdump("-------ERROR ------"); Serial << endl; close(); } @@ -477,10 +470,9 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) debug("mqttclient publish " << subscriptions.size()); for(const auto& subscription: subscriptions) { - Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '; if (subscription.matches(topic)) { - Serial << " match/send"; + debug(" match client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' '); if (client) { retval = msg.sendTo(this); @@ -490,7 +482,6 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg) callback(this, topic, nullptr, 0); // TODO Payload } } - Serial << endl; } return retval; } @@ -541,6 +532,7 @@ void MqttMessage::incoming(char in_byte) case Complete: default: Serial << "Spurious " << _HEX(in_byte) << endl; + hexdump("spurious"); reset(); break; } @@ -578,7 +570,7 @@ MqttError MqttMessage::sendTo(MqttClient* client) { debug("sending " << buffer.size() << " bytes"); encodeLength(&buffer[1], buffer.size()-2); - hexdump("snd"); + // hexdump("snd"); client->write(&buffer[0], buffer.size()); } else diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 022a553..17e68b3 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -5,11 +5,11 @@ #include "StringIndexer.h" #include -#define TINY_MQTT_DEBUG - -#ifdef TINY_MQTT_DEBUG +#if 0 #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } + #define TINY_MQTT_DEBUG 1 #else + #define TINY_MQTT_DEBUG 0 #define debug(what) {} #endif @@ -152,7 +152,6 @@ class MqttClient // TODO seems to be useless bool isLocal() const { return client == nullptr; } -#ifdef TINY_MQTT_DEBUG void dump() { uint32_t ms=millis(); @@ -160,9 +159,9 @@ class MqttClient << " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive; Serial << " cnx " << (client && client->connected()); - Serial << " ["; message.hexdump("entrant msg"); bool c=false; + Serial << " ["; for(auto s: subscriptions) { Serial << (c?", ": "")<< s.str().c_str(); @@ -172,9 +171,12 @@ class MqttClient Serial << "]" << endl; } -#endif + + static long counter; // Number of messages sent private: + void resubscribe(); + friend class MqttBroker; MqttClient(MqttBroker* parent, WiFiClient& client); // republish a received publish if topic matches any in subscriptions @@ -221,7 +223,6 @@ class MqttBroker void connect(std::string host, uint32_t port=1883); bool connected() const { return state == Connected; } -#ifdef TINY_MQTT_DEBUG void dump() { Serial << clients.size() << " client/s" << endl; @@ -231,7 +232,6 @@ class MqttBroker client->dump(); } } -#endif private: friend class MqttClient;