Compare commits

..

12 Commits
0.1.0 ... 0.3.0

Author SHA1 Message Date
hsaturn
428eb51850 Release 0.3.0
clients can now connect to outside.
bug fixed for broker (pings etc.)
crashes fixed when clients where removed
More examples added (the tinymqtt-test is great)
2021-03-21 13:50:42 +01:00
hsaturn
4e629bbc1e Update README.md 2021-03-21 12:26:02 +01:00
hsaturn
389a2eec8b Update README.md 2021-03-21 12:20:11 +01:00
hsaturn
6ff31c9820 Readme rewritten 2021-03-21 12:09:52 +01:00
hsaturn
dd44a4a658 Readme rewritten 2021-03-21 12:08:12 +01:00
hsaturn
17fabeae79 Readme rewritten 2021-03-21 11:50:23 +01:00
hsaturn
d052f6b55a Credentials added 2021-03-21 11:15:28 +01:00
hsaturn
6a80b29fd3 [broker] fix timeout on external client 2021-03-19 22:30:58 +01:00
hsaturn
cc708cdf22 Example when wifi is not connected 2021-03-19 22:04:23 +01:00
hsaturn
132fc56803 Update README.md 2021-03-19 22:01:02 +01:00
hsaturn
b33c9ba687 Version 0.2 2021-03-19 19:02:40 +01:00
hsaturn
bb2a2e6737 Added includes 2021-03-16 23:53:52 +01:00
12 changed files with 919 additions and 82 deletions

View File

@@ -1,2 +1,57 @@
# TinyMqtt # TinyMqtt
ESP 8266 Small footprint Mqtt Broker and Client
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client
## Features
- Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode)
- Brokers can connect to another broker and becomes then a
proxy for clients that are connected to it.
- zeroconf, this is a strange but very powerful mode where
all brokers tries to connect together on the same local network.
## TODO List
* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed.
## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)
## Examples
| Example | Description |
| ---------------------------- | --------------------------------- |
| client-without-wifi | standalone example |
| simple-client | Connect the ESP to an external Mqtt broker |
| simple-broker | Simple Mqtt broker with your ESP |
| tinymqtt-test | Complex console example |
- tinymqtt-test : This is a complex sketch with a terminal console
that allows to add clients publish, connect etc with interpreted commands.
## Standalone mode (zeroconf)
-> The zeroconf mode is not yet implemented
zerofonf clients to connect to broker on local network.
In Zeroconf mode, each ESP is a a broker and scans the local network.
After a while one ESP naturally becomes a 'master' and all ESP are connected together.
No problem if the master dies, a new master will be choosen soon.
## License
Gnu GPL 3.0, see [LICENSE](https://github.com/hsaturn/TinyMqtt/blob/main/LICENSE).

View File

@@ -0,0 +1,90 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/**
* Local broker that accept connections
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop();
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 50000;
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
}
}

View File

@@ -0,0 +1,64 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode:
*
* In this example, local clients A and B are talking together, no need to be connected.
* A single ESP can use this to be able to comunicate with itself with the power
* of MQTT, and once connected still continue to work with others.
*
*/
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "init" << endl;
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop();
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 5000;
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 3000; // will send topic each 5000 ms
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic);
}
}

View File

@@ -1,9 +1,7 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <Streaming.h> // https://github.com/janelia-arduino/Streaming
const char *ssid = ; // Put here your wifi SSID ("ssid") #include <my_credentials.h>
const char *password = ; // Put here your Wifi password ("pwd")
#define PORT 1883 #define PORT 1883
MqttBroker broker(PORT); MqttBroker broker(PORT);
@@ -16,7 +14,6 @@ void setup()
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial << '.'; Serial << '.';
delay(500); delay(500);
} }

View File

@@ -0,0 +1,36 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** Simple Client
*
* This is the simplest Mqtt client configuration
*
* pro - small memory footprint (both ram and flash)
* - very simple to setup and use
*
* cons - cannot work without wifi connection
* - stop working if broker is down
* - local publishes takes more time (because they go outside)
*/
#include <my_credentials.h>
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Simple clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ delay(500); Serial << '.'; }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
}
void loop()
{
}

View File

