947 lines
22 KiB
C++
947 lines
22 KiB
C++
// vim: ts=2 sw=2 expandtab
|
|
#include "TinyMqtt.h"
|
|
#include <sstream>
|
|
|
|
#ifdef TINY_MQTT_DEBUG
|
|
static auto cyan = TinyConsole::cyan;
|
|
static auto white = TinyConsole::white;
|
|
static auto red = TinyConsole::red;
|
|
static auto yellow = TinyConsole::yellow;
|
|
|
|
int TinyMqtt::debug=2;
|
|
#endif
|
|
|
|
#ifdef EPOXY_DUINO
|
|
std::map<MqttMessage::Type, int> MqttClient::counters;
|
|
#endif
|
|
|
|
MqttBroker::MqttBroker(uint16_t port)
|
|
{
|
|
server = new TcpServer(port);
|
|
#ifdef TINY_MQTT_ASYNC
|
|
server->onClient(onClient, this);
|
|
#endif
|
|
}
|
|
|
|
MqttBroker::~MqttBroker()
|
|
{
|
|
while(clients.size())
|
|
{
|
|
delete clients[0];
|
|
}
|
|
delete server;
|
|
}
|
|
|
|
// private constructor used by broker only
|
|
MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client)
|
|
: local_broker(local_broker)
|
|
{
|
|
#ifdef TINY_MQTT_ASYNC
|
|
client = new_client;
|
|
client->onData(onData, this);
|
|
// client->onConnect() TODO
|
|
// client->onDisconnect() TODO
|
|
#else
|
|
client = new WiFiClient(*new_client);
|
|
#endif
|
|
#ifdef EPOXY_DUINO
|
|
alive = millis()+500000;
|
|
#else
|
|
alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg
|
|
#endif
|
|
}
|
|
|
|
MqttClient::MqttClient(MqttBroker* local_broker, const std::string& id)
|
|
: local_broker(local_broker), clientId(id)
|
|
{
|
|
client = nullptr;
|
|
|
|
if (local_broker) local_broker->addClient(this);
|
|
}
|
|
|
|
MqttClient::~MqttClient()
|
|
{
|
|
close();
|
|
delete client;
|
|
}
|
|
|
|
void MqttClient::close(bool bSendDisconnect)
|
|
{
|
|
debug("close " << id().c_str());
|
|
mqtt_connected = false;
|
|
if (client) // connected to a remote broker
|
|
{
|
|
if (bSendDisconnect and client->connected())
|
|
{
|
|
message.create(MqttMessage::Type::Disconnect);
|
|
message.hexdump("close");
|
|
message.sendTo(this);
|
|
}
|
|
client->stop();
|
|
}
|
|
|
|
if (local_broker)
|
|
{
|
|
local_broker->removeClient(this);
|
|
local_broker = nullptr;
|
|
}
|
|
}
|
|
|
|
void MqttClient::connect(MqttBroker* local)
|
|
{
|
|
debug("MqttClient::connect_1");
|
|
close();
|
|
local_broker = local;
|
|
}
|
|
|
|
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
|
{
|
|
debug("MqttClient::connect_3");
|
|
keep_alive = ka;
|
|
close();
|
|
if (client) delete client;
|
|
client = new TcpClient;
|
|
|
|
debug("Trying to connect to " << broker.c_str() << ':' << port);
|
|
#ifdef TINY_MQTT_ASYNC
|
|
client->onData(onData, this);
|
|
client->onConnect(onConnect, this);
|
|
client->connect(broker.c_str(), port, ka);
|
|
#else
|
|
if (client->connect(broker.c_str(), port))
|
|
{
|
|
onConnect(this, client);
|
|
}
|
|
#endif
|
|
}
|
|
|
|
void MqttBroker::addClient(MqttClient* client)
|
|
{
|
|
debug("MqttBroker::addClient");
|
|
clients.push_back(client);
|
|
}
|
|
|
|
void MqttBroker::connect(const std::string& host, uint16_t port)
|
|
{
|
|
debug("MqttBroker::connect_2");
|
|
if (broker == nullptr) broker = new MqttClient;
|
|
broker->connect(host, port);
|
|
broker->local_broker = this; // Because connect removed the link
|
|
}
|
|
|
|
void MqttBroker::removeClient(MqttClient* remove)
|
|
{
|
|
debug("removeClient");
|
|
for(auto it=clients.begin(); it!=clients.end(); it++)
|
|
{
|
|
auto client=*it;
|
|
if (client==remove)
|
|
{
|
|
// TODO if this broker is connected to an external broker
|
|
// we have to unsubscribe remove's topics.
|
|
// (but doing this, check that other clients are not subscribed...)
|
|
// Unless -> we could receive useless messages
|
|
// -> we are using (memory) one IndexedString plus its string for nothing.
|
|
debug("Remove " << clients.size());
|
|
clients.erase(it);
|
|
debug("Client removed " << clients.size());
|
|
return;
|
|
}
|
|
}
|
|
debug(red << "Error cannot remove client"); // TODO should not occur
|
|
}
|
|
|
|
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
|
{
|
|
debug("MqttBroker::onClient");
|
|
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
|
|
|
broker->addClient(new MqttClient(broker, client));
|
|
debug("New client");
|
|
}
|
|
|
|
void MqttBroker::loop()
|
|
{
|
|
#ifndef TINY_MQTT_ASYNC
|
|
WiFiClient client = server->available();
|
|
|
|
if (client)
|
|
{
|
|
onClient(this, &client);
|
|
}
|
|
#endif
|
|
if (broker)
|
|
{
|
|
// TODO should monitor broker's activity.
|
|
// 1 When broker disconnect and reconnect we have to re-subscribe
|
|
broker->loop();
|
|
}
|
|
|
|
|
|
// for(auto it=clients.begin(); it!=clients.end(); it++)
|
|
// use index because size can change during the loop
|
|
for(size_t i=0; i<clients.size(); i++)
|
|
{
|
|
auto client = clients[i];
|
|
if (client->connected())
|
|
{
|
|
client->loop();
|
|
}
|
|
else
|
|
{
|
|
debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker);
|
|
// Note: deleting a client not added by the broker itself will probably crash later.
|
|
delete client;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
|
|
{
|
|
debug("MqttBroker::subscribe");
|
|
if (broker && broker->connected())
|
|
{
|
|
return broker->subscribe(topic, qos);
|
|
}
|
|
return MqttNowhereToSend;
|
|
}
|
|
|
|
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) const
|
|
{
|
|
MqttError retval = MqttOk;
|
|
|
|
debug("MqttBroker::publish");
|
|
int i=0;
|
|
for(auto client: clients)
|
|
{
|
|
i++;
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << __LINE__ << " broker:" << (broker && broker->connected() ? "linked" : "alone") <<
|
|
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
|
#endif
|
|
bool doit = false;
|
|
if (broker && broker->connected()) // this (MqttBroker) is connected (to a external broker)
|
|
{
|
|
// ext_broker -> clients or clients -> ext_broker
|
|
if (source == broker) // external broker -> internal clients
|
|
doit = true;
|
|
else // external clients -> this broker
|
|
{
|
|
// As this broker is connected to another broker, simply forward the msg
|
|
MqttError ret = broker->publishIfSubscribed(topic, msg);
|
|
if (ret != MqttOk) retval = ret;
|
|
}
|
|
}
|
|
else // Disconnected
|
|
{
|
|
doit = true;
|
|
}
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << ", doit=" << doit << ' ';
|
|
#endif
|
|
|
|
if (doit) retval = client->publishIfSubscribed(topic, msg);
|
|
debug("");
|
|
}
|
|
return retval;
|
|
}
|
|
|
|
bool MqttBroker::compareString(
|
|
const char* good,
|
|
const char* str,
|
|
uint8_t len) const
|
|
{
|
|
while(len-- and *good++==*str++);
|
|
|
|
return *good==0;
|
|
}
|
|
|
|
void MqttMessage::getString(const char* &buff, uint16_t& len)
|
|
{
|
|
len = getSize(buff);
|
|
buff+=2;
|
|
}
|
|
|
|
void MqttClient::clientAlive(uint32_t more_seconds)
|
|
{
|
|
debug("MqttClient::clientAlive");
|
|
if (keep_alive)
|
|
{
|
|
#ifdef EPOXY_DUINO
|
|
alive=millis()+500000+0*more_seconds;
|
|
#else
|
|
alive=millis()+1000*(keep_alive+more_seconds);
|
|
#endif
|
|
}
|
|
else
|
|
alive=0;
|
|
}
|
|
|
|
void MqttClient::loop()
|
|
{
|
|
if (alive && (millis() > alive))
|
|
{
|
|
if (local_broker)
|
|
{
|
|
debug(red << "timeout client");
|
|
close();
|
|
debug(red << "closed");
|
|
}
|
|
else if (client && client->connected())
|
|
{
|
|
debug("pingreq");
|
|
uint16_t pingreq = MqttMessage::Type::PingReq;
|
|
client->write((const char*)(&pingreq), 2);
|
|
clientAlive(0);
|
|
|
|
// TODO when many MqttClient passes through a local broker
|
|
// there is no need to send one PingReq per instance.
|
|
}
|
|
}
|
|
#ifndef TINY_MQTT_ASYNC
|
|
while(client && client->available()>0)
|
|
{
|
|
message.incoming(client->read());
|
|
if (message.type())
|
|
{
|
|
processMessage(&message);
|
|
message.reset();
|
|
}
|
|
}
|
|
#endif
|
|
}
|
|
|
|
void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
|
|
{
|
|
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
|
|
debug("MqttClient::onConnect");
|
|
MqttMessage msg(MqttMessage::Type::Connect);
|
|
msg.add("MQTT",4);
|
|
msg.add(0x4); // Mqtt protocol version 3.1.1
|
|
msg.add(0x0); // Connect flags TODO user / name
|
|
|
|
msg.add((char)(mqtt->keep_alive >> 8)); // keep_alive
|
|
msg.add((char)(mqtt->keep_alive & 0xFF));
|
|
msg.add(mqtt->clientId);
|
|
debug("cnx: mqtt connecting");
|
|
msg.sendTo(mqtt);
|
|
msg.reset();
|
|
debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker);
|
|
|
|
mqtt->clientAlive(0);
|
|
}
|
|
|
|
#ifdef TINY_MQTT_ASYNC
|
|
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
|
|
{
|
|
char* char_ptr = static_cast<char*>(data);
|
|
MqttClient* client=static_cast<MqttClient*>(client_ptr);
|
|
while(len>0)
|
|
{
|
|
client->message.incoming(*char_ptr++);
|
|
if (client->message.type())
|
|
{
|
|
client->processMessage(&client->message);
|
|
client->message.reset();
|
|
}
|
|
len--;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
void MqttClient::resubscribe()
|
|
{
|
|
// TODO resubscription limited to 256 bytes
|
|
if (subscriptions.size())
|
|
{
|
|
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
|
|
|
|
// TODO manage packet identifier
|
|
msg.add(0);
|
|
msg.add(0);
|
|
|
|
for(auto topic: subscriptions)
|
|
{
|
|
msg.add(topic);
|
|
msg.add(0); // TODO qos
|
|
}
|
|
msg.sendTo(this); // TODO return value
|
|
}
|
|
}
|
|
|
|
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
|
|
{
|
|
debug("MqttClient::subsribe(" << topic.c_str() << ")");
|
|
MqttError ret = MqttOk;
|
|
|
|
subscriptions.insert(topic);
|
|
|
|
if (local_broker==nullptr) // remote broker
|
|
{
|
|
return sendTopic(topic, MqttMessage::Type::Subscribe, qos);
|
|
}
|
|
else
|
|
{
|
|
return local_broker->subscribe(topic, qos);
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
MqttError MqttClient::unsubscribe(Topic topic)
|
|
{
|
|
debug("MqttClient::unsubscribe");
|
|
auto it=subscriptions.find(topic);
|
|
if (it != subscriptions.end())
|
|
{
|
|
subscriptions.erase(it);
|
|
if (local_broker==nullptr) // remote broker
|
|
{
|
|
return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0);
|
|
}
|
|
}
|
|
return MqttOk;
|
|
}
|
|
|
|
MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos)
|
|
{
|
|
debug("MqttClient::sendTopic");
|
|
MqttMessage msg(type, 2);
|
|
|
|
// TODO manage packet identifier
|
|
msg.add(0);
|
|
msg.add(0);
|
|
|
|
msg.add(topic);
|
|
msg.add(qos);
|
|
|
|
// TODO instead we should wait (state machine) for SUBACK / UNSUBACK ?
|
|
return msg.sendTo(this);
|
|
}
|
|
|
|
void MqttClient::processMessage(MqttMessage* mesg)
|
|
{
|
|
#ifdef TINY_MQTT_DEBUG
|
|
mesg->hexdump("Incoming");
|
|
#endif
|
|
auto header = mesg->getVHeader();
|
|
const char* payload;
|
|
uint16_t len;
|
|
bool bclose=true;
|
|
|
|
#ifdef EPOXY_DUINO
|
|
counters[mesg->type()]++;
|
|
#endif
|
|
|
|
switch(mesg->type())
|
|
{
|
|
case MqttMessage::Type::Connect:
|
|
if (mqtt_connected)
|
|
{
|
|
debug("already connected");
|
|
break;
|
|
}
|
|
payload = header+10;
|
|
mqtt_flags = header[7];
|
|
keep_alive = MqttMessage::getSize(header+8);
|
|
if (strncmp("MQTT", header+2,4))
|
|
{
|
|
debug("bad mqtt header");
|
|
break;
|
|
}
|
|
if (header[6]!=0x04)
|
|
{
|
|
debug("unknown level");
|
|
break; // Level 3.1.1
|
|
}
|
|
|
|
// ClientId
|
|
mesg->getString(payload, len);
|
|
clientId = std::string(payload, len);
|
|
payload += len;
|
|
|
|
if (mqtt_flags & FlagWill) // Will topic
|
|
{
|
|
mesg->getString(payload, len); // Will Topic
|
|
payload += len;
|
|
|
|
mesg->getString(payload, len); // Will Message
|
|
payload += len;
|
|
}
|
|
// FIXME forgetting credential is allowed (security hole)
|
|
if (mqtt_flags & FlagUserName)
|
|
{
|
|
mesg->getString(payload, len);
|
|
if (not local_broker->checkUser(payload, len)) break;
|
|
payload += len;
|
|
}
|
|
if (mqtt_flags & FlagPassword)
|
|
{
|
|
mesg->getString(payload, len);
|
|
if (not local_broker->checkPassword(payload, len)) break;
|
|
payload += len;
|
|
}
|
|
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << yellow << "Client connected :" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << white << endl;
|
|
#endif
|
|
bclose = false;
|
|
mqtt_connected=true;
|
|
{
|
|
MqttMessage msg(MqttMessage::Type::ConnAck);
|
|
msg.add(0); // Session present (not implemented)
|
|
msg.add(0); // Connection accepted
|
|
msg.sendTo(this);
|
|
}
|
|
break;
|
|
|
|
case MqttMessage::Type::ConnAck:
|
|
mqtt_connected = true;
|
|
bclose = false;
|
|
resubscribe();
|
|
break;
|
|
|
|
case MqttMessage::Type::SubAck:
|
|
case MqttMessage::Type::PubAck:
|
|
if (!mqtt_connected) break;
|
|
// Ignore acks
|
|
bclose = false;
|
|
break;
|
|
|
|
case MqttMessage::Type::PingResp:
|
|
// TODO: no PingResp is suspicious (server dead)
|
|
bclose = false;
|
|
break;
|
|
|
|
case MqttMessage::Type::PingReq:
|
|
if (!mqtt_connected) break;
|
|
if (client)
|
|
{
|
|
uint16_t pingreq = MqttMessage::Type::PingResp;
|
|
debug(cyan << "Ping response to client ");
|
|
client->write((const char*)(&pingreq), 2);
|
|
bclose = false;
|
|
}
|
|
else
|
|
{
|
|
debug(red << "internal pingreq ?");
|
|
}
|
|
break;
|
|
|
|
case MqttMessage::Type::Subscribe:
|
|
case MqttMessage::Type::UnSubscribe:
|
|
{
|
|
if (!mqtt_connected) break;
|
|
payload = header+2;
|
|
|
|
debug("un/subscribe loop");
|
|
std::string qoss;
|
|
while(payload < mesg->end())
|
|
{
|
|
mesg->getString(payload, len); // Topic
|
|
debug( " topic (" << std::string(payload, len) << ')');
|
|
// subscribe(Topic(payload, len));
|
|
Topic topic(payload, len);
|
|
|
|
payload += len;
|
|
if (mesg->type() == MqttMessage::Type::Subscribe)
|
|
{
|
|
uint8_t qos = *payload++;
|
|
if (qos != 0)
|
|
{
|
|
debug("Unsupported QOS" << qos << endl);
|
|
qoss.push_back(0x80);
|
|
}
|
|
else
|
|
qoss.push_back(qos);
|
|
subscriptions.insert(topic);
|
|
}
|
|
else
|
|
{
|
|
auto it=subscriptions.find(topic);
|
|
if (it != subscriptions.end())
|
|
subscriptions.erase(it);
|
|
}
|
|
}
|
|
debug("end loop");
|
|
bclose = false;
|
|
|
|
MqttMessage ack(mesg->type() == MqttMessage::Type::Subscribe ? MqttMessage::Type::SubAck : MqttMessage::Type::UnSuback);
|
|
ack.add(header[0]);
|
|
ack.add(header[1]);
|
|
ack.add(qoss.c_str(), qoss.size(), false);
|
|
ack.sendTo(this);
|
|
}
|
|
break;
|
|
|
|
case MqttMessage::Type::UnSuback:
|
|
if (!mqtt_connected) break;
|
|
bclose = false;
|
|
break;
|
|
|
|
case MqttMessage::Type::Publish:
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << "publish " << mqtt_connected << '/' << (long) client << endl;
|
|
#endif
|
|
if (mqtt_connected or client == nullptr)
|
|
{
|
|
uint8_t qos = mesg->flags();
|
|
payload = header;
|
|
mesg->getString(payload, len);
|
|
Topic published(payload, len);
|
|
payload += len;
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << "Received Publish (" << published.str().c_str() << ") size=" << (int)len << endl;
|
|
#endif
|
|
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
|
|
if (qos) payload+=2; // ignore packet identifier if any
|
|
len=mesg->end()-payload;
|
|
// TODO reset DUP
|
|
// TODO reset RETAIN
|
|
|
|
if (local_broker==nullptr or client==nullptr) // internal MqttClient receives publish
|
|
{
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n";
|
|
Console << "has " << (callback ? "" : "no ") << " callback.\n";
|
|
#endif
|
|
if (callback and isSubscribedTo(published))
|
|
{
|
|
callback(this, published, payload, len); // TODO send the real payload
|
|
}
|
|
}
|
|
else if (local_broker) // from outside to inside
|
|
{
|
|
debug("publishing to local_broker");
|
|
local_broker->publish(this, published, *mesg);
|
|
}
|
|
bclose = false;
|
|
}
|
|
break;
|
|
|
|
case MqttMessage::Type::Disconnect:
|
|
// TODO should discard any will msg
|
|
if (!mqtt_connected) break;
|
|
mqtt_connected = false;
|
|
close(false);
|
|
bclose=false;
|
|
break;
|
|
|
|
default:
|
|
bclose=true;
|
|
break;
|
|
};
|
|
if (bclose)
|
|
{
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << red << "*************** Error msg 0x" << _HEX(mesg->type());
|
|
mesg->hexdump("-------ERROR ------");
|
|
dump();
|
|
Console << white << endl;
|
|
#endif
|
|
close();
|
|
}
|
|
else
|
|
{
|
|
clientAlive(local_broker ? 5 : 0);
|
|
}
|
|
}
|
|
|
|
bool Topic::matches(const Topic& topic) const
|
|
{
|
|
if (getIndex() == topic.getIndex()) return true;
|
|
const char* p1 = c_str();
|
|
const char* p2 = topic.c_str();
|
|
|
|
if (p1 == p2) return true;
|
|
if (*p2 == '$' and *p1 != '$') return false;
|
|
|
|
while(*p1 and *p2)
|
|
{
|
|
if (*p1 == '+')
|
|
{
|
|
++p1;
|
|
if (*p1 and *p1!='/') return false;
|
|
if (*p1) ++p1;
|
|
while(*p2 and *p2++!='/');
|
|
}
|
|
else if (*p1 == '#')
|
|
{
|
|
if (*++p1==0) return true;
|
|
return false;
|
|
}
|
|
else if (*p1 == '*')
|
|
{
|
|
const char c=*(p1+1);
|
|
if (c==0) return true;
|
|
if (c!='/') return false;
|
|
const char*p = p1+2;
|
|
while(*p and *p2)
|
|
{
|
|
if (*p == *p2)
|
|
{
|
|
if (*p==0) return true;
|
|
if (*p=='/')
|
|
{
|
|
p1=p;
|
|
break;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
while(*p2 and *p2++!='/');
|
|
break;
|
|
}
|
|
++p;
|
|
++p2;
|
|
}
|
|
if (*p==0) return true;
|
|
}
|
|
else if (*p1 == *p2)
|
|
{
|
|
++p1;
|
|
++p2;
|
|
}
|
|
else
|
|
return false;
|
|
}
|
|
if (*p1=='/' and p1[1]=='#' and p1[2]==0) return true;
|
|
return *p1==0 and *p2==0;
|
|
}
|
|
|
|
|
|
// publish from local client
|
|
MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
|
|
{
|
|
MqttMessage msg(MqttMessage::Publish);
|
|
msg.add(topic);
|
|
msg.add(payload, pay_length, false);
|
|
msg.complete();
|
|
|
|
if (local_broker)
|
|
{
|
|
return local_broker->publish(this, topic, msg);
|
|
}
|
|
else if (client)
|
|
return msg.sendTo(this);
|
|
else
|
|
return MqttNowhereToSend;
|
|
}
|
|
|
|
// republish a received publish if it matches any in subscriptions
|
|
MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg)
|
|
{
|
|
MqttError retval=MqttOk;
|
|
|
|
debug("mqttclient publishIfSubscribed " << topic.c_str() << ' ' << subscriptions.size());
|
|
if (isSubscribedTo(topic))
|
|
{
|
|
if (client)
|
|
retval = msg.sendTo(this);
|
|
else
|
|
{
|
|
processMessage(&msg);
|
|
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << "Should call the callback ?\n";
|
|
#endif
|
|
// callback(this, topic, nullptr, 0); // TODO Payload
|
|
}
|
|
}
|
|
return retval;
|
|
}
|
|
|
|
bool MqttClient::isSubscribedTo(const Topic& topic) const
|
|
{
|
|
for(const auto& subscription: subscriptions)
|
|
if (subscription.matches(topic))
|
|
return true;
|
|
|
|
return false;
|
|
}
|
|
|
|
void MqttMessage::reset()
|
|
{
|
|
buffer.clear();
|
|
state=FixedHeader;
|
|
size=0;
|
|
}
|
|
|
|
void MqttMessage::incoming(char in_byte)
|
|
{
|
|
buffer += in_byte;
|
|
switch(state)
|
|
{
|
|
case FixedHeader:
|
|
size=MaxBufferLength;
|
|
state = Length;
|
|
break;
|
|
case Length:
|
|
|
|
if (size==MaxBufferLength)
|
|
size = in_byte & 0x7F;
|
|
else
|
|
size += static_cast<uint16_t>(in_byte & 0x7F)<<7;
|
|
|
|
if (size > MaxBufferLength)
|
|
state = Error;
|
|
else if ((in_byte & 0x80) == 0)
|
|
{
|
|
vheader = buffer.length();
|
|
if (size==0)
|
|
state = Complete;
|
|
else
|
|
{
|
|
buffer.reserve(size);
|
|
state = VariableHeader;
|
|
}
|
|
}
|
|
break;
|
|
case VariableHeader:
|
|
case PayLoad:
|
|
--size;
|
|
if (size==0)
|
|
{
|
|
state=Complete;
|
|
// hexdump("rec");
|
|
}
|
|
break;
|
|
case Create:
|
|
size++;
|
|
break;
|
|
case Complete:
|
|
default:
|
|
#ifdef TINY_MQTT_DEBUG
|
|
Console << red << "Spurious " << _HEX(in_byte) << white << endl;
|
|
hexdump("spurious");
|
|
#endif
|
|
reset();
|
|
break;
|
|
}
|
|
if (buffer.length() > MaxBufferLength)
|
|
{
|
|
debug("Too long " << state);
|
|
reset();
|
|
}
|
|
}
|
|
|
|
void MqttMessage::add(const char* p, size_t len, bool addLength)
|
|
{
|
|
if (addLength)
|
|
{
|
|
buffer.reserve(buffer.length()+2);
|
|
incoming(len>>8);
|
|
incoming(len & 0xFF);
|
|
}
|
|
while(len--) incoming(*p++);
|
|
}
|
|
|
|
void MqttMessage::encodeLength()
|
|
{
|
|
debug("encodingLength");
|
|
if (state != Complete)
|
|
{
|
|
int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field.
|
|
if (length <= 0x7F)
|
|
{
|
|
buffer.erase(1,1);
|
|
buffer[1] = length;
|
|
vheader = 2;
|
|
}
|
|
else
|
|
{
|
|
buffer[1] = 0x80 | (length & 0x7F);
|
|
buffer[2] = (length >> 7);
|
|
vheader = 3;
|
|
}
|
|
|
|
// We could check that buffer[2] < 128 (end of length encoding)
|
|
state = Complete;
|
|
}
|
|
};
|
|
|
|
MqttError MqttMessage::sendTo(MqttClient* client)
|
|
{
|
|
if (buffer.size())
|
|
{
|
|
debug(cyan << "sending " << buffer.size() << " bytes to " << client->id());
|
|
encodeLength();
|
|
hexdump("Sending ");
|
|
client->write(&buffer[0], buffer.size());
|
|
}
|
|
else
|
|
{
|
|
debug(red << "??? Invalid send");
|
|
return MqttInvalidMessage;
|
|
}
|
|
return MqttOk;
|
|
}
|
|
|
|
void MqttMessage::hexdump(const char* prefix) const
|
|
{
|
|
(void)prefix;
|
|
#ifdef TINY_MQTT_DEBUG
|
|
if (TinyMqtt::debug<2) return;
|
|
static std::map<Type, std::string> tts={
|
|
{ Connect, "Connect" },
|
|
{ ConnAck, "Connack" },
|
|
{ Publish, "Publish" },
|
|
{ PubAck, "Puback" },
|
|
{ Subscribe, "Subscribe" },
|
|
{ SubAck, "Suback" },
|
|
{ UnSubscribe, "Unsubscribe" },
|
|
{ UnSuback, "Unsuback" },
|
|
{ PingReq, "Pingreq" },
|
|
{ PingResp, "Pingresp" },
|
|
{ Disconnect, "Disconnect" }
|
|
};
|
|
std::string t("Unknown");
|
|
Type typ=static_cast<Type>(buffer[0] & 0xF0);
|
|
if (tts.find(typ) != tts.end())
|
|
t=tts[typ];
|
|
Console.fg(cyan);
|
|
#ifdef NOT_ESP_CORE
|
|
Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << " mem=???" << endl;
|
|
#else
|
|
Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << " mem=" << ESP.getFreeHeap() << endl;
|
|
#endif
|
|
Console.fg(white);
|
|
|
|
uint16_t addr=0;
|
|
const int bytes_per_row = 8;
|
|
const char* hex_to_str = " | ";
|
|
const char* separator = hex_to_str;
|
|
const char* half_sep = " - ";
|
|
std::string ascii;
|
|
|
|
Console << prefix << " size(" << buffer.size() << "), state=" << state << endl;
|
|
|
|
for(const char chr: buffer)
|
|
{
|
|
if ((addr % bytes_per_row) == 0)
|
|
{
|
|
if (ascii.length()) Console << hex_to_str << ascii << separator << endl;
|
|
if (prefix) Console << prefix << separator;
|
|
ascii.clear();
|
|
}
|
|
addr++;
|
|
if (chr<16) Console << '0';
|
|
Console << _HEX(chr) << ' ';
|
|
|
|
ascii += (chr<32 ? '.' : chr);
|
|
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
|
|
}
|
|
if (ascii.length())
|
|
{
|
|
while(ascii.length() < bytes_per_row+strlen(half_sep))
|
|
{
|
|
Console << " "; // spaces per hexa byte
|
|
ascii += ' ';
|
|
}
|
|
Console << hex_to_str << ascii << separator;
|
|
}
|
|
|
|
Console << endl;
|
|
#endif
|
|
}
|