Very promising async commit

Very promising async commit
This commit is contained in:
hsaturn
2021-04-10 13:42:43 +02:00
committed by hsaturn
parent 67a296eb28
commit 3e8d34e4e7
2 changed files with 49 additions and 34 deletions

View File

@@ -9,8 +9,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
Serial << '\'' << endl;
}
MqttBroker::MqttBroker(uint16_t port) : server(port)
MqttBroker::MqttBroker(uint16_t port)
{
server = new AsyncServer(port);
server->onClient(onClient, this);
}
MqttBroker::~MqttBroker()
@@ -19,14 +21,16 @@ MqttBroker::~MqttBroker()
{
delete clients[0];
}
server.close();
delete server;
}
// private constructor used by broker only
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent)
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client)
: parent(parent), client(new_client)
{
client = new WiFiClient(new_client);
client->onData(onData, this);
// client->onConnect() TODO
// client->onDisconnect() TODO
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
@@ -70,8 +74,12 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
debug("cnx: closing");
close();
if (client) delete client;
client = new WiFiClient;
client = new AsyncClient;
debug("Trying to connect to " << broker.c_str() << ':' << port);
// TODO This may return immediately !!!
// TODO so I have to add onConnect and move this code to onConnect
// TODO also, as this is async now, I must take care of
// TODO the broker that may disconnect and delete the client immediately
if (client->connect(broker.c_str(), port))
{
debug("cnx: connecting");
@@ -126,10 +134,16 @@ void MqttBroker::removeClient(MqttClient* remove)
debug("Error cannot remove client"); // TODO should not occur
}
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
{
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
broker->addClient(new MqttClient(broker, client));
debug("New client #" << broker->clients->size());
}
void MqttBroker::loop()
{
WiFiClient client = server.available();
if (broker)
{
// TODO should monitor broker's activity.
@@ -137,11 +151,6 @@ void MqttBroker::loop()
broker->loop();
}
if (client)
{
addClient(new MqttClient(this, client));
debug("New client (" << clients.size() << ')');
}
// for(auto it=clients.begin(); it!=clients.end(); it++)
// use index because size can change during the loop
@@ -168,6 +177,7 @@ MqttError MqttBroker::subscribe(const Topic& topic, uint8_t qos)
{
return broker->subscribe(topic, qos);
}
return MqttNowhereToSend;
}
MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, const MqttMessage& msg) const
@@ -179,7 +189,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons
for(auto client: clients)
{
i++;
#if TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
#endif
@@ -200,7 +210,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, cons
{
doit = true;
}
#if TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
Serial << ", doit=" << doit << ' ';
#endif
@@ -250,22 +260,28 @@ void MqttClient::loop()
{
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2);
client->write((const char*)(&pingreq), 2);
clientAlive(0);
// TODO when many MqttClient passes through a local browser
// there is no need to send one PingReq per instance.
}
}
}
while(client && client->available()>0)
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
{
char* char_ptr = static_cast<char*>(data);
MqttClient* client=static_cast<MqttClient*>(client_ptr);
while(len>0)
{
message.incoming(client->read());
if (message.type())
client->message.incoming(*char_ptr++);
if (client->message.type())
{
processMessage(&message);
message.reset();
client->processMessage(&client->message);
client->message.reset();
}
len--;
}
}
@@ -341,7 +357,7 @@ long MqttClient::counter=0;
void MqttClient::processMessage(const MqttMessage* mesg)
{
counter++;
#if TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
@@ -438,7 +454,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
if (client)
{
uint16_t pingreq = MqttMessage::Type::PingResp;
client->write((uint8_t*)(&pingreq), 2);
client->write((const char*)(&pingreq), 2);
bclose = false;
}
else
@@ -470,7 +486,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
subscriptions.erase(it);
}
payload += len;
uint8_t qos = *payload++;
/* uint8_t qos =*/ *payload++;
debug(" qos=" << qos);
}
debug("end loop");
@@ -488,7 +504,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
Topic published(payload, len);
payload += len;
// Serial << "Received Publish (" << published.str().c_str() << ") size=" << (int)len
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
// << '(' << std::string(payload, len).c_str() << ')' << " msglen=" << mesg->length() << endl;
if (qos) payload+=2; // ignore packet identifier if any
len=mesg->end()-payload;
// TODO reset DUP

View File

@@ -1,4 +1,5 @@
#pragma once
#include <ESP8266WiFi.h>
#include <ESPAsyncTCP.h>
#include <vector>
#include <set>
@@ -6,11 +7,9 @@
#include "StringIndexer.h"
#include <MqttStreaming.h>
#if 0
#ifdef TINY_MQTT_DEBUG
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#define TINY_MQTT_DEBUG 1
#else
#define TINY_MQTT_DEBUG 0
#define debug(what) {}
#endif
@@ -190,11 +189,12 @@ class MqttClient
static long counter;
private:
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len);
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
void resubscribe();
friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
MqttClient(MqttBroker* parent, AsyncClient* client);
// republish a received publish if topic matches any in subscriptions
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
@@ -212,7 +212,7 @@ class MqttClient
// (this is the case when MqttBroker isn't used except here)
MqttBroker* parent=nullptr; // connection to local broker
WiFiClient* client=nullptr; // connection to mqtt client or to remote broker
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker
std::set<Topic> subscriptions;
std::string clientId;
CallBack callback = nullptr;
@@ -231,11 +231,9 @@ class MqttBroker
MqttBroker(uint16_t port);
~MqttBroker();
void begin() { server.begin(); }
void begin() { server->begin(); }
void loop();
uint16_t port() const { return server.port(); }
void connect(const std::string& host, uint16_t port=1883);
bool connected() const { return state == Connected; }
@@ -254,6 +252,7 @@ class MqttBroker
private:
friend class MqttClient;
static void onClient(void*, AsyncClient*);
bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); }
@@ -271,7 +270,7 @@ class MqttBroker
bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttClient*> clients;
WiFiServer server;
AsyncServer* server;
const char* auth_user = "guest";
const char* auth_password = "guest";