Fix issue_2 broken payload

This commit is contained in:
hsaturn
2021-09-17 19:51:32 +02:00
parent 36dde2c063
commit d39c58d8f5
2 changed files with 65 additions and 30 deletions

View File

@@ -59,7 +59,7 @@ void MqttClient::close(bool bSendDisconnect)
{ {
debug("close " << id().c_str()); debug("close " << id().c_str());
mqtt_connected = false; mqtt_connected = false;
if (client) if (client) // connected to a remote broker
{ {
if (bSendDisconnect and client->connected()) if (bSendDisconnect and client->connected())
{ {
@@ -76,6 +76,12 @@ void MqttClient::close(bool bSendDisconnect)
} }
} }
void MqttClient::connect(MqttBroker* parentBroker)
{
close();
parent = parentBroker;
}
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{ {
debug("cnx: closing"); debug("cnx: closing");
@@ -403,6 +409,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
#endif #endif
// mesg->hexdump("Incoming"); // mesg->hexdump("Incoming");
mesg->hexdump("Incoming");
} }
#endif #endif
auto header = mesg->getVHeader(); auto header = mesg->getVHeader();
@@ -544,6 +551,9 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
#ifdef TINY_MQTT_DEBUG
Serial << "publish " << mqtt_connected << '/' << (long) client << endl;
#endif
if (mqtt_connected or client == nullptr) if (mqtt_connected or client == nullptr)
{ {
uint8_t qos = mesg->type() & 0x6; uint8_t qos = mesg->type() & 0x6;
@@ -558,8 +568,12 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
if (client==nullptr) // internal MqttClient receives publish if (parent==nullptr or client==nullptr) // internal MqttClient receives publish
{ {
#ifdef TINY_MQTT_DEBUG
Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n";
Serial << "has " << (callback ? "" : "no ") << " callback.\n";
#endif
if (callback and isSubscribedTo(published)) if (callback and isSubscribedTo(published))
{ {
callback(this, published, payload, len); // TODO send the real payload callback(this, published, payload, len); // TODO send the real payload
@@ -613,7 +627,6 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
MqttMessage msg(MqttMessage::Publish); MqttMessage msg(MqttMessage::Publish);
msg.add(topic); msg.add(topic);
msg.add(payload, pay_length, false); msg.add(payload, pay_length, false);
msg.complete();
if (parent) if (parent)
{ {
return parent->publish(this, topic, msg); return parent->publish(this, topic, msg);
@@ -637,6 +650,10 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage&
else else
{ {
processMessage(&msg); processMessage(&msg);
#ifdef TINY_MQTT_DEBUG
Serial << "Should call the callback ?\n";
#endif
// callback(this, topic, nullptr, 0); // TODO Payload // callback(this, topic, nullptr, 0); // TODO Payload
} }
} }
@@ -714,7 +731,7 @@ void MqttMessage::incoming(char in_byte)
reset(); reset();
break; break;
} }
if (buffer.length() > MaxBufferLength) // TODO magic 256 ? if (buffer.length() > MaxBufferLength)
{ {
debug("Too long " << state); debug("Too long " << state);
reset(); reset();
@@ -725,36 +742,47 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
{ {
if (addLength) if (addLength)
{ {
buffer.reserve(buffer.length()+addLength+2); buffer.reserve(buffer.length()+2);
incoming(len>>8); incoming(len>>8);
incoming(len & 0xFF); incoming(len & 0xFF);
} }
while(len--) incoming(*p++); while(len--) incoming(*p++);
} }
void MqttMessage::encodeLength(char* msb, int length) const void MqttMessage::encodeLength() const
{ {
if (state != Complete)
{
int length = buffer.size()-2; // 1 byte for header, 1 byte for pre-reserved length field.
std::string::size_type ins=1;
do do
{ {
uint8_t encoded(length & 0x7F); uint8_t encoded(length & 0x7F);
length >>=7; length >>=7;
if (length) encoded |= 0x80; if (length) encoded |= 0x80;
*msb++ = encoded;
} while (length);
};
void MqttMessage::complete() if (ins==1)
{ buffer[ins]=encoded;
encodeLength(&buffer[1], buffer.size()-2); else
buffer.insert(ins, 1, encoded);
// On pourrait optimiser, cet insert est couteux, il faudrait en fait non pas
// insérer, mais réserver 4 octets pour les remplir
// plus tard avec ke fixed header et la taille.
// Cela changerait en revanche le début du message qui ne serait plus
// buffer[0], mais buffer[0..3] selon la taille du message.
++ins;
} while (length);
state = Complete; state = Complete;
} }
};
MqttError MqttMessage::sendTo(MqttClient* client) const MqttError MqttMessage::sendTo(MqttClient* client) const
{ {
if (buffer.size()) if (buffer.size())
{ {
debug("sending " << buffer.size() << " bytes"); debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength();
// hexdump("snd"); // hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }

View File

@@ -101,8 +101,6 @@ class MqttMessage
void add(const Topic& t) { add(t.str()); } void add(const Topic& t) { add(t.str()); }
const char* end() const { return &buffer[0]+buffer.size(); } const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; } const char* getVHeader() const { return &buffer[vheader]; }
uint16_t length() const { return buffer.size(); }
void complete();
void reset(); void reset();
@@ -127,12 +125,12 @@ class MqttMessage
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length) const; void encodeLength() const;
mutable std::string buffer; // mutable -> sendTo() mutable std::string buffer; // mutable -> sendTo()
uint8_t vheader; uint8_t vheader;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
State state; mutable State state; // mutable -> encodeLength()
}; };
class MqttBroker; class MqttBroker;
@@ -172,7 +170,14 @@ class MqttClient
/** Should be called in main loop() */ /** Should be called in main loop() */
void loop(); void loop();
void close(bool bSendDisconnect=true); void close(bool bSendDisconnect=true);
void setCallback(CallBack fun) {callback=fun; }; void setCallback(CallBack fun)
{
callback=fun;
#ifdef TINY_MQTT_DEBUG
Serial << "Callback set to " << (long)fun << endl;
if (callback) callback(this, "test/topic", "value", 5);
#endif
};
// Publish from client to the world // Publish from client to the world
MqttError publish(const Topic&, const char* payload, size_t pay_length); MqttError publish(const Topic&, const char* payload, size_t pay_length);
@@ -214,6 +219,8 @@ class MqttClient
static long counter; static long counter;
private: private:
// event when tcp/ip link established (real or fake)
static void onConnect(void * client_ptr, TcpClient*); static void onConnect(void * client_ptr, TcpClient*);
#ifdef TCP_ASYNC #ifdef TCP_ASYNC
static void onData(void* client_ptr, TcpClient*, void* data, size_t len); static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
@@ -240,7 +247,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here) // (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
TcpClient* client=nullptr; // connection to mqtt client or to remote broker TcpClient* client=nullptr; // connection to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;