Merge pull request #25 from hsaturn/pr22

was pr#22 : added reply to message subscribe and unsubscribe
This commit is contained in:
hsaturn
2022-01-10 05:28:08 +01:00
committed by GitHub
5 changed files with 84 additions and 24 deletions

View File

@@ -1,6 +1,10 @@
#include "TinyMqtt.h" #include "TinyMqtt.h"
#include <sstream> #include <sstream>
#ifdef EPOXY_DUINO
std::map<MqttMessage::Type, int> MqttClient::counters;
#endif
MqttBroker::MqttBroker(uint16_t port) MqttBroker::MqttBroker(uint16_t port)
{ {
server = new TcpServer(port); server = new TcpServer(port);
@@ -30,7 +34,11 @@ MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client)
#else #else
client = new WiFiClient(*new_client); client = new WiFiClient(*new_client);
#endif #endif
#ifdef EPOXY_DUINO
alive = millis()+500000;
#else
alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg alive = millis()+5000; // TODO MAGIC client expires after 5s if no CONNECT msg
#endif
} }
MqttClient::MqttClient(MqttBroker* parent, const std::string& id) MqttClient::MqttClient(MqttBroker* parent, const std::string& id)
@@ -242,7 +250,11 @@ void MqttClient::clientAlive(uint32_t more_seconds)
{ {
if (keep_alive) if (keep_alive)
{ {
#ifdef EPOXY_DUINO
alive=millis()+500000;
#else
alive=millis()+1000*(keep_alive+more_seconds); alive=millis()+1000*(keep_alive+more_seconds);
#endif
} }
else else
alive=0; alive=0;
@@ -387,11 +399,8 @@ MqttError MqttClient::sendTopic(const Topic& topic, MqttMessage::Type type, uint
return msg.sendTo(this); return msg.sendTo(this);
} }
long MqttClient::counter=0;
void MqttClient::processMessage(MqttMessage* mesg) void MqttClient::processMessage(MqttMessage* mesg)
{ {
counter++;
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
{ {
@@ -409,7 +418,11 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
uint16_t len; uint16_t len;
bool bclose=true; bool bclose=true;
switch(mesg->type() & 0XF0) #ifdef EPOXY_DUINO
counters[mesg->type()]++;
#endif
switch(mesg->type())
{ {
case MqttMessage::Type::Connect: case MqttMessage::Type::Connect:
if (mqtt_connected) if (mqtt_connected)
@@ -510,17 +523,25 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
payload = header+2; payload = header+2;
debug("un/subscribe loop"); debug("un/subscribe loop");
std::string qoss;
while(payload < mesg->end()) while(payload < mesg->end())
{ {
mesg->getString(payload, len); // Topic mesg->getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')'); debug( " topic (" << std::string(payload, len) << ')');
// subscribe(Topic(payload, len)); // subscribe(Topic(payload, len));
Topic topic(payload, len); Topic topic(payload, len);
payload += len; payload += len;
if ((mesg->type() & 0XF0) == MqttMessage::Type::Subscribe) if (mesg->type() == MqttMessage::Type::Subscribe)
{ {
uint8_t qos = *payload++; uint8_t qos = *payload++;
if (qos != 0) debug("Unsupported QOS" << qos << endl); if (qos != 0)
{
debug("Unsupported QOS" << qos << endl);
qoss.push_back(0x80);
}
else
qoss.push_back(qos);
subscriptions.insert(topic); subscriptions.insert(topic);
} }
else else
@@ -532,7 +553,12 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
} }
debug("end loop"); debug("end loop");
bclose = false; bclose = false;
// TODO SUBACK
MqttMessage ack(mesg->type() == MqttMessage::Type::Subscribe ? MqttMessage::Type::SubAck : MqttMessage::Type::UnSuback);
ack.add(header[0]);
ack.add(header[1]);
ack.add(qoss.c_str(), qoss.size(), false);
ack.sendTo(this);
} }
break; break;
@@ -547,7 +573,7 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
#endif #endif
if (mqtt_connected or client == nullptr) if (mqtt_connected or client == nullptr)
{ {
uint8_t qos = mesg->type() & 0x6; uint8_t qos = mesg->flags();
payload = header; payload = header;
mesg->getString(payload, len); mesg->getString(payload, len);
Topic published(payload, len); Topic published(payload, len);

View File

@@ -42,7 +42,7 @@
using TcpServer = WiFiServer; using TcpServer = WiFiServer;
#endif #endif
enum MqttError enum __attribute__((packed)) MqttError
{ {
MqttOk = 0, MqttOk = 0,
MqttNowhereToSend=1, MqttNowhereToSend=1,
@@ -66,7 +66,7 @@ class MqttMessage
{ {
const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding const uint16_t MaxBufferLength = 4096; //hard limit: 16k due to size decoding
public: public:
enum Type enum __attribute__((packed)) Type
{ {
Unknown = 0, Unknown = 0,
Connect = 0x10, Connect = 0x10,
@@ -81,7 +81,7 @@ class MqttMessage
PingResp = 0xD0, PingResp = 0xD0,
Disconnect = 0xE0 Disconnect = 0xE0
}; };
enum State enum __attribute__((packed)) State
{ {
FixedHeader=0, FixedHeader=0,
Length=1, Length=1,
@@ -111,12 +111,14 @@ class MqttMessage
Type type() const Type type() const
{ {
return state == Complete ? static_cast<Type>(buffer[0]) : Unknown; return state == Complete ? static_cast<Type>(buffer[0] & 0xF0) : Unknown;
} }
uint8_t flags() const { return static_cast<uint8_t>(buffer[0] & 0x0F); }
void create(Type type) void create(Type type)
{ {
buffer=(char)type; buffer=(decltype(buffer)::value_type)type;
buffer+='\0'; // reserved for msg length byte 1/2 buffer+='\0'; // reserved for msg length byte 1/2
buffer+='\0'; // reserved for msg length byte 2/2 (fixed) buffer+='\0'; // reserved for msg length byte 2/2 (fixed)
vheader=3; // Should never change vheader=3; // Should never change
@@ -139,7 +141,7 @@ class MqttBroker;
class MqttClient class MqttClient
{ {
using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length); using CallBack = void (*)(const MqttClient* source, const Topic& topic, const char* payload, size_t payload_length);
enum Flags enum __attribute__((packed)) Flags
{ {
FlagUserName = 128, FlagUserName = 128,
FlagPassword = 64, FlagPassword = 64,
@@ -223,7 +225,9 @@ class MqttClient
#endif #endif
} }
static long counter; // Number of processed messages #ifdef EPOXY_DUINO
static std::map<MqttMessage::Type, int> counters; // Number of processed messages
#endif
private: private:
@@ -262,7 +266,7 @@ class MqttClient
class MqttBroker class MqttBroker
{ {
enum State enum __attribute__((packed)) State
{ {
Disconnected, // Also the initial state Disconnected, // Also the initial state
Connecting, // connect and sends a fake publish to avoid circular cnx Connecting, // connect and sends a fake publish to avoid circular cnx

View File

@@ -4,3 +4,4 @@ git clone https://github.com/bxparks/AUnit.git
git clone https://github.com/bxparks/EpoxyDuino.git git clone https://github.com/bxparks/EpoxyDuino.git
cd TinyMqtt/tests cd TinyMqtt/tests
make make
make runtests

View File

@@ -31,12 +31,10 @@ void onPublish(const MqttClient* srce, const Topic& topic, const char* payload,
test(local_client_should_unregister_when_destroyed) test(local_client_should_unregister_when_destroyed)
{ {
return;
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
{ {
MqttClient client;
assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected assertEqual(broker.clientsCount(), (size_t)0); // Ensure client is not yet connected
client.connect("127.0.0.1", 1883); MqttClient client(&broker);
assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected assertEqual(broker.clientsCount(), (size_t)1); // Ensure client is now connected
} }
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);

View File

@@ -12,6 +12,11 @@
using namespace std; using namespace std;
String toString(const IPAddress& ip)
{
return String(ip[0])+'.'+String(ip[1])+'.'+String(ip[2])+'.'+String(ip[3]);
}
MqttBroker broker(1883); MqttBroker broker(1883);
std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count std::map<std::string, std::map<Topic, int>> published; // map[client_id] => map[topic] = count
@@ -51,10 +56,36 @@ test(network_single_broker_begin)
// TODO Nothing is tested here ! // TODO Nothing is tested here !
} }
test(suback)
{
start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883);
broker.begin();
IPAddress broker_ip = WiFi.localIP();
ESP8266WiFiClass::selectInstance(2);
MqttClient client;
client.connect(broker_ip.toString().c_str(), 1883);
broker.loop();
assertTrue(broker.clientsCount() == 1);
assertTrue(client.connected());
MqttClient::counters[MqttMessage::Type::SubAck] = 0;
client.subscribe("a/b");
// TODO how to avoid these loops ???
broker.loop();
client.loop();
assertEqual(MqttClient::counters[MqttMessage::Type::SubAck], 1);
}
test(network_client_to_broker_connexion) test(network_client_to_broker_connexion)
{ {
start_servers(2, true); start_servers(2, true);
assertEqual(WiFi.status(), WL_CONNECTED); assertEqual(WiFi.status(), WL_CONNECTED);
MqttBroker broker(1883); MqttBroker broker(1883);
@@ -147,7 +178,6 @@ test(network_client_should_unregister_when_destroyed)
assertEqual(broker.clientsCount(), (size_t)0); assertEqual(broker.clientsCount(), (size_t)0);
} }
#if 0
// THESE TESTS ARE IN LOCAL MODE // THESE TESTS ARE IN LOCAL MODE
// WE HAVE TO CONVERT THEM TO WIFI MODE (pass through virtual TCP link) // WE HAVE TO CONVERT THEM TO WIFI MODE (pass through virtual TCP link)
@@ -277,8 +307,8 @@ test(network_hudge_payload)
// onPublish should have filled lastPayload and lastLength // onPublish should have filled lastPayload and lastLength
assertEqual(payload, lastPayload); assertEqual(payload, lastPayload);
assertEqual(lastLength, strlen(payload)); assertEqual(lastLength, strlen(payload));
assertEqual(strcmp(payload, lastPayload), 0);
} }
#endif
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
// setup() and loop() // setup() and loop()
@@ -286,9 +316,10 @@ void setup() {
/* delay(1000); /* delay(1000);
Serial.begin(115200); Serial.begin(115200);
while(!Serial); while(!Serial);
*/
Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================"); Serial.println("=============[ FAKE NETWORK TinyMqtt TESTS ]========================");
*/
WiFi.mode(WIFI_STA); WiFi.mode(WIFI_STA);
WiFi.begin("network", "password"); WiFi.begin("network", "password");
} }