Version 0.2

This commit is contained in:
hsaturn
2021-03-19 19:02:40 +01:00
parent bb2a2e6737
commit b33c9ba687
5 changed files with 159 additions and 40 deletions

View File

@@ -15,20 +15,29 @@ MqttBroker::MqttBroker(uint16_t port)
{
}
MqttCnx::MqttCnx(MqttBroker* parent, WiFiClient& new_client)
MqttClient::MqttClient(MqttBroker* parent, WiFiClient& new_client)
: parent(parent),
mqtt_connected(false)
{
client = new_client ? new WiFiClient(new_client) : nullptr;
clientAlive();
alive = millis()+5000; // client expires after 5s if no CONNECT msg
}
MqttCnx::~MqttCnx()
MqttClient::MqttClient(MqttBroker* parent)
: parent(parent)
{
client = nullptr;
parent->addClient(this);
}
MqttClient::~MqttClient()
{
close();
parent->removeClient(this);
}
void MqttCnx::close()
void MqttClient::close()
{
if (client)
{
@@ -38,13 +47,32 @@ void MqttCnx::close()
}
}
void MqttBroker::addClient(MqttClient* client)
{
clients.push_back(client);
}
void MqttBroker::removeClient(MqttClient* remove)
{
for(auto it=clients.begin(); it!=clients.end(); it++)
{
auto client=*it;
if (client==remove)
{
clients.erase(it);
return;
}
}
Serial << "Error cannot remove client" << endl; // TODO should not occur
}
void MqttBroker::loop()
{
WiFiClient client = server.available();
if (client)
{
clients.push_back(new MqttCnx(this, client));
addClient(new MqttClient(this, client));
Serial << "New client (" << clients.size() << ')' << endl;
}
@@ -58,7 +86,7 @@ void MqttBroker::loop()
else
{
Serial << "Client " << client->id().c_str() << " Disconnected" << endl;
clients.erase(it);
// Note: deleting a client not added by the broker itself will probably crash later.
delete client;
break;
}
@@ -67,6 +95,7 @@ void MqttBroker::loop()
void MqttBroker::publish(const Topic& topic, MqttMessage& msg)
{
Serial << " publish" << __LINE__ << endl;
for(auto client: clients)
client->publish(topic, msg);
}
@@ -87,7 +116,7 @@ void MqttMessage::getString(char* &buffer, uint16_t& len)
buffer+=2;
}
void MqttCnx::clientAlive()
void MqttClient::clientAlive()
{
if (keep_alive)
{
@@ -97,7 +126,7 @@ void MqttCnx::clientAlive()
alive=0;
}
void MqttCnx::loop()
void MqttClient::loop()
{
if (alive && (millis() > alive))
{
@@ -115,7 +144,7 @@ void MqttCnx::loop()
}
}
void MqttCnx::processMessage()
void MqttClient::processMessage()
{
std::string error;
std::string s;
@@ -186,7 +215,7 @@ void MqttCnx::processMessage()
message.getString(payload, len); // Topic
outstring("Subscribes", payload, len);
subscriptions.insert(Topic(payload, len));
subscribe(Topic(payload, len));
bclose = false;
// TODO SUBACK
break;
@@ -241,14 +270,40 @@ bool Topic::matches(const Topic& topic) const
return false;
}
void MqttCnx::publish(const Topic& topic, MqttMessage& msg)
// publish from local client to a broker
void MqttClient::publish(const Topic& topic, const char* payload, size_t pay_length)
{
Serial << " publish" << __LINE__ << endl;
message.create(MqttMessage::Publish);
message.add(topic);
message.add(payload, pay_length);
if (parent)
parent->publish(topic, message);
else if (client)
publish(topic, message);
else
Serial << " Should not happen" << endl;
}
// republish a received publish if it matches any in subscriptions
void MqttClient::publish(const Topic& topic, MqttMessage& msg)
{
Serial << " publish " << topic.c_str() << __LINE__ << endl;
for(const auto& subscription: subscriptions)
{
Serial << " check " << subscription.c_str() << __LINE__ << endl;
if (subscription.matches(topic))
{
// Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl;
msg.sendTo(this);
Serial << " matche !" << endl;
if (client)
{
// Serial << "Republishing " << topic.str().c_str() << " to " << clientId.c_str() << endl;
msg.sendTo(this);
}
else if (callback)
{
callback(topic, nullptr, 0); // TODO
}
}
}
}
@@ -311,6 +366,11 @@ void MqttMessage::incoming(char in_byte)
}
}
void MqttMessage::add(const char* p, size_t len)
{
while(len--) incoming(*p);
}
void MqttMessage::encodeLength(char* msb, int length)
{
do
@@ -322,7 +382,7 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length);
};
void MqttMessage::sendTo(MqttCnx* client)
void MqttMessage::sendTo(MqttClient* client)
{
if (curr-buffer-2 >= 0)
{

View File

@@ -11,11 +11,14 @@ class Topic : public IndexedString
public:
Topic(const char* s, uint8_t len) : IndexedString(s,len){}
Topic(const char* s) : Topic(s, strlen(s)) {}
Topic(const std::string s) : Topic(s.c_str(), s.length()){};
const char* c_str() const { return str().c_str(); }
bool matches(const Topic&) const;
};
class MqttCnx;
class MqttClient;
class MqttMessage
{
public:
@@ -45,6 +48,9 @@ class MqttMessage
MqttMessage(Type t) { create(t); }
void incoming(char byte);
void add(char byte) { incoming(byte); }
void add(const char* p, size_t len);
void add(const std::string& s) { add(s.c_str(), s.length()); }
void add(const Topic& t) { add(t.str()); }
char* getVHeader() const { return vheader; }
char* end() const { return curr; }
uint16_t length() const { return curr-buffer; }
@@ -69,7 +75,7 @@ class MqttMessage
size=0;
state=Create;
}
void sendTo(MqttCnx*);
void sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const;
private:
@@ -83,8 +89,9 @@ class MqttMessage
};
class MqttBroker;
class MqttCnx
class MqttClient
{
using CallBack = void (*)(const Topic& topic, const char* payload, size_t payload_length);
enum Flags
{
FlagUserName = 128,
@@ -96,11 +103,11 @@ class MqttCnx
FlagReserved = 1
};
public:
MqttCnx(MqttBroker* parent, WiFiClient& client);
MqttClient(MqttBroker*);
~MqttCnx();
~MqttClient();
bool connected() { return client && client->connected(); }
bool connected() { return client==nullptr || client->connected(); }
void write(const char* buf, size_t length)
{ if (client) client->write(buf, length); }
@@ -108,9 +115,22 @@ class MqttCnx
void loop();
void close();
void publish(const Topic& topic, MqttMessage& msg);
void setCallback(CallBack fun) {callback=fun; };
// Publish from client to the world
void publish(const Topic&, const char* payload, size_t pay_length);
void publish(const Topic& t, const std::string& s) { publish(t,s.c_str(),s.length());}
void publish(const Topic& t) { publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); }
void unsubscribe(Topic& topic);
private:
friend class MqttBroker;
MqttClient(MqttBroker* parent, WiFiClient& client);
// republish a received publish if topic matches any in subscriptions
void publish(const Topic& topic, MqttMessage& msg);
void clientAlive();
void processMessage();
@@ -118,20 +138,12 @@ class MqttCnx
uint32_t keep_alive;
uint32_t alive;
bool mqtt_connected;
WiFiClient* client;
WiFiClient* client; // nullptr if this client is local
MqttMessage message;
MqttBroker* parent;
std::set<Topic> subscriptions;
std::string clientId;
};
class MqttClient
{
public:
MqttClient(IPAddress broker) : broker_ip(broker) {}
protected:
IPAddress broker_ip;
CallBack callback;
};
class MqttBroker
@@ -142,17 +154,26 @@ class MqttBroker
void begin() { server.begin(); }
void loop();
uint8_t port() const { return server.port(); }
private:
friend class MqttClient;
bool checkUser(const char* user, uint8_t len) const
{ return compareString(auth_user, user, len); }
bool checkPassword(const char* password, uint8_t len) const
{ return compareString(auth_password, password, len); }
void publish(const Topic& topic, MqttMessage& msg);
private:
// For clients that are added not by the broker itself
void addClient(MqttClient* client);
void removeClient(MqttClient* client);
bool compareString(const char* good, const char* str, uint8_t str_len) const;
std::vector<MqttCnx*> clients;
std::vector<MqttClient*> clients;
WiFiServer server;
const char* auth_user = "guest";