From 4020393f90a7028ac349ef3cc37d0e7e3b9108cf Mon Sep 17 00:00:00 2001 From: hsaturn Date: Wed, 24 Mar 2021 01:30:56 +0100 Subject: [PATCH] MqttClient can subscribe and receive publishes from distant broker --- src/TinyMqtt.cpp | 76 ++++++++++++++++++++++++++++++++++++++++-------- src/TinyMqtt.h | 6 ++++ 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 404b3a6..f9ef984 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -81,7 +81,9 @@ void MqttClient::connect(std::string broker, uint16_t port) message.add(clientId); debug("cnx: mqtt connecting"); message.sendTo(this); + message.reset(); debug("cnx: mqtt sent " << (int32_t)parent); + clientAlive(0); } } @@ -211,6 +213,7 @@ void MqttClient::loop() } else if (client && client->connected()) { + debug("pingreq"); uint16_t pingreq = MqttMessage::Type::PingReq; client->write((uint8_t*)(&pingreq), 2); clientAlive(0); @@ -242,7 +245,9 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos) debug("remote subscribe"); MqttMessage msg(MqttMessage::Type::Subscribe, 2); - // TODO Qos > 0 needs a packet identifier + // TODO manage packet identifier + msg.add(0); + msg.add(0); msg.add(topic.str()); msg.add(qos); @@ -257,7 +262,11 @@ void MqttClient::processMessage() { std::string error; std::string s; - // Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; +if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp) +{ + Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + message.hexdump("Incoming"); +} auto header = message.getVHeader(); const char* payload; uint16_t len; @@ -338,6 +347,18 @@ void MqttClient::processMessage() message.sendTo(this); break; + case MqttMessage::Type::Connack: + // if (!mqtt_connected) break; + // TODO what more on connack ? + mqtt_connected = true; + bclose = false; + break; + + case MqttMessage::Type::PingResp: + // TODO: no PingResp is suspicious (server dead) + bclose = false; + break; + case MqttMessage::Type::PingReq: if (!mqtt_connected) break; if (client) @@ -390,7 +411,10 @@ void MqttClient::processMessage() // TODO reset DUP // TODO reset RETAIN debug("publishing to parent"); - parent->publish(this, published, message); + if (parent) + parent->publish(this, published, message); + else if (callback && subscriptions.find(published)!=subscriptions.end()) + callback(this, published, nullptr, 0); // TODO send the real payload // TODO should send PUBACK bclose = false; } @@ -414,7 +438,7 @@ void MqttClient::processMessage() } else { - clientAlive(5); + clientAlive(parent ? 5 : 0); } message.reset(); } @@ -548,13 +572,14 @@ MqttError MqttMessage::sendTo(MqttClient* client) { if (buffer.size()>2) { + debug("sending " << buffer.size() << " bytes"); encodeLength(&buffer[1], buffer.size()-2); - // hexdump("snd"); + hexdump("snd"); client->write(&buffer[0], buffer.size()); } else { - Serial << "??? Invalid send" << endl; + debug("??? Invalid send"); return MqttInvalidMessage; } return MqttOk; @@ -562,12 +587,39 @@ MqttError MqttMessage::sendTo(MqttClient* client) void MqttMessage::hexdump(const char* prefix) const { - if (prefix) Serial << prefix << ' '; - Serial << "size(" << buffer.size() << ") : "; - for(const char chr: buffer) + uint16_t addr=0; + const int bytes_per_row = 8; + const char* hex_to_str = " | "; + const char* separator = hex_to_str; + const char* half_sep = " - "; + std::string ascii; + + Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl; + + for(const char chr: buffer) + { + if ((addr % bytes_per_row) == 0) { - if (chr<16) Serial << '0'; - Serial << _HEX(chr) << ' '; + if (ascii.length()) Serial << hex_to_str << ascii << separator << endl; + if (prefix) Serial << prefix << separator; + ascii.clear(); } - Serial << endl; + addr++; + if (chr<16) Serial << '0'; + Serial << _HEX(chr) << ' '; + + ascii += (chr<32 ? '.' : chr); + if (ascii.length() == (bytes_per_row/2)) ascii += half_sep; + } + if (ascii.length()) + { + while(ascii.length() < bytes_per_row+strlen(half_sep)) + { + Serial << " "; // spaces per hexa byte + ascii += ' '; + } + Serial << hex_to_str << ascii << separator; + } + + Serial << endl; } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 3d8aad4..7990b88 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -154,15 +154,21 @@ class MqttClient #ifdef TINY_MQTT_DEBUG void dump() { + uint32_t ms=millis(); Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent << " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); + Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive; + Serial << " cnx " << (client && client->connected()); Serial << " ["; + message.hexdump("entrant msg"); bool c=false; for(auto s: subscriptions) { Serial << (c?", ": "")<< s.str().c_str(); c=true; } + + Serial << "]" << endl; } #endif