Use TinyConsole instead of Serial
This commit is contained in:
123
src/TinyMqtt.cpp
123
src/TinyMqtt.cpp
@@ -1,6 +1,16 @@
|
||||
#include "TinyMqtt.h"
|
||||
#include <sstream>
|
||||
|
||||
static auto green = TinyConsole::green;
|
||||
static auto cyan = TinyConsole::cyan;
|
||||
static auto white = TinyConsole::white;
|
||||
static auto red = TinyConsole::red;
|
||||
static auto yellow = TinyConsole::yellow;
|
||||
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
int TinyMqtt::debug=2;
|
||||
#endif
|
||||
|
||||
#ifdef EPOXY_DUINO
|
||||
std::map<MqttMessage::Type, int> MqttClient::counters;
|
||||
#endif
|
||||
@@ -64,6 +74,7 @@ void MqttClient::close(bool bSendDisconnect)
|
||||
if (bSendDisconnect and client->connected())
|
||||
{
|
||||
message.create(MqttMessage::Type::Disconnect);
|
||||
message.hexdump("close");
|
||||
message.sendTo(this);
|
||||
}
|
||||
client->stop();
|
||||
@@ -78,13 +89,14 @@ void MqttClient::close(bool bSendDisconnect)
|
||||
|
||||
void MqttClient::connect(MqttBroker* parentBroker)
|
||||
{
|
||||
debug("MqttClient::connect_1");
|
||||
close();
|
||||
parent = parentBroker;
|
||||
}
|
||||
|
||||
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
||||
{
|
||||
debug("MqttClient::connect");
|
||||
debug("MqttClient::connect_3");
|
||||
keep_alive = ka;
|
||||
close();
|
||||
if (client) delete client;
|
||||
@@ -105,11 +117,13 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
||||
|
||||
void MqttBroker::addClient(MqttClient* client)
|
||||
{
|
||||
clients.push_back(client);
|
||||
debug("MqttBroker::addClient");
|
||||
clients.push_back(client);
|
||||
}
|
||||
|
||||
void MqttBroker::connect(const std::string& host, uint16_t port)
|
||||
{
|
||||
debug("MqttBroker::connect_2");
|
||||
if (broker == nullptr) broker = new MqttClient;
|
||||
broker->connect(host, port);
|
||||
broker->parent = this; // Because connect removed the link
|
||||
@@ -117,6 +131,7 @@ void MqttBroker::connect(const std::string& host, uint16_t port)
|
||||
|
||||
void MqttBroker::removeClient(MqttClient* remove)
|
||||
{
|
||||
debug("removeClient");
|
||||
for(auto it=clients.begin(); it!=clients.end(); it++)
|
||||
{
|
||||
auto client=*it;
|
||||
@@ -133,11 +148,12 @@ void MqttBroker::removeClient(MqttClient* remove)
|
||||
return;
|
||||
}
|
||||
}
|
||||
debug("Error cannot remove client"); // TODO should not occur
|
||||
debug(red << "Error cannot remove client"); // TODO should not occur
|
||||
}
|
||||
|
||||
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
||||
{
|
||||
debug("MqttBroker::onClient");
|
||||
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
||||
|
||||
broker->addClient(new MqttClient(broker, client));
|
||||
@@ -183,6 +199,7 @@ void MqttBroker::loop()
|
||||
|
||||
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
|
||||
{
|
||||
debug("MqttBroker::subscribe");
|
||||
if (broker && broker->connected())
|
||||
{
|
||||
return broker->subscribe(topic, qos);
|
||||
@@ -194,13 +211,13 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
|
||||
{
|
||||
MqttError retval = MqttOk;
|
||||
|
||||
debug("publish ");
|
||||
debug("MqttBroker::publish");
|
||||
int i=0;
|
||||
for(auto client: clients)
|
||||
{
|
||||
i++;
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
|
||||
Console << __LINE__ << " broker:" << (broker && broker->connected() ? "linked" : "alone") <<
|
||||
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
||||
#endif
|
||||
bool doit = false;
|
||||
@@ -221,7 +238,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
|
||||
doit = true;
|
||||
}
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << ", doit=" << doit << ' ';
|
||||
Console << ", doit=" << doit << ' ';
|
||||
#endif
|
||||
|
||||
if (doit) retval = client->publishIfSubscribed(topic, msg);
|
||||
@@ -248,6 +265,7 @@ void MqttMessage::getString(const char* &buff, uint16_t& len)
|
||||
|
||||
void MqttClient::clientAlive(uint32_t more_seconds)
|
||||
{
|
||||
debug("MqttClient::clientAlive");
|
||||
if (keep_alive)
|
||||
{
|
||||
#ifdef EPOXY_DUINO
|
||||
@@ -266,9 +284,9 @@ void MqttClient::loop()
|
||||
{
|
||||
if (parent)
|
||||
{
|
||||
debug("timeout client");
|
||||
debug(red << "timeout client");
|
||||
close();
|
||||
debug("closed");
|
||||
debug(red << "closed");
|
||||
}
|
||||
else if (client && client->connected())
|
||||
{
|
||||
@@ -297,7 +315,7 @@ void MqttClient::loop()
|
||||
void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
|
||||
{
|
||||
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
|
||||
debug("cnx: connecting");
|
||||
debug("MqttClient::onConnect");
|
||||
MqttMessage msg(MqttMessage::Type::Connect);
|
||||
msg.add("MQTT",4);
|
||||
msg.add(0x4); // Mqtt protocol version 3.1.1
|
||||
@@ -354,7 +372,7 @@ void MqttClient::resubscribe()
|
||||
|
||||
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
|
||||
{
|
||||
debug("subsribe(" << topic.c_str() << ")");
|
||||
debug("MqttClient::subsribe(" << topic.c_str() << ")");
|
||||
MqttError ret = MqttOk;
|
||||
|
||||
subscriptions.insert(topic);
|
||||
@@ -372,6 +390,7 @@ MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
|
||||
|
||||
MqttError MqttClient::unsubscribe(Topic topic)
|
||||
{
|
||||
debug("MqttClient::unsubscribe");
|
||||
auto it=subscriptions.find(topic);
|
||||
if (it != subscriptions.end())
|
||||
{
|
||||
@@ -386,6 +405,7 @@ MqttError MqttClient::unsubscribe(Topic topic)
|
||||
|
||||
MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos)
|
||||
{
|
||||
debug("MqttClient::sendTopic");
|
||||
MqttMessage msg(type, 2);
|
||||
|
||||
// TODO manage packet identifier
|
||||
@@ -402,16 +422,7 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
|
||||
void MqttClient::processMessage(MqttMessage* mesg)
|
||||
{
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
|
||||
{
|
||||
#ifdef NOT_ESP_CORE
|
||||
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl;
|
||||
#else
|
||||
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
||||
#endif
|
||||
// mesg->hexdump("Incoming");
|
||||
mesg->hexdump("Incoming");
|
||||
}
|
||||
#endif
|
||||
auto header = mesg->getVHeader();
|
||||
const char* payload;
|
||||
@@ -472,7 +483,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
}
|
||||
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl;
|
||||
Console << yellow << "Client connected :" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << white << endl;
|
||||
#endif
|
||||
bclose = false;
|
||||
mqtt_connected=true;
|
||||
@@ -507,12 +518,13 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
if (client)
|
||||
{
|
||||
uint16_t pingreq = MqttMessage::Type::PingResp;
|
||||
debug(cyan << "Ping response to client ");
|
||||
client->write((const char*)(&pingreq), 2);
|
||||
bclose = false;
|
||||
}
|
||||
else
|
||||
{
|
||||
debug("internal pingreq ?");
|
||||
debug(red << "internal pingreq ?");
|
||||
}
|
||||
break;
|
||||
|
||||
@@ -569,7 +581,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
|
||||
case MqttMessage::Type::Publish:
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "publish " << mqtt_connected << '/' << (long) client << endl;
|
||||
Console << "publish " << mqtt_connected << '/' << (long) client << endl;
|
||||
#endif
|
||||
if (mqtt_connected or client == nullptr)
|
||||
{
|
||||
@@ -578,7 +590,9 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
mesg->getString(payload, len);
|
||||
Topic published(payload, len);
|
||||
payload += len;
|
||||
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Console << "Received Publish (" << published.str().c_str() << ") size=" << (int)len << endl;
|
||||
#endif
|
||||
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
|
||||
if (qos) payload+=2; // ignore packet identifier if any
|
||||
len=mesg->end()-payload;
|
||||
@@ -588,8 +602,8 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
if (parent==nullptr or client==nullptr) // internal MqttClient receives publish
|
||||
{
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n";
|
||||
Serial << "has " << (callback ? "" : "no ") << " callback.\n";
|
||||
Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n";
|
||||
Console << "has " << (callback ? "" : "no ") << " callback.\n";
|
||||
#endif
|
||||
if (callback and isSubscribedTo(published))
|
||||
{
|
||||
@@ -620,10 +634,10 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
||||
if (bclose)
|
||||
{
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "*************** Error msg 0x" << _HEX(mesg->type());
|
||||
Console << red << "*************** Error msg 0x" << _HEX(mesg->type());
|
||||
mesg->hexdump("-------ERROR ------");
|
||||
dump();
|
||||
Serial << endl;
|
||||
Console << white << endl;
|
||||
#endif
|
||||
close();
|
||||
}
|
||||
@@ -719,7 +733,7 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
|
||||
{
|
||||
MqttError retval=MqttOk;
|
||||
|
||||
debug("mqttclient publish " << subscriptions.size());
|
||||
debug("mqttclient publishIfSubscribed " << topic.c_str() << ' ' << subscriptions.size());
|
||||
if (isSubscribedTo(topic))
|
||||
{
|
||||
if (client)
|
||||
@@ -729,7 +743,7 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
|
||||
processMessage(&msg);
|
||||
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "Should call the callback ?\n";
|
||||
Console << "Should call the callback ?\n";
|
||||
#endif
|
||||
// callback(this, topic, nullptr, 0); // TODO Payload
|
||||
}
|
||||
@@ -798,7 +812,7 @@ void MqttMessage::incoming(char in_byte)
|
||||
case Complete:
|
||||
default:
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "Spurious " << _HEX(in_byte) << endl;
|
||||
Console << red << "Spurious " << _HEX(in_byte) << white << endl;
|
||||
hexdump("spurious");
|
||||
#endif
|
||||
reset();
|
||||
@@ -824,6 +838,7 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
|
||||
|
||||
void MqttMessage::encodeLength()
|
||||
{
|
||||
debug("encodingLength");
|
||||
if (state != Complete)
|
||||
{
|
||||
int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field.
|
||||
@@ -849,14 +864,14 @@ MqttError MqttMessage::sendTo(MqttClient* client)
|
||||
{
|
||||
if (buffer.size())
|
||||
{
|
||||
debug("sending " << buffer.size() << " bytes");
|
||||
debug(cyan << "sending " << buffer.size() << " bytes to " << client->id());
|
||||
encodeLength();
|
||||
// hexdump("snd");
|
||||
hexdump("Sending ");
|
||||
client->write(&buffer[0], buffer.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
debug("??? Invalid send");
|
||||
debug(red << "??? Invalid send");
|
||||
return MqttInvalidMessage;
|
||||
}
|
||||
return MqttOk;
|
||||
@@ -866,6 +881,32 @@ void MqttMessage::hexdump(const char* prefix) const
|
||||
{
|
||||
(void)prefix;
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
if (TinyMqtt::debug<2) return;
|
||||
static std::map<Type, std::string> tts={
|
||||
{ Connect, "Connect" },
|
||||
{ ConnAck, "Connack" },
|
||||
{ Publish, "Publish" },
|
||||
{ PubAck, "Puback" },
|
||||
{ Subscribe, "Subscribe" },
|
||||
{ SubAck, "Suback" },
|
||||
{ UnSubscribe, "Unsubscribe" },
|
||||
{ UnSuback, "Unsuback" },
|
||||
{ PingReq, "Pingreq" },
|
||||
{ PingResp, "Pingresp" },
|
||||
{ Disconnect, "Disconnect" }
|
||||
};
|
||||
std::string t("Unknown");
|
||||
Type typ=static_cast<Type>(buffer[0] & 0xF0);
|
||||
if (tts.find(typ) != tts.end())
|
||||
t=tts[typ];
|
||||
Console.fg(cyan);
|
||||
#ifdef NOT_ESP_CORE
|
||||
Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << (Complete ? "complete" : "uncomplete") << " mem=???" << endl;
|
||||
#else
|
||||
Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << (Complete ? "complete" : "uncomplete") << " mem=" << ESP.getFreeHeap() << endl;
|
||||
#endif
|
||||
Console.fg(white);
|
||||
|
||||
uint16_t addr=0;
|
||||
const int bytes_per_row = 8;
|
||||
const char* hex_to_str = " | ";
|
||||
@@ -873,19 +914,19 @@ void MqttMessage::hexdump(const char* prefix) const
|
||||
const char* half_sep = " - ";
|
||||
std::string ascii;
|
||||
|
||||
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
|
||||
Console << prefix << " size(" << buffer.size() << "), state=" << state << endl;
|
||||
|
||||
for(const char chr: buffer)
|
||||
{
|
||||
if ((addr % bytes_per_row) == 0)
|
||||
{
|
||||
if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
|
||||
if (prefix) Serial << prefix << separator;
|
||||
if (ascii.length()) Console << hex_to_str << ascii << separator << endl;
|
||||
if (prefix) Console << prefix << separator;
|
||||
ascii.clear();
|
||||
}
|
||||
addr++;
|
||||
if (chr<16) Serial << '0';
|
||||
Serial << _HEX(chr) << ' ';
|
||||
if (chr<16) Console << '0';
|
||||
Console << _HEX(chr) << ' ';
|
||||
|
||||
ascii += (chr<32 ? '.' : chr);
|
||||
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
|
||||
@@ -894,12 +935,12 @@ void MqttMessage::hexdump(const char* prefix) const
|
||||
{
|
||||
while(ascii.length() < bytes_per_row+strlen(half_sep))
|
||||
{
|
||||
Serial << " "; // spaces per hexa byte
|
||||
Console << " "; // spaces per hexa byte
|
||||
ascii += ' ';
|
||||
}
|
||||
Serial << hex_to_str << ascii << separator;
|
||||
Console << hex_to_str << ascii << separator;
|
||||
}
|
||||
|
||||
Serial << endl;
|
||||
Console << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -34,10 +34,14 @@
|
||||
#include "StringIndexer.h"
|
||||
#include <MqttStreaming.h>
|
||||
|
||||
// #define TINY_MQTT_DEBUG
|
||||
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
#define debug(what) { Serial << (int)__LINE__ << ' ' << what << endl; delay(100); }
|
||||
#include <TinyConsole.h> // https://github.com/hsaturn/TinyConsole
|
||||
struct TinyMqtt
|
||||
{
|
||||
static int debug;
|
||||
};
|
||||
|
||||
#define debug(what) { if (TinyMqtt::debug>=1) Console << (int)__LINE__ << ' ' << what << TinyConsole::white << endl; delay(100); }
|
||||
#else
|
||||
#define debug(what) {}
|
||||
#endif
|
||||
@@ -176,7 +180,9 @@ class MqttClient
|
||||
(parent!=nullptr and client==nullptr) or
|
||||
(client and client->connected()); }
|
||||
void write(const char* buf, size_t length)
|
||||
{ if (client) client->write(buf, length); }
|
||||
{
|
||||
if (client) client->write(buf, length);
|
||||
}
|
||||
|
||||
const std::string& id() const { return clientId; }
|
||||
void id(std::string& new_id) { clientId = new_id; }
|
||||
@@ -188,7 +194,7 @@ class MqttClient
|
||||
{
|
||||
callback=fun;
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
Serial << "Callback set to " << (long)fun << endl;
|
||||
Console << TinyConsole::magenta << "Callback set to " << (long)fun << TinyConsole::white << endl;
|
||||
if (callback) callback(this, "test/topic", "value", 5);
|
||||
#endif
|
||||
};
|
||||
@@ -213,22 +219,29 @@ class MqttClient
|
||||
(void)indent;
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
uint32_t ms=millis();
|
||||
Serial << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
|
||||
Serial << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
|
||||
Serial << (client && client->connected() ? "" : "dis") << "connected";
|
||||
Console << indent << "+-- " << '\'' << clientId.c_str() << "' " << (connected() ? " ON " : " OFF");
|
||||
Console << ", alive=" << alive << '/' << ms << ", ka=" << keep_alive << ' ';
|
||||
if (client)
|
||||
{
|
||||
if (client->connected())
|
||||
Console << TinyConsole::green << "connected";
|
||||
else
|
||||
Console << TinyConsole::red << "disconnected";
|
||||
Console << TinyConsole::white;
|
||||
}
|
||||
if (subscriptions.size())
|
||||
{
|
||||
bool c = false;
|
||||
Serial << " [";
|
||||
Console << " [";
|
||||
for(auto s: subscriptions)
|
||||
{
|
||||
if (c) Serial << ", ";
|
||||
Serial << s.str().c_str();
|
||||
if (c) Console << ", ";
|
||||
Console << s.str().c_str();
|
||||
c=true;
|
||||
}
|
||||
Serial << ']';
|
||||
Console << ']';
|
||||
}
|
||||
Serial << endl;
|
||||
Console << TinyConsole::erase_to_end << endl;
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -256,7 +269,7 @@ class MqttClient
|
||||
|
||||
bool mqtt_connected = false;
|
||||
char mqtt_flags;
|
||||
uint32_t keep_alive = 60;
|
||||
uint32_t keep_alive = 30;
|
||||
uint32_t alive;
|
||||
MqttMessage message;
|
||||
|
||||
@@ -319,7 +332,7 @@ class MqttBroker
|
||||
|
||||
bool compareString(const char* good, const char* str, uint8_t str_len) const;
|
||||
std::vector<MqttClient*> clients;
|
||||
TcpServer* server;
|
||||
TcpServer* server = nullptr;
|
||||
|
||||
const char* auth_user = "guest";
|
||||
const char* auth_password = "guest";
|
||||
|
||||
Reference in New Issue
Block a user