Very promising async commit

This commit is contained in:
hsaturn
2021-04-10 13:42:43 +02:00
parent 838df3a34a
commit 7ef18de755
2 changed files with 49 additions and 34 deletions

View File

@@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
Serial << '\'' << endl; Serial << '\'' << endl;
} }
MqttBroker::MqttBroker(uint16_t port) : server(port) MqttBroker::MqttBroker(uint16_t port)
{ {
server = new AsyncServer(port);
server->onClient(onClient, this);
} }
MqttBroker::~MqttBroker() MqttBroker::~MqttBroker()
@@ -19,14 +21,16 @@ MqttBroker::~MqttBroker()
{ {
delete clients[0]; delete clients[0];
} }
server.close(); delete server;
} }
// private constructor used by broker only // private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client) MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client)
: parent(parent) : parent(parent), client(new_client)
{ {
client = new WiFiClient(new_client); client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
alive = millis()+5000; // client expires after 5s if no CONNECT msg alive = millis()+5000; // client expires after 5s if no CONNECT msg
} }
@@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
debug("cnx: closing"); debug("cnx: closing");
close(); close();
if (client) delete client; if (client) delete client;
client = new WiFiClient; client = new AsyncClient;
debug("Trying to connect to " << broker.c_str() << ':' << port); debug("Trying to connect to " << broker.c_str() << ':' << port);
// TODO This may return immediately !!!
// TODO so I have to add onConnect and move this code to onConnect
// TODO also, as this is async now, I must take care of
// TODO the broker that may disconnect and delete the client immediately
if (client->connect(broker.c_str(), port)) if (client->connect(broker.c_str(), port))
{ {
debug("cnx: connecting"); debug("cnx: connecting");
@@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur debug("Error cannot remove client"); // TODO should not occur
} }
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
{
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients->size());
}
void MqttBroker::loop() void MqttBroker::loop()
{ {
WiFiClient client = server.available();
if (broker) if (broker)
{ {
// TODO should monitor broker's activity. // TODO should monitor broker's activity.
@@ -137,11 +151,6 @@ void MqttBroker::loop()
broker->loop(); broker->loop();
} }
if (client)
{
addClient(new MqttClient(this, client));
debug("New client (" << clients.size() << ')');
}
// for(auto it=clients.begin(); it!=clients.end(); it++) // for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop // use index because size can change during the loop
@@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{ {
return broker->subscribe(topic, qos); return broker->subscribe(topic, qos);
} }
return MqttNowhereToSend;
} }
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
@@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons
for(auto client: clients) for(auto client: clients)
{ {
i++; i++;
#if TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl; " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif #endif
@@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons
{ {
doit = true; doit = true;
} }
#if TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
#endif #endif
@@ -250,22 +260,28 @@ void MqttClient::loop()
{ {
debug("pingreq"); debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((const char*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
// TODO when many MqttClient passes through a local browser // TODO when many MqttClient passes through a local browser
// there is no need to send one PingReq per instance. // there is no need to send one PingReq per instance.
} }
} }
}
while(client && client->available()>0) void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
{
char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr);
while(len>0)
{ {
message.incoming(client->read()); client->message.incoming(*char_ptr++);
if (message.type()) if (client->message.type())
{ {
processMessage(&message); client->processMessage(&client->message);
message.reset(); client->message.reset();
} }
len--;
} }
} }
@@ -341,7 +357,7 @@ long MqttClient::counter=0;
void MqttClient::processMessage(const MqttMessage* mesg) void MqttClient::processMessage(const MqttMessage* mesg)
{ {
counter++; counter++;
#if TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{ {
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
@@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
if (client) if (client)
{ {
uint16_t pingreq = MqttMessage::Type::PingResp; uint16_t pingreq = MqttMessage::Type::PingResp;
client->write((uint8_t*)(&pingreq), 2); client->write((const char*)(&pingreq), 2);
bclose = false; bclose = false;
} }
else else
@@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
subscriptions.erase(it); subscriptions.erase(it);
} }
payload += len; payload += len;
uint8_t qos = *payload++; /* uint8_t qos =*/ *payload++;
debug(" qos=" << qos); debug(" qos=" << qos);
} }
debug("end loop"); debug("end loop");

View File

@@ -1,16 +1,15 @@
#pragma once #pragma once
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
#include <vector> #include <vector>
#include <set> #include <set>
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h> #include <MqttStreaming.h>
#if 0 #ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else #else
#define TINY_MQTT_DEBUG 0
#define debug(what) {} #define debug(what) {}
#endif #endif
@@ -190,11 +189,12 @@ class MqttClient
static long counter; static long counter;
private: private:
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len);
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe(); void resubscribe();
friend class MqttBroker; friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client); MqttClient(MqttBroker* parent, AsyncClient* client);
// republish a received publish if topic matches any in subscriptions // republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -212,7 +212,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here) // (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker AsyncClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions; std::set<Topic> subscriptions;
std::string clientId; std::string clientId;
CallBack callback = nullptr; CallBack callback = nullptr;
@@ -231,11 +231,9 @@ class MqttBroker
MqttBroker(uint16_t port); MqttBroker(uint16_t port);
~MqttBroker(); ~MqttBroker();
void begin() { server.begin(); } void begin() { server->begin(); }
void loop(); void loop();
uint16_t port() const { return server.port(); }
void connect(const std::string& host, uint16_t port=1883); void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; } bool connected() const { return state == Connected; }
@@ -254,6 +252,7 @@ class MqttBroker
private: private:
friend class MqttClient; friend class MqttClient;
static void onClient(void*, AsyncClient*);
bool checkUser(const char* user, uint8_t len) const bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); } { return compareString(auth_user, user, len); }
@@ -271,7 +270,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const; bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients; std::vector<MqttClient*> clients;
WiFiServer server; AsyncServer* server;
const char* auth_user = "guest"; const char* auth_user = "guest";
const char* auth_password = "guest"; const char* auth_password = "guest";