diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 9e164a7..9077ba9 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -59,7 +59,7 @@ void MqttClient::close(bool bSendDisconnect) { debug("close " << id().c_str()); mqtt_connected = false; - if (client) + if (client) // connected to a remote broker { if (bSendDisconnect and client->connected()) { @@ -72,10 +72,16 @@ void MqttClient::close(bool bSendDisconnect) if (parent) { parent->removeClient(this); - parent=nullptr; + parent = nullptr; } } +void MqttClient::connect(MqttBroker* parentBroker) +{ + close(); + parent = parentBroker; +} + void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) { debug("cnx: closing"); @@ -397,12 +403,13 @@ void MqttClient::processMessage(const MqttMessage* mesg) #ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { -#ifdef NOT_ESP_CORE - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl; -#else - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; -#endif + #ifdef NOT_ESP_CORE + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl; + #else + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + #endif // mesg->hexdump("Incoming"); + mesg->hexdump("Incoming"); } #endif auto header = mesg->getVHeader(); @@ -544,6 +551,9 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T break; case MqttMessage::Type::Publish: + #ifdef TINY_MQTT_DEBUG + Serial << "publish " << mqtt_connected << '/' << (long) client << endl; + #endif if (mqtt_connected or client == nullptr) { uint8_t qos = mesg->type() & 0x6; @@ -558,8 +568,12 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T // TODO reset DUP // TODO reset RETAIN - if (client==nullptr) // internal MqttClient receives publish + if (parent==nullptr or client==nullptr) // internal MqttClient receives publish { + #ifdef TINY_MQTT_DEBUG + Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n"; + Serial << "has " << (callback ? "" : "no ") << " callback.\n"; + #endif if (callback and isSubscribedTo(published)) { callback(this, published, payload, len); // TODO send the real payload @@ -613,7 +627,6 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa MqttMessage msg(MqttMessage::Publish); msg.add(topic); msg.add(payload, pay_length, false); - msg.complete(); if (parent) { return parent->publish(this, topic, msg); @@ -637,6 +650,10 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& else { processMessage(&msg); + + #ifdef TINY_MQTT_DEBUG + Serial << "Should call the callback ?\n"; + #endif // callback(this, topic, nullptr, 0); // TODO Payload } } @@ -714,7 +731,7 @@ void MqttMessage::incoming(char in_byte) reset(); break; } - if (buffer.length() > MaxBufferLength) // TODO magic 256 ? + if (buffer.length() > MaxBufferLength) { debug("Too long " << state); reset(); @@ -725,36 +742,47 @@ void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { - buffer.reserve(buffer.length()+addLength+2); + buffer.reserve(buffer.length()+2); incoming(len>>8); incoming(len & 0xFF); } while(len--) incoming(*p++); } -void MqttMessage::encodeLength(char* msb, int length) const +void MqttMessage::encodeLength() const { - do + if (state != Complete) { - uint8_t encoded(length & 0x7F); - length >>=7; - if (length) encoded |= 0x80; - *msb++ = encoded; - } while (length); -}; + int length = buffer.size()-2; // 1 byte for header, 1 byte for pre-reserved length field. + std::string::size_type ins=1; + do + { + uint8_t encoded(length & 0x7F); + length >>=7; + if (length) encoded |= 0x80; -void MqttMessage::complete() -{ - encodeLength(&buffer[1], buffer.size()-2); + if (ins==1) + buffer[ins]=encoded; + else + buffer.insert(ins, 1, encoded); + // On pourrait optimiser, cet insert est couteux, il faudrait en fait non pas + // insérer, mais réserver 4 octets pour les remplir + // plus tard avec ke fixed header et la taille. + // Cela changerait en revanche le début du message qui ne serait plus + // buffer[0], mais buffer[0..3] selon la taille du message. + + ++ins; + } while (length); state = Complete; -} + } +}; MqttError MqttMessage::sendTo(MqttClient* client) const { if (buffer.size()) { debug("sending " << buffer.size() << " bytes"); - encodeLength(&buffer[1], buffer.size()-2); + encodeLength(); // hexdump("snd"); client->write(&buffer[0], buffer.size()); } diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index 27487dd..2a8d01a 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -101,8 +101,6 @@ class MqttMessage void add(const Topic& t) { add(t.str()); } const char* end() const { return &buffer[0]+buffer.size(); } const char* getVHeader() const { return &buffer[vheader]; } - uint16_t length() const { return buffer.size(); } - void complete(); void reset(); @@ -127,12 +125,12 @@ class MqttMessage void hexdump(const char* prefix=nullptr) const; private: - void encodeLength(char* msb, int length) const; + void encodeLength() const; mutable std::string buffer; // mutable -> sendTo() uint8_t vheader; uint16_t size; // bytes left to receive - State state; + mutable State state; // mutable -> encodeLength() }; class MqttBroker; @@ -172,7 +170,14 @@ class MqttClient /** Should be called in main loop() */ void loop(); void close(bool bSendDisconnect=true); - void setCallback(CallBack fun) {callback=fun; }; + void setCallback(CallBack fun) + { + callback=fun; + #ifdef TINY_MQTT_DEBUG + Serial << "Callback set to " << (long)fun << endl; + if (callback) callback(this, "test/topic", "value", 5); + #endif + }; // Publish from client to the world MqttError publish(const Topic&, const char* payload, size_t pay_length); @@ -214,6 +219,8 @@ class MqttClient static long counter; private: + + // event when tcp/ip link established (real or fake) static void onConnect(void * client_ptr, TcpClient*); #ifdef TCP_ASYNC static void onData(void* client_ptr, TcpClient*, void* data, size_t len); @@ -240,7 +247,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - TcpClient* client=nullptr; // connection to mqtt client or to remote broker + TcpClient* client=nullptr; // connection to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr;