From 2e92a98db21270642260e3ae72f1f025d42ff0bd Mon Sep 17 00:00:00 2001 From: hsaturn Date: Sun, 11 Apr 2021 15:51:33 +0200 Subject: [PATCH] Trying to fuse togeter Async and not async version --- src/TinyMqtt.cpp | 29 ++++++++++++++++++++++++----- src/TinyMqtt.h | 28 +++++++++++++++++++--------- 2 files changed, 43 insertions(+), 14 deletions(-) diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 40ddef4..3b51ddc 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) MqttBroker::MqttBroker(uint16_t port) { - server = new AsyncServer(port); + server = new TcpServer(port); +#ifdef TCP_ASYNC server->onClient(onClient, this); +#endif } MqttBroker::~MqttBroker() @@ -25,12 +27,14 @@ MqttBroker::~MqttBroker() } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) +MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) : parent(parent), client(new_client) { +#ifdef TCP_ASYNC client->onData(onData, this); // client->onConnect() TODO // client->onDisconnect() TODO +#endif alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -75,11 +79,16 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) keep_alive = ka; close(); if (client) delete client; - client = new AsyncClient; + client = new TcpClient; client->onData(onData, this); client->onConnect(onConnect, this); debug("Trying to connect to " << broker.c_str() << ':' << port); - client->connect(broker.c_str(), port); + if (client->connect(broker.c_str(), port)) + { + #ifndef TCP_ASYNC + onConnect(this, client); + #endif + } } void MqttBroker::addClient(MqttClient* client) @@ -115,7 +124,7 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } -void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +void MqttBroker::onClient(void* broker_ptr, TcpClient* client) { MqttBroker* broker = static_cast(broker_ptr); @@ -125,6 +134,14 @@ void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) void MqttBroker::loop() { +#ifndef TCP_ASYNC + WiFiClient client = server.available(); + + if (client) + { + onClient(this, &client); + } +#endif if (broker) { // TODO should monitor broker's activity. @@ -270,6 +287,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) mqtt->clientAlive(0); } +#ifdef TCP_ASYNC void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) { char* char_ptr = static_cast(data); @@ -285,6 +303,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) len--; } } +#endif void MqttClient::resubscribe() { diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index c5d5964..7cf31d3 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -16,13 +16,22 @@ #include // #define TINY_MQTT_DEBUG - #ifdef TINY_MQTT_DEBUG #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #else #define debug(what) {} #endif +#define TCP_ASYNC + +#ifdef TCP_ASYNC + using TcpClient = AsyncClient; + using TcpServer = AsyncServer; +#else + using TcpClient = WiFiClient; + using TcpServer = WiFiServer; +#endif + enum MqttError { MqttOk = 0, @@ -57,7 +66,7 @@ class MqttMessage Subscribe = 0x80, SubAck = 0x90, UnSubscribe = 0xA0, - UnSuback = 0xB0, + UnSuback = 0xB0, PingReq = 0xC0, PingResp = 0xD0, Disconnect = 0xE0 @@ -192,14 +201,15 @@ class MqttClient static long counter; private: - - static void onConnect(void * client_ptr, AsyncClient*); - static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); +#ifdef TCP_ASYNC + static void onConnect(void * client_ptr, TcpClient*); + static void onData(void* client_ptr, TcpClient*, void* data, size_t len); +#endif MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, AsyncClient* client); + MqttClient(MqttBroker* parent, TcpClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -217,7 +227,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - AsyncClient* client=nullptr; // connection to mqtt client or to remote broker + TcpClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -257,7 +267,7 @@ class MqttBroker private: friend class MqttClient; - static void onClient(void*, AsyncClient*); + static void onClient(void*, TcpClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -275,7 +285,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - AsyncServer* server; + TcpServer* server; const char* auth_user = "guest"; const char* auth_password = "guest";