Trying to fuse togeter Async and not async version
This commit is contained in:
@@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len)
|
|||||||
|
|
||||||
MqttBroker::MqttBroker(uint16_t port)
|
MqttBroker::MqttBroker(uint16_t port)
|
||||||
{
|
{
|
||||||
server = new AsyncServer(port);
|
server = new TcpServer(port);
|
||||||
|
#ifdef TCP_ASYNC
|
||||||
server->onClient(onClient, this);
|
server->onClient(onClient, this);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
MqttBroker::~MqttBroker()
|
MqttBroker::~MqttBroker()
|
||||||
@@ -25,12 +27,14 @@ MqttBroker::~MqttBroker()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// private constructor used by broker only
|
// private constructor used by broker only
|
||||||
MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client)
|
MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
|
||||||
: parent(parent), client(new_client)
|
: parent(parent), client(new_client)
|
||||||
{
|
{
|
||||||
|
#ifdef TCP_ASYNC
|
||||||
client->onData(onData, this);
|
client->onData(onData, this);
|
||||||
// client->onConnect() TODO
|
// client->onConnect() TODO
|
||||||
// client->onDisconnect() TODO
|
// client->onDisconnect() TODO
|
||||||
|
#endif
|
||||||
alive = millis()+5000; // client expires after 5s if no CONNECT msg
|
alive = millis()+5000; // client expires after 5s if no CONNECT msg
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -75,11 +79,16 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
|||||||
keep_alive = ka;
|
keep_alive = ka;
|
||||||
close();
|
close();
|
||||||
if (client) delete client;
|
if (client) delete client;
|
||||||
client = new AsyncClient;
|
client = new TcpClient;
|
||||||
client->onData(onData, this);
|
client->onData(onData, this);
|
||||||
client->onConnect(onConnect, this);
|
client->onConnect(onConnect, this);
|
||||||
debug("Trying to connect to " << broker.c_str() << ':' << port);
|
debug("Trying to connect to " << broker.c_str() << ':' << port);
|
||||||
client->connect(broker.c_str(), port);
|
if (client->connect(broker.c_str(), port))
|
||||||
|
{
|
||||||
|
#ifndef TCP_ASYNC
|
||||||
|
onConnect(this, client);
|
||||||
|
#endif
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttBroker::addClient(MqttClient* client)
|
void MqttBroker::addClient(MqttClient* client)
|
||||||
@@ -115,7 +124,7 @@ void MqttBroker::removeClient(MqttClient* remove)
|
|||||||
debug("Error cannot remove client"); // TODO should not occur
|
debug("Error cannot remove client"); // TODO should not occur
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
|
void MqttBroker::onClient(void* broker_ptr, TcpClient* client)
|
||||||
{
|
{
|
||||||
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
MqttBroker* broker = static_cast<MqttBroker*>(broker_ptr);
|
||||||
|
|
||||||
@@ -125,6 +134,14 @@ void MqttBroker::onClient(void* broker_ptr, AsyncClient* client)
|
|||||||
|
|
||||||
void MqttBroker::loop()
|
void MqttBroker::loop()
|
||||||
{
|
{
|
||||||
|
#ifndef TCP_ASYNC
|
||||||
|
WiFiClient client = server.available();
|
||||||
|
|
||||||
|
if (client)
|
||||||
|
{
|
||||||
|
onClient(this, &client);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
if (broker)
|
if (broker)
|
||||||
{
|
{
|
||||||
// TODO should monitor broker's activity.
|
// TODO should monitor broker's activity.
|
||||||
@@ -270,6 +287,7 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*)
|
|||||||
mqtt->clientAlive(0);
|
mqtt->clientAlive(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#ifdef TCP_ASYNC
|
||||||
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
|
void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
|
||||||
{
|
{
|
||||||
char* char_ptr = static_cast<char*>(data);
|
char* char_ptr = static_cast<char*>(data);
|
||||||
@@ -285,6 +303,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len)
|
|||||||
len--;
|
len--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
void MqttClient::resubscribe()
|
void MqttClient::resubscribe()
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -16,13 +16,22 @@
|
|||||||
#include <MqttStreaming.h>
|
#include <MqttStreaming.h>
|
||||||
|
|
||||||
// #define TINY_MQTT_DEBUG
|
// #define TINY_MQTT_DEBUG
|
||||||
|
|
||||||
#ifdef TINY_MQTT_DEBUG
|
#ifdef TINY_MQTT_DEBUG
|
||||||
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
|
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
|
||||||
#else
|
#else
|
||||||
#define debug(what) {}
|
#define debug(what) {}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define TCP_ASYNC
|
||||||
|
|
||||||
|
#ifdef TCP_ASYNC
|
||||||
|
using TcpClient = AsyncClient;
|
||||||
|
using TcpServer = AsyncServer;
|
||||||
|
#else
|
||||||
|
using TcpClient = WiFiClient;
|
||||||
|
using TcpServer = WiFiServer;
|
||||||
|
#endif
|
||||||
|
|
||||||
enum MqttError
|
enum MqttError
|
||||||
{
|
{
|
||||||
MqttOk = 0,
|
MqttOk = 0,
|
||||||
@@ -192,14 +201,15 @@ class MqttClient
|
|||||||
static long counter;
|
static long counter;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
#ifdef TCP_ASYNC
|
||||||
static void onConnect(void * client_ptr, AsyncClient*);
|
static void onConnect(void * client_ptr, TcpClient*);
|
||||||
static void onData(void* client_ptr, AsyncClient*, void* data, size_t len);
|
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
|
||||||
|
#endif
|
||||||
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
|
MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos);
|
||||||
void resubscribe();
|
void resubscribe();
|
||||||
|
|
||||||
friend class MqttBroker;
|
friend class MqttBroker;
|
||||||
MqttClient(MqttBroker* parent, AsyncClient* client);
|
MqttClient(MqttBroker* parent, TcpClient* client);
|
||||||
// republish a received publish if topic matches any in subscriptions
|
// republish a received publish if topic matches any in subscriptions
|
||||||
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
|
MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg);
|
||||||
|
|
||||||
@@ -217,7 +227,7 @@ class MqttClient
|
|||||||
// (this is the case when MqttBroker isn't used except here)
|
// (this is the case when MqttBroker isn't used except here)
|
||||||
MqttBroker* parent=nullptr; // connection to local broker
|
MqttBroker* parent=nullptr; // connection to local broker
|
||||||
|
|
||||||
AsyncClient* client=nullptr; // connection to mqtt client or to remote broker
|
TcpClient* client=nullptr; // connection to mqtt client or to remote broker
|
||||||
std::set<Topic> subscriptions;
|
std::set<Topic> subscriptions;
|
||||||
std::string clientId;
|
std::string clientId;
|
||||||
CallBack callback = nullptr;
|
CallBack callback = nullptr;
|
||||||
@@ -257,7 +267,7 @@ class MqttBroker
|
|||||||
private:
|
private:
|
||||||
friend class MqttClient;
|
friend class MqttClient;
|
||||||
|
|
||||||
static void onClient(void*, AsyncClient*);
|
static void onClient(void*, TcpClient*);
|
||||||
bool checkUser(const char* user, uint8_t len) const
|
bool checkUser(const char* user, uint8_t len) const
|
||||||
{ return compareString(auth_user, user, len); }
|
{ return compareString(auth_user, user, len); }
|
||||||
|
|
||||||
@@ -275,7 +285,7 @@ class MqttBroker
|
|||||||
|
|
||||||
bool compareString(const char* good, const char* str, uint8_t str_len) const;
|
bool compareString(const char* good, const char* str, uint8_t str_len) const;
|
||||||
std::vector<MqttClient*> clients;
|
std::vector<MqttClient*> clients;
|
||||||
AsyncServer* server;
|
TcpServer* server;
|
||||||
|
|
||||||
const char* auth_user = "guest";
|
const char* auth_user = "guest";
|
||||||
const char* auth_password = "guest";
|
const char* auth_password = "guest";
|
||||||
|
|||||||
Reference in New Issue
Block a user