Compare commits

...

8 Commits
0.2.0 ... 0.3.0

Author SHA1 Message Date
hsaturn
428eb51850 Release 0.3.0
clients can now connect to outside.
bug fixed for broker (pings etc.)
crashes fixed when clients where removed
More examples added (the tinymqtt-test is great)
2021-03-21 13:50:42 +01:00
hsaturn
4e629bbc1e Update README.md 2021-03-21 12:26:02 +01:00
hsaturn
389a2eec8b Update README.md 2021-03-21 12:20:11 +01:00
hsaturn
6ff31c9820 Readme rewritten 2021-03-21 12:09:52 +01:00
hsaturn
dd44a4a658 Readme rewritten 2021-03-21 12:08:12 +01:00
hsaturn
17fabeae79 Readme rewritten 2021-03-21 11:50:23 +01:00
hsaturn
d052f6b55a Credentials added 2021-03-21 11:15:28 +01:00
hsaturn
6a80b29fd3 [broker] fix timeout on external client 2021-03-19 22:30:58 +01:00
8 changed files with 679 additions and 84 deletions

View File

@@ -1,14 +1,57 @@
# TinyMqtt
ESP 8266 is a very capable Mqtt Broker and Client
Here are the features
- mqtt client can Works without WiFi (local mode) in a unique ESP
Thus, publishes and subscribes are possible and allows
minimal (degraded) function of a single module.
- broker can connect to another broker and becomes then a
![](https://img.shields.io/github/v/release/hsaturn/TinyMqtt)
![](https://img.shields.io/github/issues/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/paltform-ESP8266-green)
![](https://img.shields.io/github/license/hsaturn/TinyMqtt)
![](https://img.shields.io/badge/Mqtt-%203.1.1-yellow)
ESP 8266 is a small and very capable Mqtt Broker and Client
## Features
- Act as as a mqtt broker and/or a mqtt client
- Mqtt 3.1.1 / Qos 0 supported
- Standalone (can work without WiFi) (degraded/local mode)
- Brokers can connect to another broker and becomes then a
proxy for clients that are connected to it.
- zeroconf, this is a strange but very powerful mode
where each ESP is a a broker and scans the local network.
After a while one ESP becomes the 'master'
and all ESP are connected together. The master can die
whithout breaking the system.
- zeroconf, this is a strange but very powerful mode where
all brokers tries to connect together on the same local network.
## TODO List
* Use [Async library](https://github.com/me-no-dev/ESPAsyncTCP)
* Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed.
## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)
## Examples
| Example | Description |
| ---------------------------- | --------------------------------- |
| client-without-wifi | standalone example |
| simple-client | Connect the ESP to an external Mqtt broker |
| simple-broker | Simple Mqtt broker with your ESP |
| tinymqtt-test | Complex console example |
- tinymqtt-test : This is a complex sketch with a terminal console
that allows to add clients publish, connect etc with interpreted commands.
## Standalone mode (zeroconf)
-> The zeroconf mode is not yet implemented
zerofonf clients to connect to broker on local network.
In Zeroconf mode, each ESP is a a broker and scans the local network.
After a while one ESP naturally becomes a 'master' and all ESP are connected together.
No problem if the master dies, a new master will be choosen soon.
## License
Gnu GPL 3.0, see [LICENSE](https://github.com/hsaturn/TinyMqtt/blob/main/LICENSE).

View File

@@ -0,0 +1,90 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
// TODO should be renamed to most-complete setup
/**
* Local broker that accept connections
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
MqttBroker broker(1883);
MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) { Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic);
mqtt_b.setCallback(onPublishB);
mqtt_b.subscribe(topic);
}
void loop()
{
broker.loop();
mqtt_a.loop();
mqtt_b.loop();
// ============= client A publish ================
static const int intervalA = 50000;
static uint32_t timerA = millis() + intervalA;
if (millis() > timerA)
{
Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA;
mqtt_a.publish(topic);
}
// ============= client B publish ================
static const int intervalB = 30000; // will send topic each 5000 ms
static uint32_t timerB = millis() + intervalB;
if (millis() > timerB)
{
Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
}
}

View File

@@ -1,9 +1,7 @@
#include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
const char *ssid = ; // Put here your wifi SSID ("ssid")
const char *password = ; // Put here your Wifi password ("pwd")
#include <my_credentials.h>
#define PORT 1883
MqttBroker broker(PORT);

View File

@@ -2,31 +2,35 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
const char *ssid = ; // Put here your wifi SSID ("ssid")
const char *password = ; // Put here your Wifi password ("pwd")
/** Simple Client
*
* This is the simplest Mqtt client configuration
*
* pro - small memory footprint (both ram and flash)
* - very simple to setup and use
*
* cons - cannot work without wifi connection
* - stop working if broker is down
* - local publishes takes more time (because they go outside)
*/
#define PORT 1883
MqttBroker broker(PORT);
#include <my_credentials.h>
void setup()
{
Serial.begin(115200);
delay(500);
Serial << "Simple clients with wifi " << endl;
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial << '.';
delay(500);
}
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
while (WiFi.status() != WL_CONNECTED)
{ delay(500); Serial << '.'; }
broker.begin();
Serial << "Broker ready : " << WiFi.localIP() << " on port " << PORT << endl;
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
}
void loop()
{
broker.loop();
}

View File

@@ -0,0 +1,326 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <map>
/**
* Local broker that accept connections
*
* pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic
* - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients
*
* cons - Takes more memory
* - a bit hard to understand
*
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/
#include <my_credentials.h>
std::string topic="sensor/temperature";
MqttBroker broker(1883);
void onPublish(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> " << srce->id().c_str() << ": ======> received " << topic.c_str() << endl; }
void setup()
{
Serial.begin(115200);
delay(500);
Serial << endl << endl << endl
<< "Demo started. Type help for more..." << endl
<< "Connecting to '" << ssid << "' ";
WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED)
{ Serial << '-'; delay(500); }
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin();
}
std::string getword(std::string& str, const char* if_empty=nullptr)
{
std::string sword;
while(str.length() && str[0]!=' ')
{
sword += str[0]; str.erase(0,1);
}
while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) return if_empty;
return sword;
}
class automatic
{
public:
automatic(MqttClient* clt, uint32_t intervl)
: client(clt), topic_(::topic)
{
interval(intervl);
autos[clt] = this;
}
void interval(uint32_t new_interval)
{
interval_ = new_interval;
if (interval_<1000) interval_=1000;
timer_ = millis() + interval_;
}
void loop_()
{
if (!bon) return;
if (interval_ && millis() > timer_)
{
Serial << "AUTO PUBLISH " << interval_ << endl;
timer_ += interval_;
client->publish(topic_, std::string(String(15+millis()%10).c_str()));
}
}
void topic(std::string new_topic) { topic_ = new_topic; }
static void loop()
{
for(auto it: autos)
it.second->loop_();
}
static void command(MqttClient* who, std::string cmd)
{
automatic* autop = nullptr;
if (autos.find(who) != autos.end())
{
autop=autos[who];
}
std::string s = getword(cmd);
if (compare(s, "create"))
{
std::string seconds=getword(cmd, "10000");
if (autop) delete autop;
std::string top = getword(cmd, ::topic.c_str());
autos[who] = new automatic(who, atol(seconds.c_str()));
autos[who]->topic(top);
autos[who]->bon=true;
Serial << "New auto (" << seconds.c_str() << " topic:" << top.c_str() << ')' << endl;
}
else if (autop)
{
while(s.length())
{
if (s=="on")
{
autop->bon = true;
autop->interval(autop->interval_);
}
else if (s=="off")
autop->bon=false;
else if (s=="interval")
{
int32_t i=atol(getword(cmd).c_str());
if (i)
autop->interval(atol(s.c_str()));
else
Serial << "Bad value" << endl;
}
else if (s=="view")
{
Serial << " automatic "
<< (int32_t)autop->client
<< " interval " << autop->interval_
<< (autop->bon ? " on" : " off") << endl;
}
else
{
Serial << "Unknown auto command (" << s.c_str() << ")" << endl;
break;
}
s=getword(cmd);
}
}
else if (who==nullptr)
{
for(auto it: autos)
command(it.first, s+' '+cmd);
}
else
Serial << "what ? (" << s.c_str() << ")" << endl;
}
static void help()
{
Serial << " auto [$id] on/off" << endl;
Serial << " auto [$id] view" << endl;
Serial << " auto [$id] create [millis] [topic]" << endl;
}
private:
MqttClient* client;
uint32_t interval_;
uint32_t timer_;
std::string topic_;
bool bon=false;
static std::map<MqttClient*, automatic*> autos;
};
std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd)
{
if (s.length()==0 or s.length()>strlen(cmd)) return false;
return strncmp(cmd, s.c_str(), s.length())==0;
}
std::map<std::string, MqttClient*> clients;
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
void clientCommand(std::string& cmd, ClientFunction func, bool canBeNull=false)
{
std::string s=getword(cmd);
bool found = clients.find(s) != clients.end();
if (canBeNull && found==false)
{
cmd += ' ' + s;
}
if (found or canBeNull)
{
MqttClient* publish = publish = clients[s];
func(cmd, publish);
}
else
{
Serial << "client not found (" << s.c_str() << ")" << endl;
cmd="";
}
}
void loop()
{
broker.loop();
for(auto it: clients)
it.second->loop();
automatic::loop();
if (Serial.available())
{
static std::string cmd;
char c=Serial.read();
if (c==10 or c==14)
{
Serial << "------------------------------------------------------" << endl;
while(cmd.length())
{
std::string s = getword(cmd);
if (compare(s,"connect"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->connect(getword(cmd,"192.168.1.40").c_str(), 1883); });
}
else if (compare(s,"publish"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->publish(getword(cmd, topic.c_str())); });
}
else if (compare(s,"subscribe"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->subscribe(getword(cmd, topic.c_str())); });
}
else if (compare(s, "view"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ publish->dump(); });
}
else if (compare(s, "auto"))
{
clientCommand(cmd, [](std::string& cmd, MqttClient* publish)
{ automatic::command(publish, cmd);
if (publish == nullptr)
cmd.clear();
}, true);
}
else if (compare(s, "new"))
{
std::string id=getword(cmd);
if (id.length())
{
MqttClient* client = new MqttClient(&broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
}
else
Serial << "missing id" << endl;
cmd+=" ls";
}
else if (compare(s, "delete"))
{
s = getword(cmd);
auto it=clients.find(s);
if (it != clients.end())
{
delete it->second;
clients.erase(it);
cmd+=" ls";
}
else
Serial << "Unknown client (" << s.c_str() << ")" << endl;
}
else if (compare(s, "ls"))
{
Serial << "main : " << clients.size() << " client/s." << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
broker.dump();
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " new/delete $id" << endl;
Serial << " connect $id [ip]" << endl;
Serial << " subscribe $id [topic]" << endl;
Serial << " publish $id [topic]" << endl;
Serial << " view $id " << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " ls" << endl;
Serial << " reset" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
}
}
else
{
cmd=cmd+c;
}
}
}

View File

@@ -2,6 +2,12 @@
#include <sstream>
#include <Streaming.h>
#if 1
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else
#define debug(what) {}
#endif
void outstring(const char* prefix, const char*p, uint16_t len)
{
return;
@@ -10,14 +16,12 @@ void outstring(const char* prefix, const char*p, uint16_t len)
Serial << '\'' << endl;
}
MqttBroker::MqttBroker(uint16_t port)
: server(port)
MqttBroker::MqttBroker(uint16_t port) : server(port)
{
}
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent),
mqtt_connected(false)
: parent(parent)
{
client = new_client ? new WiFiClient(new_client) : nullptr;
alive = millis()+5000; // client expires after 5s if no CONNECT msg
@@ -34,16 +38,48 @@ MqttClient::MqttClient(MqttBroker* parent)
MqttClient::~MqttClient()
{
close();
parent->removeClient(this);
delete client;
}
void MqttClient::close()
{
debug("close " << id().c_str());
mqtt_connected = false;
if (client)
{
client->stop();
delete client;
client = nullptr;
}
if (parent)
{
parent->removeClient(this);
parent=nullptr;
}
}
void MqttClient::connect(std::string broker, uint16_t port)
{
debug("cnx: closing");
close();
debug("cnx: closed");
if (client) delete client;
client = new WiFiClient;
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
message.create(MqttMessage::Type::Connect);
message.add("MQTT",4);
message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name
keep_alive = 1;
message.add(0x00); // keep_alive
message.add((char)keep_alive);
message.add(clientId);
debug("cnx: mqtt connecting");
message.sendTo(this);
debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0);
}
}
@@ -59,11 +95,13 @@ void MqttBroker::removeClient(MqttClient* remove)
auto client=*it;
if (client==remove)
{
debug("Remove " << clients.size());
clients.erase(it);
debug("Client removed " << clients.size());
return;
}
}
Serial << "Error cannot remove client" << endl; // TODO should not occur
debug("Error cannot remove client"); // TODO should not occur
}
void MqttBroker::loop()
@@ -73,19 +111,21 @@ void MqttBroker::loop()
if (client)
{
addClient(new MqttClient(this, client));
Serial << "New client (" << clients.size() << ')' << endl;
debug("New client (" << clients.size() << ')');
}
for(auto it=clients.begin(); it!=clients.end(); it++)
// for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
for(int i=0; i<clients.size(); i++)
{
auto client=*it;
auto client = clients[i];
if(client->connected())
{
client->loop();
}
else
{
Serial << "Client " << client->id().c_str() << " Disconnected" << endl;
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
@@ -93,28 +133,34 @@ void MqttBroker::loop()
}
}
// Should be called for inside and outside incoming publishes (all)
void MqttBroker::publish(const MqttClient* source, const Topic& topic, MqttMessage& msg)
{
debug("publish ");
int i=0;
for(auto client: clients)
{
i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected();
bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6
{
if (!client->isLocal()) // R2 go outside allowed
doit = true;
else // R3 any client to outside allowed
// ext broker -> clients or
// or clients -> ext broker
if (source == broker) // broker -> clients
doit = true;
else // clients -> broker
broker->publish(topic, msg);
}
else // Disconnected: R3 R4 R5
else // Disconnected: R7
{
if (!source->isLocal()) // R3
doit = true;
else if (client->isLocal()) // R4 local -> local
doit = true;
// All is allowed
doit = true;
}
Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg); // goes outside R2
if (doit) client->publish(topic, msg);
debug("");
}
}
@@ -134,11 +180,11 @@ void MqttMessage::getString(char* &buffer, uint16_t& len)
buffer+=2;
}
void MqttClient::clientAlive()
void MqttClient::clientAlive(uint32_t more_seconds)
{
if (keep_alive)
{
alive=millis()+1000*(keep_alive+5);
alive=millis()+1000*(keep_alive+more_seconds);
}
else
alive=0;
@@ -148,8 +194,20 @@ void MqttClient::loop()
{
if (alive && (millis() > alive))
{
Serial << "timeout client" << endl;
close();
if (parent)
{
debug("timeout client");
close();
}
else
{
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local browser
// there is no need to send one PingReq per instance.
}
}
while(client && client->available()>0)
@@ -175,19 +233,45 @@ void MqttClient::processMessage()
switch(message.type() & 0XF0)
{
case MqttMessage::Type::Connect:
if (mqtt_connected) break;
if (mqtt_connected)
{
debug("already connected");
break;
}
payload = header+10;
flags = header[7];
mqtt_flags = header[7];
keep_alive = (header[8]<<8)|(header[9]);
if (strncmp("MQTT", header+2,4)) break;
if (header[6]!=0x04) break; // Level 3.1.1
if (strncmp("MQTT", header+2,4))
{
debug("bad mqtt header");
break;
}
if (header[6]!=0x04)
{
debug("unknown level");
break; // Level 3.1.1
}
// ClientId
message.getString(payload, len);
debug("client id len=" << len);
if (len>30)
{
Serial << '(';
for(int i=0; i<30; i++)
{
if (i%5==0) Serial << ' ';
char c=*(header+i);
Serial << (c < 32 ? '.' : c);
}
Serial << " )" << endl;
debug("Bad client id length");
break;
}
clientId = std::string(payload, len);
payload += len;
if (flags & FlagWill) // Will topic
if (mqtt_flags & FlagWill) // Will topic
{
message.getString(payload, len); // Will Topic
outstring("WillTopic", payload, len);
@@ -197,13 +281,14 @@ void MqttClient::processMessage()
outstring("WillMessage", payload, len);
payload += len;
}
if (flags & FlagUserName)
// FIXME forgetting credential is allowed (security hole)
if (mqtt_flags & FlagUserName)
{
message.getString(payload, len);
if (!parent->checkUser(payload, len)) break;
payload += len;
}
if (flags & FlagPassword)
if (mqtt_flags & FlagPassword)
{
message.getString(payload, len);
if (!parent->checkPassword(payload, len)) break;
@@ -221,10 +306,17 @@ void MqttClient::processMessage()
break;
case MqttMessage::Type::PingReq:
message.create(MqttMessage::Type::PingResp);
message.add(0);
message.sendTo(this);
bclose = false;
if (!mqtt_connected) break;
if (client)
{
uint16_t pingreq = MqttMessage::Type::PingResp;
client->write((uint8_t*)(&pingreq), 2);
bclose = false;
}
else
{
debug("internal pingreq ?");
}
break;
case MqttMessage::Type::Subscribe:
@@ -252,6 +344,7 @@ void MqttClient::processMessage()
if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP
// TODO reset RETAIN
debug("publishing to parent");
parent->publish(this, published, message);
// TODO should send PUBACK
bclose = false;
@@ -276,7 +369,7 @@ void MqttClient::processMessage()
}
else
{
clientAlive();
clientAlive(5);
}
message.reset();
}
@@ -305,19 +398,23 @@ void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_len
// republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg)
{
debug("mqttclient publish " << subscriptions.size());
for(const auto& subscription: subscriptions)
{
Serial << " client=" << (int32_t)client << ", topic " << topic.str().c_str() << ' ';
if (subscription.matches(topic))
{
Serial << " match/send";
if (client)
{
msg.sendTo(this);
}
else if (callback)
{
callback(topic, nullptr, 0); // TODO
callback(this, topic, nullptr, 0); // TODO
}
}
Serial << endl;
}
}
@@ -374,14 +471,16 @@ void MqttMessage::incoming(char in_byte)
}
if (curr-buffer > 250)
{
Serial << "Too much incoming bytes." << endl;
debug("Spurious byte " << _HEX(in_byte));
curr=buffer;
}
}
void MqttMessage::add(const char* p, size_t len)
{
while(len--) incoming(*p);
incoming(len>>8);
incoming(len & 0xFF);
while(len--) incoming(*p++);
}
void MqttMessage::encodeLength(char* msb, int length)

View File

@@ -81,7 +81,7 @@ class MqttMessage
private:
void encodeLength(char* msb, int length);
char buffer[256]; // TODO 256 ?
char buffer[256]; // TODO why 256 ? (should be replaced by a std::string)
char* vheader;
char* curr;
uint16_t size; // bytes left to receive
@@ -91,14 +91,14 @@ class MqttMessage
class MqttBroker;
class MqttClient
{
using CallBack = void (*)(const Topic& topic, const char* payload, size_t payload_length);
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
enum Flags
{
FlagUserName = 128,
FlagPassword = 64,
FlagWillRetain = 32, // unsupported
FlagWillQos = 16 | 8, // unsupported
FlagWill = 4, // unsupported
FlagWill = 4, // unsupported
FlagCleanSession = 2, // unsupported
FlagReserved = 1
};
@@ -107,11 +107,15 @@ class MqttClient
~MqttClient();
void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port);
bool connected() { return client==nullptr || client->connected(); }
void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); }
const std::string& id() const { return clientId; }
void id(std::string& new_id) { clientId = new_id; }
void loop();
void close();
@@ -125,7 +129,24 @@ class MqttClient
void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic);
bool isLocal() const { return client==nullptr; }
// connected to local broker
// TODO seems to be useless
bool isLocal() const { return client == nullptr; }
void dump()
{
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << " [";
bool c=false;
for(auto s: subscriptions)
{
Serial << (c?", ": "")<< s.str().c_str();
c=true;
}
Serial << "]" << endl;
}
private:
friend class MqttBroker;
@@ -133,19 +154,19 @@ class MqttClient
// republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg);
void clientAlive();
void clientAlive(uint32_t more_seconds);
void processMessage();
char flags;
bool mqtt_connected = false;
char mqtt_flags;
uint32_t keep_alive;
uint32_t alive;
bool mqtt_connected;
WiFiClient* client; // nullptr if this client is local
MqttMessage message;
MqttBroker* parent;
MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback;
CallBack callback = nullptr;
};
/***********************************************
@@ -155,16 +176,17 @@ class MqttClient
* R4 - allows local publish to local clients
* R5 - tries to connect elsewhere (*)
* R6 - disconnect external clients
* R7 - allows all publish to go everywhere
* ---------------------------------------------
* (*) single client or ip range
* ---------------------------------------------
*
* =============================================+
* | connected | not connected |
* -------------+---------------+---------------+
* proxy broker | R2 R3 R5 R6 | R3 R4 R5 |
* normal broker| R2 R3 R5 R6 | R1 R3 R4 R5 |
* -------------+---------------+---------------+
* ================================================+
* | connected | not connected |
* -------------+---------------+------------------+
* proxy broker | R2 R3 R5 R6 | R5 R7 |
* normal broker| R2 R3 R5 R6 | R1 R5 R7 |
* -------------+---------------+------------------+
*
*/
class MqttBroker
@@ -176,6 +198,7 @@ class MqttBroker
Connected, // this->broker is connected and circular cnx avoided
};
public:
// TODO limit max number of clients
MqttBroker(uint16_t port);
void begin() { server.begin(); }
@@ -186,6 +209,16 @@ class MqttBroker
void connect(std::string host, uint32_t port=1883);
bool connected() const { return state == Connected; }
void dump()
{
Serial << "broker: " << clients.size() << " client/s" << endl;
for(auto client: clients)
{
Serial << " ";
client->dump();
}
}
private:
friend class MqttClient;

2
src/my_credentials.h Normal file
View File

@@ -0,0 +1,2 @@
const char *ssid = "YOUR-SSID-HERE";
const char *password = "YOUR-PASSWORD-HERE";