Merge pull request #6 from hsaturn/AsyncAndWifi

AsyncTcp can be activated by removing the command on TCP_ASYNC in TinyMqtt.h
But the code is not bug free yet.
This commit is contained in:
hsaturn
2021-04-11 23:33:29 +02:00
committed by GitHub
4 changed files with 499 additions and 347 deletions

View File

@@ -1,22 +1,31 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
/** /**
* Local broker that accept connections and two local clients *
* * +-----------------------------+
* | ESP |
* | +--------+ | 1883 <--- External client/s
* | +-------->| broker | | 1883 <--- External client/s
* | | +--------+ |
* | | ^ |
* | | | |
* | v v |
* | +----------+ +----------+ |
* | | internal | | internal | |
* | | client | | client | |
* | +----------+ +----------+ |
* | |
* +-----------------------------+
*
* pros - Reduces internal latency (when publish is received by the same ESP) * pros - Reduces internal latency (when publish is received by the same ESP)
* - Reduces wifi traffic * - Reduces wifi traffic
* - No need to have an external broker * - No need to have an external broker
* - can still report to a 'main' broker (TODO see documentation that have to be written) * - can still report to a 'main' broker (TODO see documentation that have to be written)
* - accepts external clients * - accepts external clients
* *
* cons - Takes more memory * cons - Takes more memory
* - a bit hard to understand * - a bit hard to understand
* *
* This sounds crazy: a mqtt mqtt that do not need a broker !
* The use case arise when one ESP wants to publish topics and subscribe to them at the same time.
* Without broker, the ESP won't react to its own topics.
*
* TinyMqtt mqtt allows this use case to work.
*/ */
#include <my_credentials.h> #include <my_credentials.h>
@@ -37,8 +46,8 @@ void onPublishB(const MqttClient* source, const Topic& topic, const char* payloa
void setup() void setup()
{ {
Serial.begin(115200); Serial.begin(115200);
delay(500); delay(500);
Serial << "Clients with wifi " << endl; Serial << "Clients with wifi " << endl;
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
WiFi.begin(ssid, password); WiFi.begin(ssid, password);
@@ -47,8 +56,8 @@ void setup()
Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl;
broker.begin(); broker.begin();
mqtt_a.setCallback(onPublishA); mqtt_a.setCallback(onPublishA);
mqtt_a.subscribe(topic); mqtt_a.subscribe(topic);
@@ -58,30 +67,30 @@ void setup()
void loop() void loop()
{ {
broker.loop(); broker.loop(); // Don't forget to add loop for every broker and clients
mqtt_a.loop(); mqtt_a.loop();
mqtt_b.loop(); mqtt_b.loop();
// ============= client A publish ================ // ============= client A publish ================
static const int intervalA = 5000; // publishes every 5s static const int intervalA = 5000; // publishes every 5s
static uint32_t timerA = millis() + intervalA; static uint32_t timerA = millis() + intervalA;
if (millis() > timerA) if (millis() > timerA)
{ {
Serial << "A is publishing " << topic.c_str() << endl; Serial << "A is publishing " << topic.c_str() << endl;
timerA += intervalA; timerA += intervalA;
mqtt_a.publish(topic); mqtt_a.publish(topic);
} }
// ============= client B publish ================ // ============= client B publish ================
static const int intervalB = 7000; // will send topic each 7s static const int intervalB = 7000; // will send topic each 7s
static uint32_t timerB = millis() + intervalB; static uint32_t timerB = millis() + intervalB;
if (millis() > timerB) if (millis() > timerB)
{ {
Serial << "B is publishing " << topic.c_str() << endl; Serial << "B is publishing " << topic.c_str() << endl;
timerB += intervalB; timerB += intervalB;
mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str()));
} }
} }

View File

@@ -138,7 +138,7 @@ std::set<std::string> commands = {
"auto", "broker", "blink", "client", "connect", "auto", "broker", "blink", "client", "connect",
"create", "delete", "help", "interval", "create", "delete", "help", "interval",
"ls", "ip", "off", "on", "set", "ls", "ip", "off", "on", "set",
"publish", "reset", "subscribe", "unsubscribe", "view" "publish", "reset", "subscribe", "unsubscribe", "view", "every"
}; };
void getCommand(std::string& search) void getCommand(std::string& search)
@@ -316,22 +316,389 @@ std::map<MqttClient*, automatic*> automatic::autos;
bool compare(std::string s, const char* cmd) bool compare(std::string s, const char* cmd)
{ {
if (s.length()==0 or s.length()>strlen(cmd)) return false; uint8_t p=0;
return strncmp(cmd, s.c_str(), s.length())==0; while(s[p++]==*cmd++)
{
if (*cmd==0 or s[p]==0) return true;
if (s[p]==' ') return true;
}
return false;
} }
using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); using ClientFunction = void(*)(std::string& cmd, MqttClient* publish);
struct Every
{
std::string cmd;
uint32_t ms;
uint32_t next;
void dump()
{
auto mill=millis();
Serial << ms << "ms [" << cmd << "] next in ";
if (mill > next)
Serial << "now";
else
Serial << next-mill << "ms";
}
};
uint32_t blink_ms_on[16]; uint32_t blink_ms_on[16];
uint32_t blink_ms_off[16]; uint32_t blink_ms_off[16];
uint32_t blink_next[16]; uint32_t blink_next[16];
bool blink_state[16]; bool blink_state[16];
int16_t blink; int16_t blink;
std::vector<Every> everies;
void eval(std::string& cmd)
{
while(cmd.length())
{
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "on"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, HIGH);
}
else if (compare(s, "off"))
{
uint8_t pin=getint(cmd, 2);
pinMode(pin, OUTPUT);
digitalWrite(pin, LOW);
}
else if (compare(s, "every"))
{
uint32_t ms = getint(cmd, 0);
if (ms and cmd.length())
{
Every every;
every.ms=ms;
every.cmd=cmd;
every.next=millis()+ms;
everies.push_back(every);
every.dump();
Serial << endl;
cmd="";
}
else if (ms==0 and compare(cmd, "list"))
{
getword(cmd);
Serial << "List of everies (ms=" << millis() << ")" << endl;
uint8_t count=0;
for(auto& every: everies)
{
Serial << count << ": ";
every.dump();
Serial << endl;
count++;
}
}
else if (ms==0 and compare(cmd, "remove"))
{
getword(cmd);
int8_t every=getint(cmd, -1);
if (every==-1 and compare(cmd, "all"))
{
getword(cmd);
everies.clear();
}
else if (everies.size() > (uint8_t)every)
{
everies.erase(everies.begin()+every);
}
}
}
else if (compare(s, "blink"))
{
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< blink_nr;
else
{
blink &= ~(1<< blink_nr);
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax:" << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " every ms [command]; every list; every remove [nr|all]" << endl;
Serial << " on {output}; off {output}" << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
}
}
void loop() void loop()
{ {
auto ms=millis(); auto ms=millis();
int8_t out=0; int8_t out=0;
int16_t blink_bits = blink; int16_t blink_bits = blink;
for(auto& every: everies)
{
if (every.ms && every.cmd.length() && ms > every.next)
{
std::string cmd(every.cmd);
eval(cmd);
every.next += every.ms;
}
}
while(blink_bits) while(blink_bits)
{ {
if (blink_ms_on[out] and ms > blink_next[out]) if (blink_ms_on[out] and ms > blink_next[out])
@@ -375,7 +742,6 @@ void loop()
if (c==10 or c==14) if (c==10 or c==14)
{ {
Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl;
static std::string last_cmd; static std::string last_cmd;
if (cmd=="!") if (cmd=="!")
@@ -384,289 +750,7 @@ void loop()
last_cmd=cmd; last_cmd=cmd;
if (cmd.substr(0,3)!="set") replaceVars(cmd); if (cmd.substr(0,3)!="set") replaceVars(cmd);
eval(cmd);
while(cmd.length())
{
std::string cmd_end;
if (cmd.find(';') != std::string::npos)
{
cmd_end = cmd;
cmd = getword(cmd_end,"",';');
}
MqttError retval = MqttOk;
std::string s;
MqttBroker* broker = nullptr;
MqttClient* client = nullptr;
// client.function notation
// ("a.fun " becomes "fun a ")
if (cmd.find('.') != std::string::npos &&
cmd.find('.') < cmd.find(' '))
{
s=getword(cmd, nullptr, '.');
if (s.length())
{
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
{
Serial << "Unknown class (" << s.c_str() << ")" << endl;
cmd="";
}
}
}
s = getword(cmd);
if (s.length()) getCommand(s);
if (s.length()==0)
{}
else if (compare(s, "delete"))
{
if (client==nullptr && broker==nullptr)
{
s = getword(cmd);
if (clients.find(s) != clients.end())
{
client = clients[s];
}
else if (brokers.find(s) != brokers.end())
{
broker = brokers[s];
}
else
Serial << "Unable to find (" << s.c_str() << ")" << endl;
}
if (client)
{
for (auto it: clients)
{
if (it.second != client) continue;
Serial << "deleted" << endl;
delete (it.second);
clients.erase(it.first);
break;
}
cmd += " ls";
}
else if (broker)
{
for(auto it: brokers)
{
if (broker != it.second) continue;
Serial << "deleted" << endl;
delete (it.second);
brokers.erase(it.first);
break;
}
cmd += " ls";
}
else
Serial << "Nothing to delete" << endl;
}
else if (broker)
{
if (compare(s,"connect"))
{
Serial << "NYI" << endl;
}
else if (compare(s, "view"))
{
broker->dump();
}
}
else if (client)
{
if (compare(s,"connect"))
{
client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl;
}
else if (compare(s,"publish"))
{
while (cmd[0]==' ') cmd.erase(0,1);
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
cmd=""; // remove payload
}
else if (compare(s,"subscribe"))
{
client->subscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "unsubscribe"))
{
client->unsubscribe(getword(cmd, topic.c_str()));
}
else if (compare(s, "view"))
{
client->dump();
}
}
else if (compare(s, "blink"))
{
int8_t blink_nr = getint(cmd, -1);
if (blink_nr >= 0)
{
blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]);
pinMode(blink_nr, OUTPUT);
blink_next[blink_nr] = millis();
Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl;
if (blink_ms_on[blink_nr])
blink |= 1<< blink_nr;
else
{
blink &= ~(1<< blink_nr);
}
}
}
else if (compare(s, "auto"))
{
automatic::command(client, cmd);
if (client == nullptr)
cmd.clear();
}
else if (compare(s, "broker"))
{
std::string id=getword(cmd);
if (id.length() or brokers.find(id)!=brokers.end())
{
int port=getint(cmd, 0);
if (port)
{
MqttBroker* broker = new MqttBroker(port);
broker->begin();
brokers[id] = broker;
Serial << "new broker (" << id.c_str() << ")" << endl;
}
else
Serial << "Missing port" << endl;
}
else
Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl;
cmd+=" ls";
}
else if (compare(s, "client"))
{
std::string id=getword(cmd);
if (id.length() or clients.find(id)!=clients.end())
{
s=getword(cmd); // broker name
if (s=="" or brokers.find(s) != brokers.end())
{
MqttBroker* broker = nullptr;
if (s.length()) broker = brokers[s];
MqttClient* client = new MqttClient(broker);
client->id(id);
clients[id]=client;
client->setCallback(onPublish);
client->subscribe(topic);
Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl;
}
else if (s.length())
{
Serial << " not found." << endl;
}
}
else
Serial << "Missing or existing client name" << endl;
cmd+=" ls";
}
else if (compare(s, "set"))
{
std::string name(getword(cmd));
if (name.length()==0)
{
for(auto it: vars)
{
Serial << " " << it.first << " -> " << it.second << endl;
}
}
else if (commands.find(name) != commands.end())
{
Serial << "Reserved keyword (" << name << ")" << endl;
cmd.clear();
}
else
{
if (cmd.length())
{
vars[name] = cmd;
cmd.clear();
}
else if (vars.find(name) != vars.end())
vars.erase(vars.find(name));
}
}
else if (compare(s, "ls") or compare(s, "view"))
{
Serial << "--< " << clients.size() << " client/s. >--" << endl;
for(auto it: clients)
{
Serial << " "; it.second->dump();
}
Serial << "--< " << brokers.size() << " brokers/s. >--" << endl;
for(auto it: brokers)
{
Serial << " ==[ Broker: " << it.first.c_str() << " ]== ";
it.second->dump();
}
}
else if (compare(s, "reset"))
ESP.restart();
else if (compare(s, "ip"))
Serial << "IP: " << WiFi.localIP() << endl;
else if (compare(s,"help"))
{
Serial << "syntax: instr; instr; ..." << endl;
Serial << " MqttBroker:" << endl;
Serial << " broker {name} {port} : create a new broker" << endl;
Serial << endl;
Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.[un]subscribe [topic]" << endl;
Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl;
Serial << " name.delete" << endl;
automatic::help();
Serial << endl;
Serial << " help" << endl;
Serial << " blink [Dx on_ms off_ms]" << endl;
Serial << " ls / ip / reset" << endl;
Serial << " set [name][value]" << endl;
Serial << " ! repeat last command" << endl;
Serial << endl;
Serial << " $id : name of the client." << endl;
Serial << " default topic is '" << topic.c_str() << "'" << endl;
Serial << endl;
}
else
{
while(s[0]==' ') s.erase(0,1);
if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl;
}
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
if (cmd_end.length())
{
cmd = cmd_end;
cmd_end = "";
}
}
} }
else else
{ {

View File

@@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port)
{ {
server = new AsyncServer(port); server = new TcpServer(port);
#ifdef TCP_ASYNC
server->onClient(onClient, this); server->onClient(onClient, this);
#endif
} }
MqttBroker::~MqttBroker() MqttBroker::~MqttBroker()
@@ -25,12 +27,17 @@ MqttBroker::~MqttBroker()
} }
// private constructor used by broker only // private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
: parent(parent), client(new_client) : parent(parent)
{ {
#ifdef TCP_ASYNC
client = new_client;
client->onData(onData, this); client->onData(onData, this);
// client->onConnect() TODO // client->onConnect() TODO
// client->onDisconnect() TODO // client->onDisconnect() TODO
#else
client = new WiFiClient(*new_client);
#endif
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -75,11 +82,19 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
keep_alive = ka; keep_alive = ka;
close(); close();
if (client) delete client; if (client) delete client;
client = new AsyncClient; client = new TcpClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
#ifdef TCP_ASYNC
client->onData(onData, this); client->onData(onData, this);
client->onConnect(onConnect, this); client->onConnect(onConnect, this);
debug("Trying to connect to " << broker.c_str() << ':' << port);
client->connect(broker.c_str(), port); client->connect(broker.c_str(), port);
#else
if (client->connect(broker.c_str(), port))
{
onConnect(this, client);
}
#endif
} }
void MqttBroker::addClient(MqttClient* client) void MqttBroker::addClient(MqttClient* client)
@@ -115,16 +130,24 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur debug("Error cannot remove client"); // TODO should not occur
} }
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
{ {
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr); MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client)); broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients.size()); debug("New client");
} }
void MqttBroker::loop() void MqttBroker::loop()
{ {
#ifndef TCP_ASYNC
WiFiClient client = server->available();
if (client)
{
onClient(this, &client);
}
#endif
if (broker) if (broker)
{ {
// TODO should monitor broker's activity. // TODO should monitor broker's activity.
@@ -144,7 +167,7 @@ void MqttBroker::loop()
} }
else else
{ {
debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent); debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent);
// Note: deleting a client not added by the broker itself will probably crash later. // Note: deleting a client not added by the broker itself will probably crash later.
delete client; delete client;
break; break;
@@ -248,9 +271,20 @@ void MqttClient::loop()
// there is no need to send one PingReq per instance. // there is no need to send one PingReq per instance.
} }
} }
#ifndef TCP_ASYNC
while(client && client->available()>0)
{
message.incoming(client->read());
if (message.type())
{
processMessage(&message);
message.reset();
}
}
#endif
} }
void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*)
{ {
MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr); MqttClient* mqtt = static_cast<MqttClient*>(mqttclient_ptr);
debug("cnx: connecting"); debug("cnx: connecting");
@@ -265,12 +299,13 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*)
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
msg.sendTo(mqtt); msg.sendTo(mqtt);
msg.reset(); msg.reset();
debug("cnx: mqtt sent " << (int32_t)mqtt->parent); debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent);
mqtt->clientAlive(0); mqtt->clientAlive(0);
} }
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) #ifdef TCP_ASYNC
void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len)
{ {
char* char_ptr = static_cast<char*>(data); char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr); MqttClient* client=static_cast<MqttClient*>(client_ptr);
@@ -285,6 +320,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
len--; len--;
} }
} }
#endif
void MqttClient::resubscribe() void MqttClient::resubscribe()
{ {
@@ -361,7 +397,7 @@ void MqttClient::processMessage(const MqttMessage* mesg)
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
// mesg->hexdump("Incoming"); // mesg->hexdump("Incoming");
} }
#endif #endif

