Compare commits

...

4 Commits
0.1.0 ... 0.2.0

Author SHA1 Message Date
hsaturn
cc708cdf22 Example when wifi is not connected 2021-03-19 22:04:23 +01:00
hsaturn
132fc56803 Update README.md 2021-03-19 22:01:02 +01:00
hsaturn
b33c9ba687 Version 0.2 2021-03-19 19:02:40 +01:00
hsaturn
bb2a2e6737 Added includes 2021-03-16 23:53:52 +01:00
9 changed files with 289 additions and 47 deletions

View File

@@ -1,2 +1,14 @@
# TinyMqtt # TinyMqtt
ESP 8266 Small footprint Mqtt Broker and Client ESP 8266 is a very capable Mqtt Broker and Client
Here are the features
- mqtt client can Works without WiFi (local mode) in a unique ESP
Thus, publishes and subscribes are possible and allows
minimal (degraded) function of a single module.
- broker can connect to another broker and becomes then a
proxy for clients that are connected to it.
- zeroconf, this is a strange but very powerful mode
where each ESP is a a broker and scans the local network.
After a while one ESP becomes the 'master'
and all ESP are connected together. The master can die
whithout breaking the system.

View File

@@ -0,0 +1,64 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode:
*
* In this example, local clients A and B are talking together, no need to be connected.
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
*/
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "init" << endl;
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop();
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 5000;
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 3000; // will send topic each 5000 ms
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic);
}
}

View File

@@ -16,7 +16,6 @@ void setup()
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial << '.'; Serial << '.';
delay(500); delay(500);
} }

View File

@@ -0,0 +1,32 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
const char *ssid = ; // Put here your wifi SSID ("ssid")
const char *password = ; // Put here your Wifi password ("pwd")
#define PORT 1883
MqttBroker broker(PORT);
void setup()
{
Serial.begin(115200);
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial << '.';
delay(500);
}
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
Serial << "Broker ready : " << WiFi.localIP() << " on port " << PORT << endl;
}
void loop()
{
broker.loop();
}

View File

@@ -3,16 +3,23 @@
####################################### #######################################
####################################### #######################################
# Datatypes (KEYWORD1) # Datatypes and functions
####################################### #######################################
TinyMqtt KEYWORD1
MqttBroker KEYWORD1 MqttBroker KEYWORD1
begin KEYWORD2
loop KEYWORD2
MqttClient KEYWORD1 MqttClient KEYWORD1
publish KEYWORD2
setCallback KEYWORD2
subscribe KEYWORD2
####################################### Topic KEYWORD1
# Methods and Functions (KEYWORD2) matches KEYWORD2
####################################### c_str KEYWORD2
####################################### #######################################
# Constants (LITERAL1) # Constants (LITERAL1)

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.1", "version": "0.2",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,5 +1,5 @@
name=TinyMqtt name=TinyMqtt
version=0.1 version=0.2.0
author=HSaturn <hsaturn@gmail.com> author=HSaturn <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com> maintainer=HSaturn <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
@@ -7,3 +7,4 @@ paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*
includes=TinyMqtt.h

View File

