MqttClient can subscribe and receive publishes from distant broker

This commit is contained in:
hsaturn
2021-03-24 01:30:56 +01:00
parent 7b20e7deb5
commit 4020393f90
2 changed files with 70 additions and 12 deletions

View File

@@ -81,7 +81,9 @@ void MqttClient::connect(std::string broker, uint16_t port)
message.add(clientId); message.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); message.sendTo(this);
message.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
} }
} }
@@ -211,6 +213,7 @@ void MqttClient::loop()
} }
else if (client && client->connected()) else if (client && client->connected())
{ {
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((uint8_t*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
@@ -242,7 +245,9 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
debug("remote subscribe"); debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2); MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO Qos > 0 needs a packet identifier // TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic.str()); msg.add(topic.str());
msg.add(qos); msg.add(qos);
@@ -257,7 +262,11 @@ void MqttClient::processMessage()
{ {
std::string error; std::string error;
std::string s; std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming");
}
auto header = message.getVHeader(); auto header = message.getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
@@ -338,6 +347,18 @@ void MqttClient::processMessage()
message.sendTo(this); message.sendTo(this);
break; break;
case MqttMessage::Type::Connack:
// if (!mqtt_connected) break;
// TODO what more on connack ?
mqtt_connected = true;
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
if (!mqtt_connected) break; if (!mqtt_connected) break;
if (client) if (client)
@@ -390,7 +411,10 @@ void MqttClient::processMessage()
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
debug("publishing to parent"); debug("publishing to parent");
parent->publish(this, published, message); if (parent)
parent->publish(this, published, message);
else if (callback && subscriptions.find(published)!=subscriptions.end())
callback(this, published, nullptr, 0); // TODO send the real payload
// TODO should send PUBACK // TODO should send PUBACK
bclose = false; bclose = false;
} }
@@ -414,7 +438,7 @@ void MqttClient::processMessage()
} }
else else
{ {
clientAlive(5); clientAlive(parent ? 5 : 0);
} }
message.reset(); message.reset();
} }
@@ -548,13 +572,14 @@ MqttError MqttMessage::sendTo(MqttClient* client)
{ {
if (buffer.size()>2) if (buffer.size()>2)
{ {
debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength(&buffer[1], buffer.size()-2);
// hexdump("snd"); hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }
else else
{ {
Serial << "??? Invalid send" << endl; debug("??? Invalid send");
return MqttInvalidMessage; return MqttInvalidMessage;
} }
return MqttOk; return MqttOk;
@@ -562,12 +587,39 @@ MqttError MqttMessage::sendTo(MqttClient* client)
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
if (prefix) Serial << prefix << ' '; uint16_t addr=0;
Serial << "size(" << buffer.size() << ") : "; const int bytes_per_row = 8;
for(const char chr: buffer) const char* hex_to_str = " | ";
const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{
if ((addr % bytes_per_row) == 0)
{ {
if (chr<16) Serial << '0'; if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
Serial << _HEX(chr) << ' '; if (prefix) Serial << prefix << separator;
ascii.clear();
} }
Serial << endl; addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl;
} }

View File

@@ -154,15 +154,21 @@ class MqttClient
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected());
Serial << " ["; Serial << " [";
message.hexdump("entrant msg");
bool c=false; bool c=false;
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
c=true; c=true;
} }
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif #endif