// vim: ts=2 sw=2 expandtab #include "TinyMqtt.h" #include #if TINY_MQTT_DEBUG static auto cyan = TinyConsole::cyan; static auto white = TinyConsole::white; static auto red = TinyConsole::red; static auto yellow = TinyConsole::yellow; int TinyMqtt::debug=2; #endif #ifdef EPOXY_DUINO std::map MqttClient::counters; int MqttBroker::instances = 0; int MqttClient::instances = 0; #endif MqttBroker::MqttBroker(uint16_t port, uint8_t max_retain_size) { debug("New broker" << port); retain_size = max_retain_size; server = new TcpServer(port); #ifdef TINY_MQTT_ASYNC server->onClient(onClient, this); #endif #ifdef EPOXY_DUINO instances++; #endif } MqttBroker::~MqttBroker() { #ifdef EPOXY_DUINO instances--; #endif closeRemoteBroker(); while(clients.size()) { auto client = clients[0]; client->local_broker = nullptr; if (client->cltFlags & MqttClient::CltFlags::CltFlagToDelete) { // std::cout << "Deleting client" << std::endl; delete client; } clients.erase(clients.begin()); } delete server; } // private constructor used by broker only MqttClient::MqttClient(MqttBroker* local_broker, TcpClient* new_client) : local_broker(local_broker) { debug("MqttClient private with broker"); #ifdef TINY_MQTT_ASYNC tcp_client = new_client; tcp_client->onData(onData, this); // client->onConnect() TODO // client->onDisconnect() TODO #else tcp_client = new TcpClient(*new_client); #endif #ifdef EPOXY_DUINO alive = millis()+500000; instances++; #else alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg #endif } MqttClient::MqttClient(MqttBroker* local_broker, const string& id) : local_broker(local_broker), clientId(id) { alive = 0; keep_alive = 0; if (local_broker) local_broker->addClient(this); #ifdef EPOXY_DUINO instances++; #endif } MqttClient::~MqttClient() { #ifdef EPOXY_DUINO instances--; #endif close(); delete tcp_client; debug("*** MqttClient delete()"); } void MqttClient::close(bool bSendDisconnect) { debug("close " << id().c_str()); resetFlag(CltFlagConnected); if (tcp_client) // connected to a remote broker { if (bSendDisconnect and tcp_client->connected()) { message.create(MqttMessage::Type::Disconnect); message.hexdump("close"); message.sendTo(this); } tcp_client->stop(); } if (local_broker) { local_broker->removeClient(this); local_broker = nullptr; } } void MqttClient::connect(MqttBroker* local) { debug("MqttClient::connect_local"); close(); local_broker = local; local_broker->addClient(this); } void MqttClient::connect(string broker, uint16_t port, uint16_t ka) { debug("MqttClient::connect_to_host " << broker << ':' << port); keep_alive = ka; close(); if (tcp_client) delete tcp_client; tcp_client = new TcpClient; #ifdef TINY_MQTT_ASYNC tcp_client->onData(onData, this); tcp_client->onConnect(onConnect, this); tcp_client->connect(broker.c_str(), port); #else if (tcp_client->connect(broker.c_str(), port)) { debug("link established"); onConnect(this, tcp_client); } else { debug("unable to connect."); } #endif } void MqttBroker::addClient(MqttClient* client) { debug("MqttBroker::addClient"); clients.push_back(client); } void MqttBroker::closeRemoteBroker() { if (remote_broker) { delete remote_broker; remote_broker = nullptr; } } void MqttBroker::connect(const string& host, uint16_t port) { debug("MqttBroker::connect"); closeRemoteBroker(); if (remote_broker == nullptr) remote_broker = new MqttClient; remote_broker->connect(host, port); remote_broker->local_broker = this; // Because connect removed the link // TODO shouldn't we resubscribe to all client subscriptions ? } void MqttBroker::removeClient(MqttClient* remove) { debug("removeClient"); for(auto it=clients.begin(); it!=clients.end(); it++) { auto client=*it; if (client==remove) { // TODO if this broker is connected to an external broker // we have to unsubscribe remove's topics. // (but doing this, check that other clients are not subscribed...) // Unless -> we could receive useless messages // -> we are using (memory) one IndexedString plus its string for nothing. debug("Remove " << clients.size()); clients.erase(it); debug("Client removed " << clients.size()); return; } } debug(red << "Error cannot remove client"); // TODO should not occur } void MqttBroker::onClient(void* broker_ptr, TcpClient* client) { debug("MqttBroker::onClient"); MqttBroker* broker = static_cast(broker_ptr); MqttClient* mqtt = new MqttClient(broker, client); mqtt->setFlag(MqttClient::CltFlags::CltFlagToDelete); broker->addClient(mqtt); debug("New client"); } void MqttBroker::loop() { #ifndef TINY_MQTT_ASYNC TcpClient client = server->accept(); if (client) { onClient(this, &client); } #endif if (remote_broker) { // TODO should monitor broker's activity. // 1 When broker disconnect and reconnect we have to re-subscribe remote_broker->loop(); } for(size_t i=0; iconnected()) { client->loop(); } else { debug("Client " << client->id().c_str() << " Disconnected, local_broker=" << (dbg_ptr)client->local_broker); // Note: deleting a client not added by the broker itself will probably crash later. delete client; break; } } } // Obvioulsy called when the broker is connected to another broker. MqttError MqttBroker::subscribe(MqttClient* client, const Topic& topic, uint8_t qos) { debug("MqttBroker::subscribe to " << topic.str() << ", retained=" << retained.size() ); for(auto& retainItem: retained) { auto &retained_topic = retainItem.first; auto &retain = retainItem.second; debug(" retained: " << retained_topic.str()); if (topic.matches(retained_topic)) { debug(" -> sending"); client->publishIfSubscribed(retained_topic, retain.msg); } } if (remote_broker && remote_broker->connected()) { return remote_broker->subscribe(topic, qos); } return MqttNowhereToSend; } MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg) { MqttError retval = MqttOk; retain(topic, msg); debug("MqttBroker::publish"); int i=0; for(auto client: clients) { i++; #if TINY_MQTT_DEBUG Console << __LINE__ << " broker:" << (remote_broker && remote_broker->connected() ? "linked" : "alone") << " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; #endif bool doit = false; if (remote_broker && remote_broker->connected()) // this (MqttBroker) is connected (to a external broker) { // ext_broker -> clients or clients -> ext_broker if (source == remote_broker) // external broker -> internal clients doit = true; else // external clients -> this broker { // As this broker is connected to another broker, simply forward the msg MqttError ret = remote_broker->publishIfSubscribed(topic, msg); if (ret != MqttOk) retval = ret; } } else // Disconnected { doit = true; } #if TINY_MQTT_DEBUG Console << ", doit=" << doit << ' '; #endif if (doit) retval = client->publishIfSubscribed(topic, msg); debug(""); } return retval; } bool MqttBroker::compareString( const char* good, const char* str, uint8_t len) const { while(len-- and *good++==*str++); return *good==0; } void MqttMessage::getString(const char* &buff, uint16_t& len) { len = getSize(buff); buff+=2; } void MqttClient::clientAlive(uint32_t more_seconds) { debug("MqttClient::clientAlive"); if (keep_alive) { #ifdef EPOXY_DUINO alive=millis()+500000+0*more_seconds; #else alive=millis()+1000*(keep_alive+more_seconds); #endif } else alive=0; } void MqttClient::loop() { if (keep_alive && (millis() >= alive)) { if (tcp_client && tcp_client->connected()) { debug("pingreq"); static MqttMessage pingreq(MqttMessage::Type::PingReq); pingreq.sendTo(this); clientAlive(0); // TODO when many MqttClient passes through a local broker // there is no need to send one PingReq per instance. } else if (local_broker) { debug(red << "timeout client"); close(); debug(red << "closed"); } } #ifndef TINY_MQTT_ASYNC while(tcp_client && tcp_client->available()>0) { message.incoming(tcp_client->read()); if (message.type()) { processMessage(&message); message.reset(); } } #endif } void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) { MqttClient* mqtt = static_cast(mqttclient_ptr); debug("MqttClient::onConnect"); MqttMessage msg(MqttMessage::Type::Connect); msg.add("MQTT",4); msg.add(0x4); // Mqtt protocol version 3.1.1 msg.add(0x0); // Connect flags TODO user / name msg.add((char)(mqtt->keep_alive >> 8)); // keep_alive msg.add((char)(mqtt->keep_alive & 0xFF)); msg.add(mqtt->clientId); debug("cnx: mqtt connecting"); msg.sendTo(mqtt); msg.reset(); debug("cnx: mqtt sent " << (dbg_ptr)mqtt->local_broker); mqtt->clientAlive(0); } #ifdef TINY_MQTT_ASYNC void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len) { char* char_ptr = static_cast(data); MqttClient* client=static_cast(client_ptr); while(len>0) { client->message.incoming(*char_ptr++); if (client->message.type()) { client->processMessage(&client->message); client->message.reset(); } len--; } } #endif void MqttClient::resubscribe() { // TODO resubscription limited to 256 bytes if (subscriptions.size()) { MqttMessage msg(MqttMessage::Type::Subscribe, 2); // TODO manage packet identifier msg.add(0); msg.add(0); for(auto topic: subscriptions) { msg.add(topic); msg.add(0); // TODO qos } msg.sendTo(this); // TODO return value } } MqttError MqttClient::subscribe(Topic topic, uint8_t qos) { debug("MqttClient::subsribe(" << topic.c_str() << ")"); MqttError ret = MqttOk; subscriptions.insert(topic); if (local_broker==nullptr) // connected to a remote broker { return sendTopic(topic, MqttMessage::Type::Subscribe, qos); } else { return local_broker->subscribe(this, topic, qos); } return ret; } MqttError MqttClient::unsubscribe(Topic topic) { debug("MqttClient::unsubscribe"); auto it=subscriptions.find(topic); if (it != subscriptions.end()) { subscriptions.erase(it); if (local_broker==nullptr) // remote broker { return sendTopic(topic, MqttMessage::Type::UnSubscribe, 0); } } return MqttOk; } MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos) { debug("MqttClient::sendTopic"); MqttMessage msg(type, 2); // TODO manage packet identifier msg.add(0); msg.add(0); msg.add(topic); msg.add(qos); // TODO instead we should wait (state machine) for SUBACK / UNSUBACK ? return msg.sendTo(this); } void MqttClient::processMessage(MqttMessage* mesg) { #if TINY_MQTT_DEBUG mesg->hexdump("Incoming"); #endif auto header = mesg->getVHeader(); const char* payload; uint16_t len; bool bclose=true; #ifdef EPOXY_DUINO counters[mesg->type()]++; #endif switch(mesg->type()) { case MqttMessage::Type::Connect: if (mqtt_connected()) { debug("already connected"); break; } payload = header+10; mqtt_flags = header[7]; keep_alive = MqttMessage::getSize(header+8); if (strncmp("MQTT", header+2,4)) { debug("bad mqtt header"); break; } if (header[6]!=0x04) { debug("Unsupported MQTT version (" << (int) header[6] << "), only version=4 supported" << endl); break; // Level 3.1.1 } // ClientId mesg->getString(payload, len); clientId = string(payload, len); payload += len; if (mqtt_flags & FlagWill) // Will topic { mesg->getString(payload, len); // Will Topic payload += len; mesg->getString(payload, len); // Will Message payload += len; } // FIXME forgetting credential is allowed (security hole) if (mqtt_flags & FlagUserName) { mesg->getString(payload, len); if (not local_broker->checkUser(payload, len)) break; payload += len; } if (mqtt_flags & FlagPassword) { mesg->getString(payload, len); if (not local_broker->checkPassword(payload, len)) break; payload += len; } #if TINY_MQTT_DEBUG Console << yellow << "Client " << clientId << " connected : keep alive=" << keep_alive << '.' << white << endl; #endif bclose = false; setFlag(CltFlagConnected); { MqttMessage msg(MqttMessage::Type::ConnAck); msg.add(0); // Session present (not implemented) msg.add(0); // Connection accepted msg.sendTo(this); } break; case MqttMessage::Type::ConnAck: setFlag(CltFlagConnected); bclose = false; resubscribe(); break; case MqttMessage::Type::SubAck: case MqttMessage::Type::PubAck: if (not mqtt_connected()) break; // Ignore acks bclose = false; break; case MqttMessage::Type::PingResp: // TODO: no PingResp is suspicious (server dead) bclose = false; break; case MqttMessage::Type::PingReq: if (not mqtt_connected()) break; if (tcp_client) { uint16_t pingreq = MqttMessage::Type::PingResp; debug(cyan << "Ping response to client "); tcp_client->write((const char*)(&pingreq), 2); bclose = false; } else { debug(red << "internal pingreq ?"); } break; case MqttMessage::Type::Subscribe: case MqttMessage::Type::UnSubscribe: { if (not mqtt_connected()) break; payload = header+2; debug("un/subscribe loop"); string qoss; while(payload < mesg->end()) { mesg->getString(payload, len); // Topic debug( " topic (" << string(payload, len) << ')'); // subscribe(Topic(payload, len)); Topic topic(payload, len); payload += len; if (mesg->type() == MqttMessage::Type::Subscribe) { uint8_t qos = *payload++; if (qos != 0) { debug("Unsupported QOS" << qos << endl); qoss.push_back(0x80); } else qoss.push_back(qos); subscribe(topic); } else { auto it=subscriptions.find(topic); if (it != subscriptions.end()) subscriptions.erase(it); } } debug("end loop"); bclose = false; MqttMessage ack(mesg->type() == MqttMessage::Type::Subscribe ? MqttMessage::Type::SubAck : MqttMessage::Type::UnSuback); ack.add(header[0]); ack.add(header[1]); ack.add(qoss.c_str(), qoss.size(), false); ack.sendTo(this); } break; case MqttMessage::Type::UnSuback: if (not mqtt_connected()) break; bclose = false; break; case MqttMessage::Type::Publish: #if TINY_MQTT_DEBUG Console << "publish " << mqtt_connected() << '/' << (long) tcp_client << endl; #endif if (mqtt_connected() or tcp_client == nullptr) { uint8_t qos = mesg->flags(); payload = header; mesg->getString(payload, len); Topic published(payload, len); payload += len; #if TINY_MQTT_DEBUG Console << "Received Publish (" << published.str().c_str() << ") size=" << (int)len << endl; #endif // << '(' << string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl; if (qos) payload+=2; // ignore packet identifier if any len=mesg->end()-payload; // TODO reset DUP // TODO reset RETAIN if (local_broker==nullptr or tcp_client==nullptr) // internal MqttClient receives publish { #if TINY_MQTT_DEBUG if (TinyMqtt::debug >= 2) { Console << (isSubscribedTo(published) ? "not" : "") << " subscribed.\r\n"; Console << "has " << (callback ? "" : "no ") << " callback.\r\n"; } #endif if (callback and isSubscribedTo(published)) { callback(this, published, payload, len); } } else if (local_broker) // from outside to inside { debug("publishing to local_broker"); local_broker->publish(this, published, *mesg); } bclose = false; } break; case MqttMessage::Type::Disconnect: // TODO should discard any will msg if (not mqtt_connected()) break; resetFlag(CltFlagConnected); close(false); bclose=false; break; default: bclose=true; break; }; if (bclose) { #if TINY_MQTT_DEBUG Console << red << "*************** Error msg 0x" << _HEX(mesg->type()); mesg->hexdump("-------ERROR ------"); dump(); Console << white << endl; #endif close(); } else { clientAlive(local_broker ? 5 : 0); } } bool Topic::matches(const Topic& topic) const { if (getIndex() == topic.getIndex()) return true; const char* p1 = c_str(); const char* p2 = topic.c_str(); if (p1 == p2) return true; if (*p2 == '$' and *p1 != '$') return false; while(*p1 and *p2) { if (*p1 == '+') { ++p1; if (*p1 and *p1!='/') return false; if (*p1) ++p1; while(*p2 and *p2++!='/'); } else if (*p1 == '#') { if (*++p1==0) return true; return false; } else if (*p1 == '*') { const char c=*(p1+1); if (c==0) return true; if (c!='/') return false; const char*p = p1+2; while(*p and *p2) { if (*p == *p2) { if (*p==0) return true; if (*p=='/') { p1=p; break; } } else { while(*p2 and *p2++!='/'); break; } ++p; ++p2; } if (*p==0) return true; } else if (*p1 == *p2) { ++p1; ++p2; } else return false; } if (*p1=='/' and p1[1]=='#' and p1[2]==0) return true; return *p1==0 and *p2==0; } // publish from local client MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length, bool retain) { MqttMessage msg(MqttMessage::Publish, retain ? 1 : 0); msg.add(topic); msg.add(payload, pay_length, false); msg.complete(); if (local_broker) { return local_broker->publish(this, topic, msg); } else if (tcp_client and connected()) return msg.sendTo(this); else return MqttNowhereToSend; } // republish a received publish if it matches any in subscriptions MqttError MqttClient::publishIfSubscribed(const Topic& topic, MqttMessage& msg) { MqttError retval=MqttOk; debug("mqttclient publishIfSubscribed " << topic.c_str() << ' ' << subscriptions.size()); if (isSubscribedTo(topic)) { if (tcp_client) retval = msg.sendTo(this); else { processMessage(&msg); #if TINY_MQTT_DEBUG Console << "Should call the callback ?\n"; #endif // callback(this, topic, nullptr, 0); // TODO Payload } } return retval; } bool MqttClient::isSubscribedTo(const Topic& topic) const { for(const auto& subscription: subscriptions) if (subscription.matches(topic)) return true; return false; } void MqttMessage::reset() { buffer.clear(); state=FixedHeader; size=0; } void MqttMessage::incoming(char in_byte) { buffer += in_byte; switch(state) { case FixedHeader: size=MaxBufferLength; state = Length; break; case Length: if (size==MaxBufferLength) size = in_byte & 0x7F; else size += static_cast(in_byte & 0x7F)<<7; if (size > MaxBufferLength) state = Error; else if ((in_byte & 0x80) == 0) { vheader = buffer.length(); if (size==0) state = Complete; else { buffer.reserve(size); state = VariableHeader; } } break; case VariableHeader: case PayLoad: --size; if (size==0) { state=Complete; // hexdump("rec"); } break; case Create: size++; break; case Complete: default: #if TINY_MQTT_DEBUG Console << red << "Spurious " << _HEX(in_byte) << white << endl; hexdump("spurious"); #endif reset(); break; } if (buffer.length() > MaxBufferLength) { debug("Too long " << state); reset(); } } void MqttMessage::add(const char* p, size_t len, bool addLength) { if (addLength) { buffer.reserve(buffer.length()+2); incoming(len>>8); incoming(len & 0xFF); } while(len--) incoming(*p++); } void MqttMessage::encodeLength() { debug("encodingLength"); if (state != Complete) { int length = buffer.size()-3; // 3 = 1 byte for header + 2 bytes for pre-reserved length field. if (length <= 0x7F) { buffer.erase(1,1); buffer[1] = length; vheader = 2; } else { buffer[1] = 0x80 | (length & 0x7F); buffer[2] = (length >> 7); vheader = 3; } // We could check that buffer[2] < 128 (end of length encoding) state = Complete; } }; MqttError MqttMessage::sendTo(MqttClient* client) { if (buffer.size()) { debug(cyan << "sending " << buffer.size() << " bytes to " << client->id()); encodeLength(); hexdump("Sending "); client->write(&buffer[0], buffer.size()); } else { debug(red << "??? Invalid send"); return MqttInvalidMessage; } return MqttOk; } void MqttBroker::retainDrop() { if (retained.size() >= retain_size) { std::map::iterator oldest = retained.begin(); auto it = oldest; while(++it != retained.end()) { if (oldest->second.timestamp > it->second.timestamp) oldest = it; } retained.erase(oldest); } } void MqttBroker::retain(const Topic& topic, const MqttMessage& msg) { debug("MqttBroker::retain msg_type=" << _HEX(msg.type()) << ", retain_size=" << retain_size); if (retain_size==0 or msg.type() != MqttMessage::Publish) return; if (msg.flags() & 1) // flag RETAIN { debug(" retaining " << topic.str()); auto old = retained.find(topic); if (old == retained.end()) retainDrop(); else retained.erase(old); // FIXME if payload size == 0 remove message from retained Retain r(micros(), msg); r.msg.retained(); retained.insert({ topic, std::move(r)}); } } void MqttMessage::hexdump(const char* prefix) const { (void)prefix; #if TINY_MQTT_DEBUG if (TinyMqtt::debug<2) return; static std::map tts={ { Connect, "Connect" }, { ConnAck, "Connack" }, { Publish, "Publish" }, { PubAck, "Puback" }, { Subscribe, "Subscribe" }, { SubAck, "Suback" }, { UnSubscribe, "Unsubscribe" }, { UnSuback, "Unsuback" }, { PingReq, "Pingreq" }, { PingResp, "Pingresp" }, { Disconnect, "Disconnect" } }; string t("Unknown"); Type typ=static_cast(buffer[0] & 0xF0); if (tts.find(typ) != tts.end()) t=tts[typ]; Console.fg(cyan); #ifdef NOT_ESP_CORE Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << " mem=???" << endl; #else Console << "---> MESSAGE " << t << ' ' << _HEX(typ) << ' ' << " mem=" << ESP.getFreeHeap() << endl; #endif Console.fg(white); uint16_t addr=0; const int bytes_per_row = 8; const char* hex_to_str = " | "; const char* separator = hex_to_str; const char* half_sep = " - "; string ascii; Console << prefix << " size(" << buffer.size() << "), state=" << state << endl; for(const char chr: buffer) { if ((addr % bytes_per_row) == 0) { if (ascii.length()) Console << hex_to_str << ascii << separator << endl; if (prefix) Console << prefix << separator; ascii.clear(); } addr++; if (chr<16) Console << '0'; Console << _HEX(chr) << ' '; ascii += (chr<32 ? '.' : chr); if (ascii.length() == (bytes_per_row/2)) ascii += half_sep; } if (ascii.length()) { while(ascii.length() < bytes_per_row+strlen(half_sep)) { Console << " "; // spaces per hexa byte ascii += ' '; } Console << hex_to_str << ascii << separator; } Console << endl; #endif }