@@ -0,0 +1,326 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <map>
/**
* Local broker that accept connections
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << endl << endl << endl
<< "Demo started. Type help for more..." << endl
<< "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
}
std::string getword(std::string& str, const char* if_empty=nullptr)
{
std::string sword;
while(str.length() && str[0]!=' ')
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword;
}
class automatic
{
public:
automatic(MqttClient* clt, uint32_t intervl)
: client(clt), topic_(::topic)
{
interval(intervl);
autos[clt] = this;
}
void interval(uint32_t new_interval)
{
interval_ = new_interval;
if (interval_<1000) interval_=1000;
timer_ = millis() + interval_;
}
void loop_()
{
if (!bon) return;
if (interval_ && millis() > timer_)
{
Serial << "AUTO PUBLISH " << interval_ << endl;
timer_ += interval_;
client->publish(topic_, std::string(String(15+millis()%10).c_str()));
}
}
void topic(std::string new_topic) { topic_ = new_topic; }
static void loop()
{
for(auto it: autos)
it.second->loop_();
}
static void command(MqttClient* who, std::string cmd)
{
automatic* autop = nullptr;
if (autos.find(who) != autos.end())
{
autop=autos[who];
}
std::string s = getword(cmd);
if (compare(s, "create"))
{
std::string seconds=getword(cmd, "10000");
if (autop) delete autop;
std::string top = getword(cmd, ::topic.c_str());
autos[who] = new automatic(who, atol(seconds.c_str()));
autos[who]->topic(top);
autos[who]->bon=true;
Serial << "New auto (" << seconds.c_str() << " topic:" << top.c_str() << ')' << endl;
}
else if (autop)
{
while(s.length())
{
if (s=="on")
{
autop->bon = true;
autop->interval(autop->interval_);
}
else if (s=="off")
autop->bon=false;
else if (s=="interval")
{
int32_t i=atol(getword(cmd).c_str());
if (i)
autop->interval(atol(s.c_str()));
else
Serial << "Bad value" << endl;
}
else if (s=="view")
{
Serial << " automatic "
<< (int32_t)autop->client
<< " interval " << autop->interval_
<< (autop->bon ? " on" : " off") << endl;
}
else
{
Serial << "Unknown auto command (" << s.c_str() << ")" << endl;
break;
}
s=getword(cmd);
}
}
else if (who==nullptr)
{
for(auto it: autos)
command(it.first, s+' '+cmd);
}
else
Serial << "what ? (" << s.c_str() << ")" << endl;
}
static void help()
{
Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl;
}
private:
MqttClient* client;
uint32_t interval_;
uint32_t timer_;
std::string topic_;
bool bon=false;
static std::map<MqttClient*, automatic*> autos;
};
std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd)
{
if (s.length()==0 or s.length()>strlen(cmd)) return false;
return strncmp(cmd, s.c_str(), s.length())==0;
}
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop()
{
broker.loop();
for(auto it: clients)
it.second->loop();
automatic::loop();
if (Serial.available())
{
static std::string cmd;
char c=Serial.read();
if (c==10 or c==14)
{
Serial << "------------------------------------------------------" << endl;
while(cmd.length())
{
std::string s = getword(cmd);
if (compare(s,"connect"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
}
else if (compare(s,"publish"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->publish(getword(cmd, topic.c_str())); });
}
else if (compare(s,"subscribe"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->subscribe(getword(cmd, topic.c_str())); });
}
else if (compare(s, "view"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->dump(); });
}
else if (compare(s, "auto"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ automatic::command(publish, cmd);
if (publish == nullptr)
cmd.clear();
}, true);
}
else if (compare(s, "new"))
{
std::string id=getword(cmd);
if (id.length())
{
MqttClient* client = new MqttClient(&broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
}
else
Serial << "missing id" << endl;
cmd+=" ls";
}
else if (compare(s, "delete"))
{
s = getword(cmd);
auto it=clients.find(s);
if (it != clients.end())
{
delete it->second;
clients.erase(it);
cmd+=" ls";
}
else
Serial << "Unknown client (" << s.c_str() << ")" << endl;
}
else if (compare(s, "ls"))
{
Serial << "main : " << clients.size() << " client/s." << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
broker.dump();
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl;
Serial << " connect $id [ip]" << endl;
Serial << " subscribe $id [topic]" << endl;
Serial << " publish $id [topic]" << endl;
Serial << " view $id " << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " ls" << endl;
Serial << " reset" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
}
}
else
{
cmd=cmd+c;
}
}
}

View File

@@ -3,16 +3,23 @@
####################################### #######################################
####################################### #######################################
# Datatypes (KEYWORD1) # Datatypes and functions
####################################### #######################################
MqttBroker KEYWORD1 TinyMqtt KEYWORD1
MqttBroker KEYWORD1
begin KEYWORD2
loop KEYWORD2
MqttClient KEYWORD1 MqttClient KEYWORD1
publish KEYWORD2
setCallback KEYWORD2
subscribe KEYWORD2
####################################### Topic KEYWORD1
# Methods and Functions (KEYWORD2) matches KEYWORD2
####################################### c_str KEYWORD2
####################################### #######################################
# Constants (LITERAL1) # Constants (LITERAL1)

View File

@@ -6,7 +6,7 @@
"type": "git", "type": "git",
"url": "https://github.com/hsaturn/TinyMqtt.git" "url": "https://github.com/hsaturn/TinyMqtt.git"
}, },
"version": "0.1", "version": "0.2",
"exclude": "", "exclude": "",
"examples": "examples/*/*.ino", "examples": "examples/*/*.ino",
"frameworks": "arduino", "frameworks": "arduino",

View File

@@ -1,5 +1,5 @@
name=TinyMqtt name=TinyMqtt
version=0.1 version=0.2.0
author=HSaturn <hsaturn@gmail.com> author=HSaturn <hsaturn@gmail.com>
maintainer=HSaturn <hsaturn@gmail.com> maintainer=HSaturn <hsaturn@gmail.com>
sentence=A tiny broker and client library for MQTT messaging. sentence=A tiny broker and client library for MQTT messaging.
@@ -7,3 +7,4 @@ paragraph=MQTT is a lightweight messaging protocol ideal for small devices. This
category=Communication category=Communication
url=https://github.com/hsaturn/TinyMqtt url=https://github.com/hsaturn/TinyMqtt
architectures=* architectures=*
includes=TinyMqtt.h

View File

@@ -2,6 +2,12 @@
#include <sstream> #include <sstream>
#include <Streaming.h> #include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len) void outstring(const char* prefix, const char*p, uint16_t len)
{ {
return; return;
@@ -10,32 +16,92 @@ void outstring(const char* prefix, const char*p, uint16_t len)
Serial << '\'' << endl; Serial << '\'' << endl;
} }
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port) : server(port)
: server(port)
{ {
} }
MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent), : parent(parent)
mqtt_connected(false)
{ {
client = new_client ? new WiFiClient(new_client) : nullptr; client = new_client ? new WiFiClient(new_client) : nullptr;
clientAlive(); alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
MqttCnx::~MqttCnx() MqttClient::MqttClient(MqttBroker* parent)
: parent(parent)
{
client = nullptr;
parent->addClient(this);
}
MqttClient::~MqttClient()
{ {
close(); close();
delete client;
} }
void MqttCnx::close() void MqttClient::close()
{ {
debug("close " << id().c_str());
mqtt_connected = false;
if (client) if (client)
{ {
client->stop(); client->stop();
delete client;
client = nullptr;
} }
if (parent)
{
parent->removeClient(this);
parent=nullptr;
}
}
void MqttClient::connect(std::string broker, uint16_t port)
{
debug("cnx: closing");
close();
debug("cnx: closed");
if (client) delete client;
client = new WiFiClient;
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
message.create(MqttMessage::Type::Connect);
message.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name
keep_alive = 1;
message.add(0x00); // keep_alive
message.add((char)keep_alive);
message.add(clientId);
debug("cnx: mqtt connecting");
message.sendTo(this);
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
}
}
void MqttBroker::addClient(MqttClient* client)
{
clients.push_back(client);
}
void MqttBroker::removeClient(MqttClient* remove)
{
for(auto it=clients.begin(); it!=clients.end(); it++)
{
auto client=*it;
if (client==remove)
{
debug("Remove " << clients.size());
clients.erase(it);
debug("Client removed " << clients.size());
return;
}
}
debug("Error cannot remove client"); // TODO should not occur
} }
void MqttBroker::loop() void MqttBroker::loop()
@@ -44,31 +110,58 @@ void MqttBroker::loop()
if (client) if (client)
{ {
clients.push_back(new MqttCnx(this, client)); addClient(new MqttClient(this, client));
Serial << "New client (" << clients.size() << ')' << endl; debug("New client (" << clients.size() << ')');
} }
for(auto it=clients.begin(); it!=clients.end(); it++) // for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
for(int i=0; i<clients.size(); i++)
{ {
auto client=*it; auto client = clients[i];
if(client->connected()) if(client->connected())
{ {
client->loop(); client->loop();
} }
else else
{ {
Serial << "Client " << client->id().c_str() << " Disconnected" << endl; debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent);
clients.erase(it); // Note: deleting a client not added by the broker itself will probably crash later.
delete client; delete client;
break; break;
} }
} }
} }
void MqttBroker::publish(const Topic& topic, MqttMessage& msg) void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{ {
debug("publish ");
int i=0;
for(auto client: clients) for(auto client: clients)
client->publish(topic, msg); {
i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected();
bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6
{
// ext broker -> clients or
// or clients -> ext broker
if (source == broker) // broker -> clients
doit = true;
else // clients -> broker
broker->publish(topic, msg);
}
else // Disconnected: R7
{
// All is allowed
doit = true;
}
Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg);
debug("");
}
} }
bool MqttBroker::compareString( bool MqttBroker::compareString(
@@ -87,22 +180,34 @@ void MqttMessage::getString(char* &buffer, uint16_t& len)
buffer+=2; buffer+=2;
} }
void MqttCnx::clientAlive() void MqttClient::clientAlive(uint32_t more_seconds)
{ {
if (keep_alive) if (keep_alive)
{ {
alive=millis()+1000*(keep_alive+5); alive=millis()+1000*(keep_alive+more_seconds);
} }
else else
alive=0; alive=0;
} }
void MqttCnx::loop() void MqttClient::loop()
{ {
if (alive && (millis() > alive)) if (alive && (millis() > alive))
{ {
Serial << "timeout client" << endl; if (parent)
close(); {
debug("timeout client");
close();
}
else
{
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local browser
// there is no need to send one PingReq per instance.
}
} }
while(client && client->available()>0) while(client && client->available()>0)
@@ -115,7 +220,7 @@ void MqttCnx::loop()
} }
} }
void MqttCnx::processMessage() void MqttClient::processMessage()
{ {
std::string error; std::string error;
std::string s; std::string s;
@@ -128,19 +233,45 @@ void MqttCnx::processMessage()
switch(message.type() & 0XF0) switch(message.type() & 0XF0)
{ {
case MqttMessage::Type::Connect: case MqttMessage::Type::Connect:
if (mqtt_connected) break; if (mqtt_connected)
{
debug("already connected");
break;
}
payload = header+10; payload = header+10;
flags = header[7]; mqtt_flags = header[7];
keep_alive = (header[8]<<8)|(header[9]); keep_alive = (header[8]<<8)|(header[9]);
if (strncmp("MQTT", header+2,4)) break; if (strncmp("MQTT", header+2,4))
if (header[6]!=0x04) break; // Level 3.1.1 {
debug("bad mqtt header");
break;
}
if (header[6]!=0x04)
{
debug("unknown level");
break; // Level 3.1.1
}
// ClientId // ClientId
message.getString(payload, len); message.getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len); clientId = std::string(payload, len);
payload += len; payload += len;
if (flags & FlagWill) // Will topic if (mqtt_flags & FlagWill) // Will topic
{ {
message.getString(payload, len); // Will Topic message.getString(payload, len); // Will Topic
outstring("WillTopic", payload, len); outstring("WillTopic", payload, len);
@@ -150,13 +281,14 @@ void MqttCnx::processMessage()
outstring("WillMessage", payload, len); outstring("WillMessage", payload, len);
payload += len; payload += len;
} }
if (flags & FlagUserName) // FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName)
{ {
message.getString(payload, len); message.getString(payload, len);
if (!parent->checkUser(payload, len)) break; if (!parent->checkUser(payload, len)) break;
payload += len; payload += len;
} }
if (flags & FlagPassword) if (mqtt_flags & FlagPassword)
{ {
message.getString(payload, len); message.getString(payload, len);
if (!parent->checkPassword(payload, len)) break; if (!parent->checkPassword(payload, len)) break;
@@ -174,10 +306,17 @@ void MqttCnx::processMessage()
break; break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
message.create(MqttMessage::Type::PingResp); if (!mqtt_connected) break;
message.add(0); if (client)
message.sendTo(this); {
bclose = false; uint16_t pingreq = MqttMessage::Type::PingResp;
client->write((uint8_t*)(&pingreq), 2);
bclose = false;
}
else
{
debug("internal pingreq ?");
}
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
@@ -186,7 +325,7 @@ void MqttCnx::processMessage()
message.getString(payload, len); // Topic message.getString(payload, len); // Topic
outstring("Subscribes", payload, len); outstring("Subscribes", payload, len);
subscriptions.insert(Topic(payload, len)); subscribe(Topic(payload, len));
bclose = false; bclose = false;
// TODO SUBACK // TODO SUBACK
break; break;
@@ -205,7 +344,8 @@ void MqttCnx::processMessage()
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
parent->publish(published, message); debug("publishing to parent");
parent->publish(this, published, message);
// TODO should send PUBACK // TODO should send PUBACK
bclose = false; bclose = false;
} }
@@ -229,7 +369,7 @@ void MqttCnx::processMessage()
} }
else else
{ {
clientAlive(); clientAlive(5);
} }
message.reset(); message.reset();
} }
@@ -241,15 +381,40 @@ bool Topic::matches(const Topic& topic) const
return false; return false;
} }
void MqttCnx::publish(const Topic& topic, MqttMessage& msg) // publish from local client
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{ {
message.create(MqttMessage::Publish);
message.add(topic);
message.add(payload, pay_length);
if (parent)
parent->publish(this, topic, message);
else if (client)
publish(topic, message);
else
Serial << " Should not happen" << endl;
}
// republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg)
{
debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions) for(const auto& subscription: subscriptions)
{ {
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ';
if (subscription.matches(topic)) if (subscription.matches(topic))
{ {
// Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl; Serial << " match/send";
msg.sendTo(this); if (client)
{
msg.sendTo(this);
}
else if (callback)
{
callback(this, topic, nullptr, 0); // TODO
}
} }
Serial << endl;
} }
} }
@@ -306,11 +471,18 @@ void MqttMessage::incoming(char in_byte)
} }
if (curr-buffer > 250) if (curr-buffer > 250)
{ {
Serial << "Too much incoming bytes." << endl; debug("Spurious byte " << _HEX(in_byte));
curr=buffer; curr=buffer;
} }
} }
void MqttMessage::add(const char* p, size_t len)
{
incoming(len>>8);
incoming(len & 0xFF);
while(len--) incoming(*p++);
}
void MqttMessage::encodeLength(char* msb, int length) void MqttMessage::encodeLength(char* msb, int length)
{ {
do do
@@ -322,7 +494,7 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttCnx* client) void MqttMessage::sendTo(MqttClient* client)
{ {
if (curr-buffer-2 >= 0) if (curr-buffer-2 >= 0)
{ {

View File

@@ -11,11 +11,14 @@ class Topic : public IndexedString
public: public:
Topic(const char* s, uint8_t len) : IndexedString(s,len){} Topic(const char* s, uint8_t len) : IndexedString(s,len){}
Topic(const char* s) : Topic(s, strlen(s)) {} Topic(const char* s) : Topic(s, strlen(s)) {}
Topic(const std::string s) : Topic(s.c_str(), s.length()){};
const char* c_str() const { return str().c_str(); }
bool matches(const Topic&) const; bool matches(const Topic&) const;
}; };
class MqttCnx; class MqttClient;
class MqttMessage class MqttMessage
{ {
public: public:
@@ -45,6 +48,9 @@ class MqttMessage
MqttMessage(Type t) { create(t); } MqttMessage(Type t) { create(t); }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len);
void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; } char* getVHeader() const { return vheader; }
char* end() const { return curr; } char* end() const { return curr; }
uint16_t length() const { return curr-buffer; } uint16_t length() const { return curr-buffer; }
@@ -69,13 +75,13 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttCnx*); void sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
void encodeLength(char* msb, int length); void encodeLength(char* msb, int length);
char buffer[256]; // TODO 256 ? char buffer[256]; // TODO why 256 ? (should be replaced by a std::string)
char* vheader; char* vheader;
char* curr; char* curr;
uint16_t size; // bytes left to receive uint16_t size; // bytes left to receive
@@ -83,78 +89,159 @@ class MqttMessage
}; };
class MqttBroker; class MqttBroker;
class MqttCnx class MqttClient
{ {
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
enum Flags enum Flags
{ {
FlagUserName = 128, FlagUserName = 128,
FlagPassword = 64, FlagPassword = 64,
FlagWillRetain = 32, // unsupported FlagWillRetain = 32, // unsupported
FlagWillQos = 16 | 8, // unsupported FlagWillQos = 16 | 8, // unsupported
FlagWill = 4, // unsupported FlagWill = 4, // unsupported
FlagCleanSession = 2, // unsupported FlagCleanSession = 2, // unsupported
FlagReserved = 1 FlagReserved = 1
}; };
public: public:
MqttCnx(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker*);
~MqttCnx(); ~MqttClient();
bool connected() { return client && client->connected(); } void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port);
bool connected() { return client==nullptr || client->connected(); }
void write(const char* buf, size_t length) void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); } { if (client) client->write(buf, length); }
const std::string& id() const { return clientId; } const std::string& id() const { return clientId; }
void id(std::string& new_id) { clientId = new_id; }
void loop(); void loop();
void close(); void close();
void publish(const Topic& topic, MqttMessage& msg); void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());}
void publish(const Topic& t) { publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic);
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
void dump()
{
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << " [";
bool c=false;
for(auto s: subscriptions)
{
Serial << (c?", ": "")<< s.str().c_str();
c=true;
}
Serial << "]" << endl;
}
private: private:
void clientAlive(); friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg);
void clientAlive(uint32_t more_seconds);
void processMessage(); void processMessage();
char flags; bool mqtt_connected = false;
char mqtt_flags;
uint32_t keep_alive; uint32_t keep_alive;
uint32_t alive; uint32_t alive;
bool mqtt_connected;
WiFiClient* client;
MqttMessage message; MqttMessage message;
MqttBroker* parent; MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr;
}; };
class MqttClient /***********************************************
{ * R1 - accept external cnx
public: * R2 - allows all clients pusblish to go outside
MqttClient(IPAddress broker) : broker_ip(broker) {} * R3 - allows ext publish to all clients
* R4 - allows local publish to local clients
protected: * R5 - tries to connect elsewhere (*)
IPAddress broker_ip; * R6 - disconnect external clients
}; * R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker class MqttBroker
{ {
enum State
{
Disconnected, // Also the initial state
Connecting, // connect and sends a fake publish to avoid circular cnx
Connected, // this->broker is connected and circular cnx avoided
};
public: public:
// TODO limit max number of clients
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
void begin() { server.begin(); } void begin() { server.begin(); }
void loop(); void loop();
uint8_t port() const { return server.port(); }
void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; }
void dump()
{
Serial << "broker: " << clients.size() << " client/s" << endl;
for(auto client: clients)
{
Serial << " ";
client->dump();
}
}
private:
friend class MqttClient;
bool checkUser(const char* user, uint8_t len) const bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); } { return compareString(auth_user, user, len); }
bool checkPassword(const char* password, uint8_t len) const bool checkPassword(const char* password, uint8_t len) const
{ return compareString(auth_password, password, len); } { return compareString(auth_password, password, len); }
void publish(const Topic& topic, MqttMessage& msg);
private: void publish(const MqttClient* source, const Topic& topic, MqttMessage& msg);
// For clients that are added not by the broker itself
void addClient(MqttClient* client);
void removeClient(MqttClient* client);
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttCnx*> clients; std::vector<MqttClient*> clients;
WiFiServer server; WiFiServer server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";
State state = Disconnected;
MqttClient* broker = nullptr;
}; };

2
src/my_credentials.h Normal file
View File

@@ -0,0 +1,2 @@
const char *ssid = "YOUR-SSID-HERE";
const char *password = "YOUR-PASSWORD-HERE";