View File

@@ -1,14 +1,28 @@
#pragma once #pragma once
#ifdef ESP8266
#include <ESPAsyncTCP.h> // TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC
// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx
#if defined(ESP8266) || defined(EPOXY_DUINO)
#ifdef TCP_ASYNC
#include <ESPAsyncTCP.h>
#else
#include <ESP8266WiFi.h>
#endif
#elif defined(ESP32) #elif defined(ESP32)
#include <WiFi.h> #ifdef TCP_ASYNC
#include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP #include <AsyncTCP.h> // https://github.com/me-no-dev/AsyncTCP
#elif defined(EPOXY_DUINO) #else
#include <ESPAsyncTCP.h> #include <WiFi.h>
#endif
#else #else
#error "Unsupported platform" #error "Unsupported platform"
#endif #endif
#ifdef EPOXY_DUINO
#define dbg_ptr uint64_t
#else
#define dbg_ptr uint32_t
#endif
#include <vector> #include <vector>
#include <set> #include <set>
#include <string> #include <string>
@@ -23,6 +37,14 @@
#define debug(what) {} #define debug(what) {}
#endif #endif
#ifdef TCP_ASYNC
using TcpClient = AsyncClient;
using TcpServer = AsyncServer;
#else
using TcpClient = WiFiClient;
using TcpServer = WiFiServer;
#endif
enum MqttError enum MqttError
{ {
MqttOk = 0, MqttOk = 0,
@@ -57,7 +79,7 @@ class MqttMessage
Subscribe = 0x80, Subscribe = 0x80,
SubAck = 0x90, SubAck = 0x90,
UnSubscribe = 0xA0, UnSubscribe = 0xA0,
UnSuback = 0xB0, UnSuback = 0xB0,
PingReq = 0xC0, PingReq = 0xC0,
PingResp = 0xD0, PingResp = 0xD0,
Disconnect = 0xE0 Disconnect = 0xE0
@@ -192,14 +214,15 @@ class MqttClient
static long counter; static long counter;
private: private:
static void onConnect(void * client_ptr, TcpClient*);
static void onConnect(void * client_ptr, AsyncClient*); #ifdef TCP_ASYNC
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
#endif
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe(); void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, AsyncClient* client); MqttClient(MqttBroker* parent, TcpClient* client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -217,7 +240,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here) // (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker TcpClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
@@ -257,7 +280,7 @@ class MqttBroker
private: private:
friend class MqttClient; friend class MqttClient;
static void onClient(void*, AsyncClient*); static void onClient(void*, TcpClient*);
bool checkUser(const char* user, uint8_t len) const bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); } { return compareString(auth_user, user, len); }
@@ -275,7 +298,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients; std::vector<MqttClient*> clients;
AsyncServer* server; TcpServer* server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";