@@ -15,20 +15,29 @@ MqttBroker::MqttBroker(uint16_t port)
{ {
} }
MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent), : parent(parent),
mqtt_connected(false) mqtt_connected(false)
{ {
client = new_client ? new WiFiClient(new_client) : nullptr; client = new_client ? new WiFiClient(new_client) : nullptr;
clientAlive(); alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
MqttCnx::~MqttCnx() MqttClient::MqttClient(MqttBroker* parent)
: parent(parent)
{
client = nullptr;
parent->addClient(this);
}
MqttClient::~MqttClient()
{ {
close(); close();
parent->removeClient(this);
} }
void MqttCnx::close() void MqttClient::close()
{ {
if (client) if (client)
{ {
@@ -38,13 +47,32 @@ void MqttCnx::close()
} }
} }
void MqttBroker::addClient(MqttClient* client)
{
clients.push_back(client);
}
void MqttBroker::removeClient(MqttClient* remove)
{
for(auto it=clients.begin(); it!=clients.end(); it++)
{
auto client=*it;
if (client==remove)
{
clients.erase(it);
return;
}
}
Serial << "Error cannot remove client" << endl; // TODO should not occur
}
void MqttBroker::loop() void MqttBroker::loop()
{ {
WiFiClient client = server.available(); WiFiClient client = server.available();
if (client) if (client)
{ {
clients.push_back(new MqttCnx(this, client)); addClient(new MqttClient(this, client));
Serial << "New client (" << clients.size() << ')' << endl; Serial << "New client (" << clients.size() << ')' << endl;
} }
@@ -58,17 +86,36 @@ void MqttBroker::loop()
else else
{ {
Serial << "Client " << client->id().c_str() << " Disconnected" << endl; Serial << "Client " << client->id().c_str() << " Disconnected" << endl;
clients.erase(it); // Note: deleting a client not added by the broker itself will probably crash later.
delete client; delete client;
break; break;
} }
} }
} }
void MqttBroker::publish(const Topic& topic, MqttMessage& msg) // Should be called for inside and outside incoming publishes (all)
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
for(auto client: clients) for(auto client: clients)
client->publish(topic, msg); {
bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6
{
if (!client->isLocal()) // R2 go outside allowed
doit = true;
else // R3 any client to outside allowed
doit = true;
}
else // Disconnected: R3 R4 R5
{
if (!source->isLocal()) // R3
doit = true;
else if (client->isLocal()) // R4 local -> local
doit = true;
}
if (doit) client->publish(topic, msg); // goes outside R2
}
} }
bool MqttBroker::compareString( bool MqttBroker::compareString(
@@ -87,7 +134,7 @@ void MqttMessage::getString(char* &buffer, uint16_t& len)
buffer+=2; buffer+=2;
} }
void MqttCnx::clientAlive() void MqttClient::clientAlive()
{ {
if (keep_alive) if (keep_alive)
{ {
@@ -97,7 +144,7 @@ void MqttCnx::clientAlive()
alive=0; alive=0;
} }
void MqttCnx::loop() void MqttClient::loop()
{ {
if (alive && (millis() > alive)) if (alive && (millis() > alive))
{ {
@@ -115,7 +162,7 @@ void MqttCnx::loop()
} }
} }
void MqttCnx::processMessage() void MqttClient::processMessage()
{ {
std::string error; std::string error;
std::string s; std::string s;
@@ -186,7 +233,7 @@ void MqttCnx::processMessage()
message.getString(payload, len); // Topic message.getString(payload, len); // Topic
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
subscriptions.insert(Topic(payload, len)); subscribe(Topic(payload, len));
bclose = false; bclose = false;
// TODO SUBACK // TODO SUBACK
break; break;
@@ -205,7 +252,7 @@ void MqttCnx::processMessage()
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
parent->publish(published, message); parent->publish(this, published, message);
// TODO should send PUBACK // TODO should send PUBACK
bclose = false; bclose = false;
} }
@@ -241,15 +288,36 @@ bool Topic::matches(const Topic& topic) const
return false; return false;
} }
void MqttCnx::publish(const Topic& topic, MqttMessage& msg) // publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{
message.create(MqttMessage::Publish);
message.add(topic);
message.add(payload, pay_length);
if (parent)
parent->publish(this, topic, message);
else if (client)
publish(topic, message);
else
Serial << " Should not happen" << endl;
}
// republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg)
{ {
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
if (subscription.matches(topic)) if (subscription.matches(topic))
{ {
// Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; if (client)
{
msg.sendTo(this); msg.sendTo(this);
} }
else if (callback)
{
callback(topic, nullptr, 0); // TODO
}
}
} }
} }
@@ -311,6 +379,11 @@ void MqttMessage::incoming(char in_byte)
} }
} }
void MqttMessage::add(const char* p, size_t len)
{
while(len--) incoming(*p);
}
void MqttMessage::encodeLength(char* msb, int length) void MqttMessage::encodeLength(char* msb, int length)
{ {
do do
@@ -322,7 +395,7 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttCnx* client) void MqttMessage::sendTo(MqttClient* client)
{ {
if (curr-buffer-2 >= 0) if (curr-buffer-2 >= 0)
{ {

View File

@@ -11,11 +11,14 @@ class Topic : public IndexedString
public: public:
Topic(const char* s, uint8_t len) : IndexedString(s,len){} Topic(const char* s, uint8_t len) : IndexedString(s,len){}
Topic(const char* s) : Topic(s, strlen(s)) {} Topic(const char* s) : Topic(s, strlen(s)) {}
Topic(const std::string s) : Topic(s.c_str(), s.length()){};
const char* c_str() const { return str().c_str(); }
bool matches(const Topic&) const; bool matches(const Topic&) const;
}; };
class MqttCnx; class MqttClient;
class MqttMessage class MqttMessage
{ {
public: public:
@@ -45,6 +48,9 @@ class MqttMessage
MqttMessage(Type t) { create(t); } MqttMessage(Type t) { create(t); }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len);
void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; } char* getVHeader() const { return vheader; }
char* end() const { return curr; } char* end() const { return curr; }
uint16_t length() const { return curr-buffer; } uint16_t length() const { return curr-buffer; }
@@ -69,7 +75,7 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttCnx*); void sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
@@ -83,8 +89,9 @@ class MqttMessage
}; };
class MqttBroker; class MqttBroker;
class MqttCnx class MqttClient
{ {
using CallBack = void (*)(const Topic& topic, const char* payload, size_t payload_length);
enum Flags enum Flags
{ {
FlagUserName = 128, FlagUserName = 128,
@@ -96,11 +103,11 @@ class MqttCnx
FlagReserved = 1 FlagReserved = 1
}; };
public: public:
MqttCnx(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker*);
~MqttCnx(); ~MqttClient();
bool connected() { return client && client->connected(); } bool connected() { return client==nullptr || client->connected(); }
void write(const char* buf, size_t length) void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); } { if (client) client->write(buf, length); }
@@ -108,9 +115,24 @@ class MqttCnx
void loop(); void loop();
void close(); void close();
void publish(const Topic& topic, MqttMessage& msg); void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());}
void publish(const Topic& t) { publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic);
bool isLocal() const { return client==nullptr; }
private: private:
friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg);
void clientAlive(); void clientAlive();
void processMessage(); void processMessage();
@@ -118,43 +140,75 @@ class MqttCnx
uint32_t keep_alive; uint32_t keep_alive;
uint32_t alive; uint32_t alive;
bool mqtt_connected; bool mqtt_connected;
WiFiClient* client; WiFiClient* client; // nullptr if this client is local
MqttMessage message; MqttMessage message;
MqttBroker* parent; MqttBroker* parent;
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback;
}; };
class MqttClient /***********************************************
{ * R1 - accept external cnx
public: * R2 - allows all clients pusblish to go outside
MqttClient(IPAddress broker) : broker_ip(broker) {} * R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
protected: * R5 - tries to connect elsewhere (*)
IPAddress broker_ip; * R6 - disconnect external clients
}; * ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* =============================================+
* | connected | not connected |
* -------------+---------------+---------------+
* proxy broker | R2 R3 R5 R6 | R3 R4 R5 |
* normal broker| R2 R3 R5 R6 | R1 R3 R4 R5 |
* -------------+---------------+---------------+
*
*/
class MqttBroker class MqttBroker
{ {
enum State
{
Disconnected, // Also the initial state
Connecting, // connect and sends a fake publish to avoid circular cnx
Connected, // this->broker is connected and circular cnx avoided
};
public: public:
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
uint8_t port() const { return server.port(); }
void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; }
private:
friend class MqttClient;
bool checkUser(const char* user, uint8_t len) const bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); } { return compareString(auth_user, user, len); }
bool checkPassword(const char* password, uint8_t len) const bool checkPassword(const char* password, uint8_t len) const
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
void publish(const Topic& topic, MqttMessage& msg);
private: void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself
void addClient(MqttClient* client);
void removeClient(MqttClient* client);
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttCnx*> clients; std::vector<MqttClient*> clients;
WiFiServer server; WiFiServer server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";
State state = Disconnected;
MqttClient* broker = nullptr;
}; };