From 13d22f85b3a260eddd76d62a3d1f26f92c05981f Mon Sep 17 00:00:00 2001 From: hsaturn Date: Tue, 16 Mar 2021 23:44:14 +0100 Subject: [PATCH] First cmmit --- .gitignore | 1 + examples/simple-broker/simple-broker.ino | 32 +++ keywords.txt | 19 ++ library.json | 18 ++ library.properties | 9 + src/StringIndexer.cpp | 3 + src/StringIndexer.h | 112 ++++++++ src/TinyMqtt.cpp | 352 +++++++++++++++++++++++ src/TinyMqtt.h | 160 +++++++++++ 9 files changed, 706 insertions(+) create mode 100644 .gitignore create mode 100644 examples/simple-broker/simple-broker.ino create mode 100644 keywords.txt create mode 100644 library.json create mode 100644 library.properties create mode 100644 src/StringIndexer.cpp create mode 100644 src/StringIndexer.h create mode 100644 src/TinyMqtt.cpp create mode 100644 src/TinyMqtt.h diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b25c15b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*~ diff --git a/examples/simple-broker/simple-broker.ino b/examples/simple-broker/simple-broker.ino new file mode 100644 index 0000000..6f0fe64 --- /dev/null +++ b/examples/simple-broker/simple-broker.ino @@ -0,0 +1,32 @@ +#include +#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt +#include // https://github.com/janelia-arduino/Streaming + +const char *ssid = ; // Put here your wifi SSID ("ssid") +const char *password = ; // Put here your Wifi password ("pwd") + +#define PORT 1883 +MqttBroker broker(PORT); + +void setup() +{ + Serial.begin(115200); + + WiFi.mode(WIFI_STA); + WiFi.begin(ssid, password); + + while (WiFi.status() != WL_CONNECTED) { + delay(500); + Serial << '.'; + delay(500); + } + Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; + + broker.begin(); + Serial << "Broker ready : " << WiFi.localIP() << " on port " << PORT << endl; +} + +void loop() +{ + broker.loop(); +} diff --git a/keywords.txt b/keywords.txt new file mode 100644 index 0000000..2430b38 --- /dev/null +++ b/keywords.txt @@ -0,0 +1,19 @@ +####################################### +# Syntax Coloring Map For TinyMQTT +####################################### + +####################################### +# Datatypes (KEYWORD1) +####################################### + +MqttBroker KEYWORD1 +MqttClient KEYWORD1 + +####################################### +# Methods and Functions (KEYWORD2) +####################################### + + +####################################### +# Constants (LITERAL1) +####################################### diff --git a/library.json b/library.json new file mode 100644 index 0000000..838e1fc --- /dev/null +++ b/library.json @@ -0,0 +1,18 @@ +{ + "name": "TinyMqtt", + "keywords": "ethernet, mqtt, m2m, iot", + "description": "MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS.", + "repository": { + "type": "git", + "url": "https://github.com/hsaturn/TinyMqtt.git" + }, + "version": "0.1", + "exclude": "", + "examples": "examples/*/*.ino", + "frameworks": "arduino", + "platforms": [ + "atmelavr", + "espressif8266", + "espressif32" + ] +} diff --git a/library.properties b/library.properties new file mode 100644 index 0000000..0e4762f --- /dev/null +++ b/library.properties @@ -0,0 +1,9 @@ +name=TinyMqtt +version=0.1 +author=HSaturn +maintainer=HSaturn +sentence=A tiny broker and client library for MQTT messaging. +paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This library allows you to send and receive MQTT messages. It does support MQTT 3.1.1 without any QOS. +category=Communication +url=https://github.com/hsaturn/TinyMqtt +architectures=* diff --git a/src/StringIndexer.cpp b/src/StringIndexer.cpp new file mode 100644 index 0000000..a535fed --- /dev/null +++ b/src/StringIndexer.cpp @@ -0,0 +1,3 @@ +#include "StringIndexer.h" + +std::map StringIndexer::strings; diff --git a/src/StringIndexer.h b/src/StringIndexer.h new file mode 100644 index 0000000..5c27e87 --- /dev/null +++ b/src/StringIndexer.h @@ -0,0 +1,112 @@ +#pragma once +#include +#include +#include +#include +#include + +/*** + * Allows to store up to 255 different strings with one byte class + * very memory efficient when one string is used many times. + */ +class StringIndexer +{ + class StringCounter + { + std::string str; + uint8_t used=0; + friend class StringIndexer; + }; + public: + using index_t=uint8_t; + + static const index_t strToIndex(const char* str, uint8_t len) + { + for(auto it=strings.begin(); it!=strings.end(); it++) + { + if (strncmp(it->second.str.c_str(), str, len)==0) + { + it->second.used++; + return it->first; + } + } + for(index_t index=0; index<255; index++) + { + if (strings.find(index)==strings.end()) + { + strings[index].str = std::string(str, len); + strings[index].used++; + Serial << "Creating index " << index << " for (" << strings[index].str.c_str() << ") len=" << len << endl; + return index; + } + } + return 0; // TODO out of indexes + } + + static const std::string& str(const index_t& index) + { + static std::string dummy; + const auto& it=strings.find(index); + if (it == strings.end()) return dummy; + return it->second.str; + } + + static void use(const index_t& index) + { + auto it=strings.find(index); + if (it != strings.end()) it->second.used++; + } + + static void release(const index_t& index) + { + auto it=strings.find(index); + if (it != strings.end()) + { + it->second.used--; + if (it->second.used == 0) + { + strings.erase(it); + Serial << "Removing string(" << it->second.str.c_str() << ") size=" << strings.size() << endl; + } + } + } + + private: + static std::map strings; +}; + +class IndexedString +{ + public: + IndexedString(const IndexedString& source) + { + StringIndexer::use(source.index); + index = source.index; + } + + IndexedString(const char* str, uint8_t len) + { + index=StringIndexer::strToIndex(str, len); + } + + ~IndexedString() { StringIndexer::release(index); } + + IndexedString& operator=(const IndexedString& source) + { + StringIndexer::use(source.index); + index = source.index; + return *this; + } + + friend bool operator<(const IndexedString& i1, const IndexedString& i2) + { + return i1.index < i2.index; + } + + const std::string& str() const { return StringIndexer::str(index); } + + const StringIndexer::index_t getIndex() const { return index; } + + private: + StringIndexer::index_t index; +}; diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp new file mode 100644 index 0000000..b5f694a --- /dev/null +++ b/src/TinyMqtt.cpp @@ -0,0 +1,352 @@ +#include "TinyMqtt.h" +#include +#include + +void outstring(const char* prefix, const char*p, uint16_t len) +{ + return; + Serial << prefix << "='"; + while(len--) Serial << (char)*p++; + Serial << '\'' << endl; +} + +MqttBroker::MqttBroker(uint16_t port) + : server(port) +{ +} + +MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client) + : parent(parent), + mqtt_connected(false) +{ + client = new_client ? new WiFiClient(new_client) : nullptr; + clientAlive(); +} + +MqttCnx::~MqttCnx() +{ + close(); +} + +void MqttCnx::close() +{ + if (client) + { + client->stop(); + delete client; + client = nullptr; + } +} + +void MqttBroker::loop() +{ + WiFiClient client = server.available(); + + if (client) + { + clients.push_back(new MqttCnx(this, client)); + Serial << "New client (" << clients.size() << ')' << endl; + } + + for(auto it=clients.begin(); it!=clients.end(); it++) + { + auto client=*it; + if(client->connected()) + { + client->loop(); + } + else + { + Serial << "Client " << client->id().c_str() << " Disconnected" << endl; + clients.erase(it); + delete client; + break; + } + } +} + +void MqttBroker::publish(const Topic& topic, MqttMessage& msg) +{ + for(auto client: clients) + client->publish(topic, msg); +} + +bool MqttBroker::compareString( + const char* good, + const char* str, + uint8_t len) const +{ + while(len-- and *good++==*str++); + + return *good==0; +} + +void MqttMessage::getString(char* &buffer, uint16_t& len) +{ + len = (buffer[0]<<8)|(buffer[1]); + buffer+=2; +} + +void MqttCnx::clientAlive() +{ + if (keep_alive) + { + alive=millis()+1000*(keep_alive+5); + } + else + alive=0; +} + +void MqttCnx::loop() +{ + if (alive && (millis() > alive)) + { + Serial << "timeout client" << endl; + close(); + } + + while(client && client->available()>0) + { + message.incoming(client->read()); + if (message.type()) + { + processMessage(); + } + } +} + +void MqttCnx::processMessage() +{ + std::string error; + std::string s; + // Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; + auto header = message.getVHeader(); + char* payload; + uint16_t len; + bool bclose=true; + + switch(message.type() & 0XF0) + { + case MqttMessage::Type::Connect: + if (mqtt_connected) break; + payload = header+10; + flags = header[7]; + keep_alive = (header[8]<<8)|(header[9]); + if (strncmp("MQTT", header+2,4)) break; + if (header[6]!=0x04) break; // Level 3.1.1 + + // ClientId + message.getString(payload, len); + clientId = std::string(payload, len); + payload += len; + + if (flags & FlagWill) // Will topic + { + message.getString(payload, len); // Will Topic + outstring("WillTopic", payload, len); + payload += len; + + message.getString(payload, len); // Will Message + outstring("WillMessage", payload, len); + payload += len; + } + if (flags & FlagUserName) + { + message.getString(payload, len); + if (!parent->checkUser(payload, len)) break; + payload += len; + } + if (flags & FlagPassword) + { + message.getString(payload, len); + if (!parent->checkPassword(payload, len)) break; + payload += len; + } + + Serial << "Connected client:" << clientId.c_str() << ", keep alive=" << keep_alive << '.' << endl; + bclose = false; + mqtt_connected=true; + // Reuse received msg + message.create(MqttMessage::Type::Connack); + message.add(0); // Session present (not implemented) + message.add(0); // Connection accepted + message.sendTo(this); + break; + + case MqttMessage::Type::PingReq: + message.create(MqttMessage::Type::PingResp); + message.add(0); + message.sendTo(this); + bclose = false; + break; + + case MqttMessage::Type::Subscribe: + if (!mqtt_connected) break; + payload = header+2; + message.getString(payload, len); // Topic + outstring("Subscribes", payload, len); + + subscriptions.insert(Topic(payload, len)); + bclose = false; + // TODO SUBACK + break; + + case MqttMessage::Type::Publish: + if (!mqtt_connected) break; + { + uint8_t qos = message.type() & 0x6; + payload = header; + message.getString(payload, len); + Topic published(payload, len); + payload += len; + len=message.end()-payload; + // Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len + // << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << message.length() << endl; + if (qos) payload+=2; // ignore packet identifier if any + // TODO reset DUP + // TODO reset RETAIN + parent->publish(published, message); + // TODO should send PUBACK + bclose = false; + } + break; + + case MqttMessage::Type::PubAck: + if (!mqtt_connected) break; + bclose = false; + break; + + default: + bclose=true; + break; + }; + if (bclose) + { + Serial << "*************** Error msg 0x" << _HEX(message.type()); + if (error.length()) Serial << ':' << error.c_str(); + Serial << endl; + close(); + } + else + { + clientAlive(); + } + message.reset(); +} + +bool Topic::matches(const Topic& topic) const +{ + if (getIndex() == topic.getIndex()) return true; + if (str() == topic.str()) return true; + return false; +} + +void MqttCnx::publish(const Topic& topic, MqttMessage& msg) +{ + for(const auto& subscription: subscriptions) + { + if (subscription.matches(topic)) + { + // Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; + msg.sendTo(this); + } + } +} + +void MqttMessage::reset() +{ + curr=buffer; + *curr=0; // Type Unknown + state=FixedHeader; + size=0; +} + +void MqttMessage::incoming(char in_byte) +{ + *curr++ = in_byte; + switch(state) + { + case FixedHeader: + size=0; + state = Length; + break; + case Length: + size = (size<<7) + (in_byte & 0x3F); + if (size > MaxBufferLength) + { + state = Error; + } + else if ((in_byte & 0x80) == 0) + { + vheader = curr; + if (size==0) + state = Complete; + else + state = VariableHeader; + } + break; + case VariableHeader: + case PayLoad: + --size; + if (size==0) + { + state=Complete; + // hexdump("rec"); + } + break; + case Create: + size++; + break; + case Complete: + default: + curr--; + Serial << "Spurious " << _HEX(in_byte) << endl; + state = Error; + break; + } + if (curr-buffer > 250) + { + Serial << "Too much incoming bytes." << endl; + curr=buffer; + } +} + +void MqttMessage::encodeLength(char* msb, int length) +{ + do + { + uint8_t encoded(length & 0x7F); + length >>=7; + if (length) encoded |= 0x80; + *msb++ = encoded; + } while (length); +}; + +void MqttMessage::sendTo(MqttCnx* client) +{ + if (curr-buffer-2 >= 0) + { + encodeLength(buffer+1, curr-buffer-2); + // hexdump("snd"); + client->write(buffer, curr-buffer); + } + else + { + Serial << "??? Invalid send" << endl; + Serial << (long)end() << "-" << (long)buffer << endl; + } +} + +void MqttMessage::hexdump(const char* prefix) const +{ + if (prefix) Serial << prefix << ' '; + Serial << (long)buffer << "-" << (long)curr << " : "; + const char* p=buffer; + while(p!=curr) + { + if (*p<16) Serial << '0'; + Serial << _HEX(*p) << ' '; + p++; + } + Serial << endl; +} diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h new file mode 100644 index 0000000..5ee47ee --- /dev/null +++ b/src/TinyMqtt.h @@ -0,0 +1,160 @@ +#include +#include +#include +#include +#include "StringIndexer.h" + +#define MaxBufferLength 255 + +class Topic : public IndexedString +{ + public: + Topic(const char* s, uint8_t len) : IndexedString(s,len){} + Topic(const char* s) : Topic(s, strlen(s)) {} + + bool matches(const Topic&) const; +}; + +class MqttCnx; +class MqttMessage +{ + public: + enum Type + { + Unknown = 0, + Connect = 0x10, + Connack = 0x20, + Publish = 0x30, + PubAck = 0x40, + Subscribe = 0x80, + PingReq = 0xC0, + PingResp = 0xD0, + }; + enum State + { + FixedHeader=0, + Length=1, + VariableHeader=2, + PayLoad=3, + Complete=4, + Error=5, + Create=6 + }; + + MqttMessage() { reset(); } + MqttMessage(Type t) { create(t); } + void incoming(char byte); + void add(char byte) { incoming(byte); } + char* getVHeader() const { return vheader; } + char* end() const { return curr; } + uint16_t length() const { return curr-buffer; } + + void reset(); + + // buff is MSB/LSB/STRING + // output buff+=2, len=length(str) + void getString(char* &buff, uint16_t& len); + + + Type type() const + { + return state == Complete ? static_cast(buffer[0]) : Unknown; + } + + void create(Type type) + { + buffer[0]=type; + curr=buffer+2; + vheader=curr; + size=0; + state=Create; + } + void sendTo(MqttCnx*); + void hexdump(const char* prefix=nullptr) const; + + private: + void encodeLength(char* msb, int length); + + char buffer[256]; // TODO 256 ? + char* vheader; + char* curr; + uint16_t size; // bytes left to receive + State state; +}; + +class MqttBroker; +class MqttCnx +{ + enum Flags + { + FlagUserName = 128, + FlagPassword = 64, + FlagWillRetain = 32, // unsupported + FlagWillQos = 16 | 8, // unsupported + FlagWill = 4, // unsupported + FlagCleanSession = 2, // unsupported + FlagReserved = 1 + }; + public: + MqttCnx(MqttBroker* parent, WiFiClient& client); + + ~MqttCnx(); + + bool connected() { return client && client->connected(); } + void write(const char* buf, size_t length) + { if (client) client->write(buf, length); } + + const std::string& id() const { return clientId; } + + void loop(); + void close(); + void publish(const Topic& topic, MqttMessage& msg); + + private: + void clientAlive(); + void processMessage(); + + char flags; + uint32_t keep_alive; + uint32_t alive; + bool mqtt_connected; + WiFiClient* client; + MqttMessage message; + MqttBroker* parent; + std::set subscriptions; + std::string clientId; +}; + +class MqttClient +{ + public: + MqttClient(IPAddress broker) : broker_ip(broker) {} + + protected: + IPAddress broker_ip; +}; + +class MqttBroker +{ + public: + MqttBroker(uint16_t port); + + void begin() { server.begin(); } + void loop(); + + bool checkUser(const char* user, uint8_t len) const + { return compareString(auth_user, user, len); } + + bool checkPassword(const char* password, uint8_t len) const + { return compareString(auth_password, password, len); } + + void publish(const Topic& topic, MqttMessage& msg); + + private: + bool compareString(const char* good, const char* str, uint8_t str_len) const; + std::vector clients; + WiFiServer server; + + const char* auth_user = "guest"; + const char* auth_password = "guest"; +};