Fix issue_2 : Broken large payloads

This commit is contained in:
hsaturn
2021-09-17 22:32:00 +02:00
parent d39c58d8f5
commit a6b3540cb8
7 changed files with 59 additions and 55 deletions

View File

@@ -38,7 +38,7 @@ MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg
alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg
}
MqttClient::MqttClient(MqttBroker* parent, const std::string& id)
@@ -190,7 +190,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
return MqttNowhereToSend;
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const
{
MqttError retval = MqttOk;
@@ -397,7 +397,7 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
long MqttClient::counter=0;
void MqttClient::processMessage(const MqttMessage* mesg)
void MqttClient::processMessage(MqttMessage* mesg)
{
counter++;
#ifdef TINY_MQTT_DEBUG
@@ -627,6 +627,8 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
MqttMessage msg(MqttMessage::Publish);
msg.add(topic);
msg.add(payload, pay_length, false);
msg.complete();
if (parent)
{
return parent->publish(this, topic, msg);
@@ -638,7 +640,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
}
// republish a received publish if it matches any in subscriptions
MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage& msg)
MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
{
MqttError retval=MqttOk;
@@ -682,29 +684,23 @@ void MqttMessage::incoming(char in_byte)
switch(state)
{
case FixedHeader:
size=0;
size=MaxBufferLength;
state = Length;
break;
case Length:
if (size==0)
if (size==MaxBufferLength)
size = in_byte & 0x7F;
else if (size<128)
size += static_cast<uint16_t>(in_byte & 0x7F)<<7;
else
state = Error; // Really don't want to handle msg with length > 16k
size += static_cast<uint16_t>(in_byte & 0x7F)<<7;
if (size > MaxBufferLength)
{
state = Error;
}
else if ((in_byte & 0x80) == 0)
{
vheader = buffer.length();
if (size==0)
state = Complete;
else if (size > 500) // TODO magic
{
state = Error;
}
else
{
buffer.reserve(size);
@@ -749,35 +745,21 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
while(len--) incoming(*p++);
}
void MqttMessage::encodeLength() const
void MqttMessage::encodeLength()
{
if (state != Complete)
{
int length = buffer.size()-2; // 1 byte for header, 1 byte for pre-reserved length field.
std::string::size_type ins=1;
do
{
uint8_t encoded(length & 0x7F);
length >>=7;
if (length) encoded |= 0x80;
if (ins==1)
buffer[ins]=encoded;
else
buffer.insert(ins, 1, encoded);
// On pourrait optimiser, cet insert est couteux, il faudrait en fait non pas
// insérer, mais réserver 4 octets pour les remplir
// plus tard avec ke fixed header et la taille.
// Cela changerait en revanche le début du message qui ne serait plus
// buffer[0], mais buffer[0..3] selon la taille du message.
++ins;
} while (length);
state = Complete;
}
int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field.
buffer[1] = 0x80 | (length & 0x7F);
buffer[2] = (length >> 7);
vheader = 3;
// We could check that buffer[2] < 128 (end of length encoding)
state = Complete;
}
};
MqttError MqttMessage::sendTo(MqttClient* client) const
MqttError MqttMessage::sendTo(MqttClient* client)
{
if (buffer.size())
{

View File

@@ -64,7 +64,7 @@ class Topic : public IndexedString
class MqttClient;
class MqttMessage
{
const uint16_t MaxBufferLength = 4096; //hard limit: 16k
const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding
public:
enum Type
{
@@ -101,6 +101,7 @@ class MqttMessage
void add(const Topic& t) { add(t.str()); }
const char* end() const { return &buffer[0]+buffer.size(); }
const char* getVHeader() const { return &buffer[vheader]; }
void complete() { encodeLength(); }
void reset();
@@ -116,21 +117,22 @@ class MqttMessage
void create(Type type)
{
buffer=(char)type;
buffer+='\0'; // reserved for msg length
vheader=2;
buffer+='\0'; // reserved for msg length byte 1/2
buffer+='\0'; // reserved for msg length byte 2/2 (fixed)
vheader=3; // Should never change
size=0;
state=Create;
}
MqttError sendTo(MqttClient*) const;
MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const;
private:
void encodeLength() const;
void encodeLength();
mutable std::string buffer; // mutable -> sendTo()
std::string buffer;
uint8_t vheader;
uint16_t size; // bytes left to receive
mutable State state; // mutable -> encodeLength()
State state;
};
class MqttBroker;
@@ -215,8 +217,7 @@ class MqttClient
Serial << endl;
}
/** Count the number of messages that have been sent **/
static long counter;
static long counter; // Number of processed messages
private:
@@ -231,10 +232,10 @@ class MqttClient
friend class MqttBroker;
MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
MqttError publishIfSubscribed(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds);
void processMessage(const MqttMessage* message);
void processMessage(MqttMessage* message);
bool mqtt_connected = false;
char mqtt_flags;
@@ -291,7 +292,7 @@ class MqttBroker
{ return compareString(auth_password, password, len); }
MqttError publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const;
MqttError publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const;
MqttError subscribe(const Topic& topic, uint8_t qos);