diff --git a/examples/client-with-wifi/client-with-wifi.ino b/examples/client-with-wifi/client-with-wifi.ino index d8c5ba7..1f6cce6 100644 --- a/examples/client-with-wifi/client-with-wifi.ino +++ b/examples/client-with-wifi/client-with-wifi.ino @@ -1,22 +1,31 @@ #include // https://github.com/hsaturn/TinyMqtt /** - * Local broker that accept connections and two local clients - * + * + * +-----------------------------+ + * | ESP | + * | +--------+ | 1883 <--- External client/s + * | +-------->| broker | | 1883 <--- External client/s + * | | +--------+ | + * | | ^ | + * | | | | + * | v v | + * | +----------+ +----------+ | + * | | internal | | internal | | + * | | client | | client | | + * | +----------+ +----------+ | + * | | + * +-----------------------------+ + * * pros - Reduces internal latency (when publish is received by the same ESP) * - Reduces wifi traffic * - No need to have an external broker - * - can still report to a 'main' broker (TODO see documentation that have to be written) - * - accepts external clients + * - can still report to a 'main' broker (TODO see documentation that have to be written) + * - accepts external clients * * cons - Takes more memory - * - a bit hard to understand - * - * This sounds crazy: a mqtt mqtt that do not need a broker ! - * The use case arise when one ESP wants to publish topics and subscribe to them at the same time. - * Without broker, the ESP won't react to its own topics. - * - * TinyMqtt mqtt allows this use case to work. + * - a bit hard to understand + * */ #include @@ -37,8 +46,8 @@ void onPublishB(const MqttClient* source, const Topic& topic, const char* payloa void setup() { Serial.begin(115200); - delay(500); - Serial << "Clients with wifi " << endl; + delay(500); + Serial << "Clients with wifi " << endl; WiFi.mode(WIFI_STA); WiFi.begin(ssid, password); @@ -47,8 +56,8 @@ void setup() Serial << "Connected to " << ssid << "IP address: " << WiFi.localIP() << endl; - broker.begin(); - + broker.begin(); + mqtt_a.setCallback(onPublishA); mqtt_a.subscribe(topic); @@ -58,30 +67,30 @@ void setup() void loop() { - broker.loop(); + broker.loop(); // Don't forget to add loop for every broker and clients mqtt_a.loop(); mqtt_b.loop(); // ============= client A publish ================ - static const int intervalA = 5000; // publishes every 5s - static uint32_t timerA = millis() + intervalA; - - if (millis() > timerA) - { - Serial << "A is publishing " << topic.c_str() << endl; - timerA += intervalA; - mqtt_a.publish(topic); - } + static const int intervalA = 5000; // publishes every 5s + static uint32_t timerA = millis() + intervalA; + + if (millis() > timerA) + { + Serial << "A is publishing " << topic.c_str() << endl; + timerA += intervalA; + mqtt_a.publish(topic); + } // ============= client B publish ================ - static const int intervalB = 7000; // will send topic each 7s - static uint32_t timerB = millis() + intervalB; - - if (millis() > timerB) - { - Serial << "B is publishing " << topic.c_str() << endl; - timerB += intervalB; - mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); - } + static const int intervalB = 7000; // will send topic each 7s + static uint32_t timerB = millis() + intervalB; + + if (millis() > timerB) + { + Serial << "B is publishing " << topic.c_str() << endl; + timerB += intervalB; + mqtt_b.publish(topic, std::string(String(15+millis()%10).c_str())); + } } diff --git a/examples/tinymqtt-test/tinymqtt-test.ino b/examples/tinymqtt-test/tinymqtt-test.ino index d8d28e7..883b338 100644 --- a/examples/tinymqtt-test/tinymqtt-test.ino +++ b/examples/tinymqtt-test/tinymqtt-test.ino @@ -138,7 +138,7 @@ std::set commands = { "auto", "broker", "blink", "client", "connect", "create", "delete", "help", "interval", "ls", "ip", "off", "on", "set", - "publish", "reset", "subscribe", "unsubscribe", "view" + "publish", "reset", "subscribe", "unsubscribe", "view", "every" }; void getCommand(std::string& search) @@ -316,22 +316,389 @@ std::map automatic::autos; bool compare(std::string s, const char* cmd) { - if (s.length()==0 or s.length()>strlen(cmd)) return false; - return strncmp(cmd, s.c_str(), s.length())==0; + uint8_t p=0; + while(s[p++]==*cmd++) + { + if (*cmd==0 or s[p]==0) return true; + if (s[p]==' ') return true; + } + return false; } using ClientFunction = void(*)(std::string& cmd, MqttClient* publish); +struct Every +{ + std::string cmd; + uint32_t ms; + uint32_t next; + + void dump() + { + auto mill=millis(); + Serial << ms << "ms [" << cmd << "] next in "; + if (mill > next) + Serial << "now"; + else + Serial << next-mill << "ms"; + } +}; + uint32_t blink_ms_on[16]; uint32_t blink_ms_off[16]; uint32_t blink_next[16]; bool blink_state[16]; int16_t blink; + +std::vector everies; + +void eval(std::string& cmd) +{ + while(cmd.length()) + { + MqttError retval = MqttOk; + + std::string s; + MqttBroker* broker = nullptr; + MqttClient* client = nullptr; + + // client.function notation + // ("a.fun " becomes "fun a ") + if (cmd.find('.') != std::string::npos && + cmd.find('.') < cmd.find(' ')) + { + s=getword(cmd, nullptr, '.'); + + if (s.length()) + { + if (clients.find(s) != clients.end()) + { + client = clients[s]; + } + else if (brokers.find(s) != brokers.end()) + { + broker = brokers[s]; + } + else + { + Serial << "Unknown class (" << s.c_str() << ")" << endl; + cmd=""; + } + } + } + + s = getword(cmd); + if (s.length()) getCommand(s); + if (s.length()==0) + {} + else if (compare(s, "delete")) + { + if (client==nullptr && broker==nullptr) + { + s = getword(cmd); + if (clients.find(s) != clients.end()) + { + client = clients[s]; + } + else if (brokers.find(s) != brokers.end()) + { + broker = brokers[s]; + } + else + Serial << "Unable to find (" << s.c_str() << ")" << endl; + } + if (client) + { + for (auto it: clients) + { + if (it.second != client) continue; + Serial << "deleted" << endl; + delete (it.second); + clients.erase(it.first); + break; + } + cmd += " ls"; + } + else if (broker) + { + for(auto it: brokers) + { + if (broker != it.second) continue; + Serial << "deleted" << endl; + delete (it.second); + brokers.erase(it.first); + break; + } + cmd += " ls"; + } + else + Serial << "Nothing to delete" << endl; + } + else if (broker) + { + if (compare(s,"connect")) + { + Serial << "NYI" << endl; + } + else if (compare(s, "view")) + { + broker->dump(); + } + } + else if (client) + { + if (compare(s,"connect")) + { + client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); + Serial << (client->connected() ? "connected." : "not connected") << endl; + } + else if (compare(s,"publish")) + { + while (cmd[0]==' ') cmd.erase(0,1); + retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length()); + cmd=""; // remove payload + } + else if (compare(s,"subscribe")) + { + client->subscribe(getword(cmd, topic.c_str())); + } + else if (compare(s, "unsubscribe")) + { + client->unsubscribe(getword(cmd, topic.c_str())); + } + else if (compare(s, "view")) + { + client->dump(); + } + } + else if (compare(s, "on")) + { + uint8_t pin=getint(cmd, 2); + pinMode(pin, OUTPUT); + digitalWrite(pin, HIGH); + } + else if (compare(s, "off")) + { + uint8_t pin=getint(cmd, 2); + pinMode(pin, OUTPUT); + digitalWrite(pin, LOW); + } + else if (compare(s, "every")) + { + uint32_t ms = getint(cmd, 0); + if (ms and cmd.length()) + { + Every every; + every.ms=ms; + every.cmd=cmd; + every.next=millis()+ms; + everies.push_back(every); + every.dump(); + Serial << endl; + cmd=""; + } + else if (ms==0 and compare(cmd, "list")) + { + getword(cmd); + Serial << "List of everies (ms=" << millis() << ")" << endl; + uint8_t count=0; + for(auto& every: everies) + { + Serial << count << ": "; + every.dump(); + Serial << endl; + count++; + } + } + else if (ms==0 and compare(cmd, "remove")) + { + getword(cmd); + int8_t every=getint(cmd, -1); + if (every==-1 and compare(cmd, "all")) + { + getword(cmd); + everies.clear(); + } + else if (everies.size() > (uint8_t)every) + { + everies.erase(everies.begin()+every); + } + } + } + else if (compare(s, "blink")) + { + int8_t blink_nr = getint(cmd, -1); + if (blink_nr >= 0) + { + blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); + blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); + pinMode(blink_nr, OUTPUT); + blink_next[blink_nr] = millis(); + Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl; + if (blink_ms_on[blink_nr]) + blink |= 1<< blink_nr; + else + { + blink &= ~(1<< blink_nr); + } + } + } + else if (compare(s, "auto")) + { + automatic::command(client, cmd); + if (client == nullptr) + cmd.clear(); + } + else if (compare(s, "broker")) + { + std::string id=getword(cmd); + if (id.length() or brokers.find(id)!=brokers.end()) + { + int port=getint(cmd, 0); + if (port) + { + MqttBroker* broker = new MqttBroker(port); + broker->begin(); + + brokers[id] = broker; + Serial << "new broker (" << id.c_str() << ")" << endl; + } + else + Serial << "Missing port" << endl; + } + else + Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl; + cmd+=" ls"; + } + else if (compare(s, "client")) + { + std::string id=getword(cmd); + if (id.length() or clients.find(id)!=clients.end()) + { + s=getword(cmd); // broker name + if (s=="" or brokers.find(s) != brokers.end()) + { + MqttBroker* broker = nullptr; + if (s.length()) broker = brokers[s]; + MqttClient* client = new MqttClient(broker); + client->id(id); + clients[id]=client; + client->setCallback(onPublish); + client->subscribe(topic); + Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl; + } + else if (s.length()) + { + Serial << " not found." << endl; + } + } + else + Serial << "Missing or existing client name" << endl; + cmd+=" ls"; + } + else if (compare(s, "set")) + { + std::string name(getword(cmd)); + if (name.length()==0) + { + for(auto it: vars) + { + Serial << " " << it.first << " -> " << it.second << endl; + } + } + else if (commands.find(name) != commands.end()) + { + Serial << "Reserved keyword (" << name << ")" << endl; + cmd.clear(); + } + else + { + if (cmd.length()) + { + vars[name] = cmd; + cmd.clear(); + } + else if (vars.find(name) != vars.end()) + vars.erase(vars.find(name)); + } + } + else if (compare(s, "ls") or compare(s, "view")) + { + Serial << "--< " << clients.size() << " client/s. >--" << endl; + for(auto it: clients) + { + Serial << " "; it.second->dump(); + } + + Serial << "--< " << brokers.size() << " brokers/s. >--" << endl; + for(auto it: brokers) + { + Serial << " ==[ Broker: " << it.first.c_str() << " ]== "; + it.second->dump(); + } + } + else if (compare(s, "reset")) + ESP.restart(); + else if (compare(s, "ip")) + Serial << "IP: " << WiFi.localIP() << endl; + else if (compare(s,"help")) + { + Serial << "syntax:" << endl; + Serial << " MqttBroker:" << endl; + Serial << " broker {name} {port} : create a new broker" << endl; + Serial << endl; + Serial << " MqttClient:" << endl; + Serial << " client {name} {parent broker} : create a client then" << endl; + Serial << " name.connect [ip] [port] [alive]" << endl; + Serial << " name.[un]subscribe [topic]" << endl; + Serial << " name.publish [topic][payload]" << endl; + Serial << " name.view" << endl; + Serial << " name.delete" << endl; + + automatic::help(); + Serial << endl; + Serial << " help" << endl; + Serial << " blink [Dx on_ms off_ms]" << endl; + Serial << " ls / ip / reset" << endl; + Serial << " set [name][value]" << endl; + Serial << " ! repeat last command" << endl; + Serial << endl; + Serial << " every ms [command]; every list; every remove [nr|all]" << endl; + Serial << " on {output}; off {output}" << endl; + Serial << " $id : name of the client." << endl; + Serial << " default topic is '" << topic.c_str() << "'" << endl; + Serial << endl; + } + else + { + while(s[0]==' ') s.erase(0,1); + if (s.length()) + Serial << "Unknown command (" << s.c_str() << ")" << endl; + } + + if (retval != MqttOk) + { + Serial << "## ERROR " << retval << endl; + } + } +} + void loop() { auto ms=millis(); int8_t out=0; int16_t blink_bits = blink; + + for(auto& every: everies) + { + if (every.ms && every.cmd.length() && ms > every.next) + { + std::string cmd(every.cmd); + eval(cmd); + every.next += every.ms; + } + } + while(blink_bits) { if (blink_ms_on[out] and ms > blink_next[out]) @@ -375,7 +742,6 @@ void loop() if (c==10 or c==14) { - Serial << "----------------[ " << cmd.c_str() << " ]--------------" << endl; static std::string last_cmd; if (cmd=="!") @@ -384,289 +750,7 @@ void loop() last_cmd=cmd; if (cmd.substr(0,3)!="set") replaceVars(cmd); - - while(cmd.length()) - { - std::string cmd_end; - if (cmd.find(';') != std::string::npos) - { - cmd_end = cmd; - cmd = getword(cmd_end,"",';'); - } - MqttError retval = MqttOk; - - std::string s; - MqttBroker* broker = nullptr; - MqttClient* client = nullptr; - - // client.function notation - // ("a.fun " becomes "fun a ") - if (cmd.find('.') != std::string::npos && - cmd.find('.') < cmd.find(' ')) - { - s=getword(cmd, nullptr, '.'); - - if (s.length()) - { - if (clients.find(s) != clients.end()) - { - client = clients[s]; - } - else if (brokers.find(s) != brokers.end()) - { - broker = brokers[s]; - } - else - { - Serial << "Unknown class (" << s.c_str() << ")" << endl; - cmd=""; - } - } - } - - s = getword(cmd); - if (s.length()) getCommand(s); - if (s.length()==0) - {} - else if (compare(s, "delete")) - { - if (client==nullptr && broker==nullptr) - { - s = getword(cmd); - if (clients.find(s) != clients.end()) - { - client = clients[s]; - } - else if (brokers.find(s) != brokers.end()) - { - broker = brokers[s]; - } - else - Serial << "Unable to find (" << s.c_str() << ")" << endl; - } - if (client) - { - for (auto it: clients) - { - if (it.second != client) continue; - Serial << "deleted" << endl; - delete (it.second); - clients.erase(it.first); - break; - } - cmd += " ls"; - } - else if (broker) - { - for(auto it: brokers) - { - if (broker != it.second) continue; - Serial << "deleted" << endl; - delete (it.second); - brokers.erase(it.first); - break; - } - cmd += " ls"; - } - else - Serial << "Nothing to delete" << endl; - } - else if (broker) - { - if (compare(s,"connect")) - { - Serial << "NYI" << endl; - } - else if (compare(s, "view")) - { - broker->dump(); - } - } - else if (client) - { - if (compare(s,"connect")) - { - client->connect(getip(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60)); - Serial << (client->connected() ? "connected." : "not connected") << endl; - } - else if (compare(s,"publish")) - { - while (cmd[0]==' ') cmd.erase(0,1); - retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length()); - cmd=""; // remove payload - } - else if (compare(s,"subscribe")) - { - client->subscribe(getword(cmd, topic.c_str())); - } - else if (compare(s, "unsubscribe")) - { - client->unsubscribe(getword(cmd, topic.c_str())); - } - else if (compare(s, "view")) - { - client->dump(); - } - } - else if (compare(s, "blink")) - { - int8_t blink_nr = getint(cmd, -1); - if (blink_nr >= 0) - { - blink_ms_on[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); - blink_ms_off[blink_nr]=getint(cmd, blink_ms_on[blink_nr]); - pinMode(blink_nr, OUTPUT); - blink_next[blink_nr] = millis(); - Serial << "Blink " << blink_nr << ' ' << (blink_ms_on[blink_nr] ? "on" : "off") << endl; - if (blink_ms_on[blink_nr]) - blink |= 1<< blink_nr; - else - { - blink &= ~(1<< blink_nr); - } - } - } - else if (compare(s, "auto")) - { - automatic::command(client, cmd); - if (client == nullptr) - cmd.clear(); - } - else if (compare(s, "broker")) - { - std::string id=getword(cmd); - if (id.length() or brokers.find(id)!=brokers.end()) - { - int port=getint(cmd, 0); - if (port) - { - MqttBroker* broker = new MqttBroker(port); - broker->begin(); - - brokers[id] = broker; - Serial << "new broker (" << id.c_str() << ")" << endl; - } - else - Serial << "Missing port" << endl; - } - else - Serial << "Missing or existing broker name (" << id.c_str() << ")" << endl; - cmd+=" ls"; - } - else if (compare(s, "client")) - { - std::string id=getword(cmd); - if (id.length() or clients.find(id)!=clients.end()) - { - s=getword(cmd); // broker name - if (s=="" or brokers.find(s) != brokers.end()) - { - MqttBroker* broker = nullptr; - if (s.length()) broker = brokers[s]; - MqttClient* client = new MqttClient(broker); - client->id(id); - clients[id]=client; - client->setCallback(onPublish); - client->subscribe(topic); - Serial << "new client (" << id.c_str() << ", " << s.c_str() << ')' << endl; - } - else if (s.length()) - { - Serial << " not found." << endl; - } - } - else - Serial << "Missing or existing client name" << endl; - cmd+=" ls"; - } - else if (compare(s, "set")) - { - std::string name(getword(cmd)); - if (name.length()==0) - { - for(auto it: vars) - { - Serial << " " << it.first << " -> " << it.second << endl; - } - } - else if (commands.find(name) != commands.end()) - { - Serial << "Reserved keyword (" << name << ")" << endl; - cmd.clear(); - } - else - { - if (cmd.length()) - { - vars[name] = cmd; - cmd.clear(); - } - else if (vars.find(name) != vars.end()) - vars.erase(vars.find(name)); - } - } - else if (compare(s, "ls") or compare(s, "view")) - { - Serial << "--< " << clients.size() << " client/s. >--" << endl; - for(auto it: clients) - { - Serial << " "; it.second->dump(); - } - - Serial << "--< " << brokers.size() << " brokers/s. >--" << endl; - for(auto it: brokers) - { - Serial << " ==[ Broker: " << it.first.c_str() << " ]== "; - it.second->dump(); - } - } - else if (compare(s, "reset")) - ESP.restart(); - else if (compare(s, "ip")) - Serial << "IP: " << WiFi.localIP() << endl; - else if (compare(s,"help")) - { - Serial << "syntax: instr; instr; ..." << endl; - Serial << " MqttBroker:" << endl; - Serial << " broker {name} {port} : create a new broker" << endl; - Serial << endl; - Serial << " MqttClient:" << endl; - Serial << " client {name} {parent broker} : create a client then" << endl; - Serial << " name.connect [ip] [port] [alive]" << endl; - Serial << " name.[un]subscribe [topic]" << endl; - Serial << " name.publish [topic][payload]" << endl; - Serial << " name.view" << endl; - Serial << " name.delete" << endl; - - automatic::help(); - Serial << endl; - Serial << " help" << endl; - Serial << " blink [Dx on_ms off_ms]" << endl; - Serial << " ls / ip / reset" << endl; - Serial << " set [name][value]" << endl; - Serial << " ! repeat last command" << endl; - Serial << endl; - Serial << " $id : name of the client." << endl; - Serial << " default topic is '" << topic.c_str() << "'" << endl; - Serial << endl; - } - else - { - while(s[0]==' ') s.erase(0,1); - if (s.length()) - Serial << "Unknown command (" << s.c_str() << ")" << endl; - } - - if (retval != MqttOk) - { - Serial << "## ERROR " << retval << endl; - } - if (cmd_end.length()) - { - cmd = cmd_end; - cmd_end = ""; - } - } + eval(cmd); } else { diff --git a/src/TinyMqtt.cpp b/src/TinyMqtt.cpp index 40ddef4..f6fe9fe 100644 --- a/src/TinyMqtt.cpp +++ b/src/TinyMqtt.cpp @@ -11,8 +11,10 @@ void outstring(const char* prefix, const char*p, uint16_t len) MqttBroker::MqttBroker(uint16_t port) { - server = new AsyncServer(port); + server = new TcpServer(port); +#ifdef TCP_ASYNC server->onClient(onClient, this); +#endif } MqttBroker::~MqttBroker() @@ -25,12 +27,17 @@ MqttBroker::~MqttBroker() } // private constructor used by broker only -MqttClient::MqttClient(MqttBroker* parent, AsyncClient* new_client) - : parent(parent), client(new_client) +MqttClient::MqttClient(MqttBroker* parent, TcpClient* new_client) + : parent(parent) { +#ifdef TCP_ASYNC + client = new_client; client->onData(onData, this); // client->onConnect() TODO // client->onDisconnect() TODO +#else + client = new WiFiClient(*new_client); +#endif alive = millis()+5000; // client expires after 5s if no CONNECT msg } @@ -75,11 +82,19 @@ void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka) keep_alive = ka; close(); if (client) delete client; - client = new AsyncClient; + client = new TcpClient; + + debug("Trying to connect to " << broker.c_str() << ':' << port); +#ifdef TCP_ASYNC client->onData(onData, this); client->onConnect(onConnect, this); - debug("Trying to connect to " << broker.c_str() << ':' << port); client->connect(broker.c_str(), port); +#else + if (client->connect(broker.c_str(), port)) + { + onConnect(this, client); + } +#endif } void MqttBroker::addClient(MqttClient* client) @@ -115,16 +130,24 @@ void MqttBroker::removeClient(MqttClient* remove) debug("Error cannot remove client"); // TODO should not occur } -void MqttBroker::onClient(void* broker_ptr, AsyncClient* client) +void MqttBroker::onClient(void* broker_ptr, TcpClient* client) { MqttBroker* broker = static_cast(broker_ptr); broker->addClient(new MqttClient(broker, client)); - debug("New client #" << broker->clients.size()); + debug("New client"); } void MqttBroker::loop() { +#ifndef TCP_ASYNC + WiFiClient client = server->available(); + + if (client) + { + onClient(this, &client); + } +#endif if (broker) { // TODO should monitor broker's activity. @@ -144,7 +167,7 @@ void MqttBroker::loop() } else { - debug("Client " << client->id().c_str() << " Disconnected, parent=" << (int32_t)client->parent); + debug("Client " << client->id().c_str() << " Disconnected, parent=" << (dbg_ptr)client->parent); // Note: deleting a client not added by the broker itself will probably crash later. delete client; break; @@ -248,9 +271,20 @@ void MqttClient::loop() // there is no need to send one PingReq per instance. } } +#ifndef TCP_ASYNC + while(client && client->available()>0) + { + message.incoming(client->read()); + if (message.type()) + { + processMessage(&message); + message.reset(); + } + } +#endif } -void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) +void MqttClient::onConnect(void *mqttclient_ptr, TcpClient*) { MqttClient* mqtt = static_cast(mqttclient_ptr); debug("cnx: connecting"); @@ -265,12 +299,13 @@ void MqttClient::onConnect(void *mqttclient_ptr, AsyncClient*) debug("cnx: mqtt connecting"); msg.sendTo(mqtt); msg.reset(); - debug("cnx: mqtt sent " << (int32_t)mqtt->parent); + debug("cnx: mqtt sent " << (dbg_ptr)mqtt->parent); mqtt->clientAlive(0); } -void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) +#ifdef TCP_ASYNC +void MqttClient::onData(void* client_ptr, TcpClient*, void* data, size_t len) { char* char_ptr = static_cast(data); MqttClient* client=static_cast(client_ptr); @@ -285,6 +320,7 @@ void MqttClient::onData(void* client_ptr, AsyncClient*, void* data, size_t len) len--; } } +#endif void MqttClient::resubscribe() { @@ -361,7 +397,7 @@ void MqttClient::processMessage(const MqttMessage* mesg) #ifdef TINY_MQTT_DEBUG if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp) { - Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; + Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl; // mesg->hexdump("Incoming"); } #endif diff --git a/src/TinyMqtt.h b/src/TinyMqtt.h index c5d5964..ed96fb3 100644 --- a/src/TinyMqtt.h +++ b/src/TinyMqtt.h @@ -1,14 +1,28 @@ #pragma once -#ifdef ESP8266 - #include + +// TODO Should add a AUnit with both TCP_ASYNC and not TCP_ASYNC +// #define TCP_ASYNC // Uncomment this to use ESPAsyncTCP instead of normal cnx + +#if defined(ESP8266) || defined(EPOXY_DUINO) + #ifdef TCP_ASYNC + #include + #else + #include + #endif #elif defined(ESP32) - #include - #include // https://github.com/me-no-dev/AsyncTCP -#elif defined(EPOXY_DUINO) - #include + #ifdef TCP_ASYNC + #include // https://github.com/me-no-dev/AsyncTCP + #else + #include + #endif #else #error "Unsupported platform" #endif +#ifdef EPOXY_DUINO + #define dbg_ptr uint64_t +#else + #define dbg_ptr uint32_t +#endif #include #include #include @@ -23,6 +37,14 @@ #define debug(what) {} #endif +#ifdef TCP_ASYNC + using TcpClient = AsyncClient; + using TcpServer = AsyncServer; +#else + using TcpClient = WiFiClient; + using TcpServer = WiFiServer; +#endif + enum MqttError { MqttOk = 0, @@ -57,7 +79,7 @@ class MqttMessage Subscribe = 0x80, SubAck = 0x90, UnSubscribe = 0xA0, - UnSuback = 0xB0, + UnSuback = 0xB0, PingReq = 0xC0, PingResp = 0xD0, Disconnect = 0xE0 @@ -192,14 +214,15 @@ class MqttClient static long counter; private: - - static void onConnect(void * client_ptr, AsyncClient*); - static void onData(void* client_ptr, AsyncClient*, void* data, size_t len); + static void onConnect(void * client_ptr, TcpClient*); +#ifdef TCP_ASYNC + static void onData(void* client_ptr, TcpClient*, void* data, size_t len); +#endif MqttError sendTopic(const Topic& topic, MqttMessage::Type type, uint8_t qos); void resubscribe(); friend class MqttBroker; - MqttClient(MqttBroker* parent, AsyncClient* client); + MqttClient(MqttBroker* parent, TcpClient* client); // republish a received publish if topic matches any in subscriptions MqttError publishIfSubscribed(const Topic& topic, const MqttMessage& msg); @@ -217,7 +240,7 @@ class MqttClient // (this is the case when MqttBroker isn't used except here) MqttBroker* parent=nullptr; // connection to local broker - AsyncClient* client=nullptr; // connection to mqtt client or to remote broker + TcpClient* client=nullptr; // connection to mqtt client or to remote broker std::set subscriptions; std::string clientId; CallBack callback = nullptr; @@ -257,7 +280,7 @@ class MqttBroker private: friend class MqttClient; - static void onClient(void*, AsyncClient*); + static void onClient(void*, TcpClient*); bool checkUser(const char* user, uint8_t len) const { return compareString(auth_user, user, len); } @@ -275,7 +298,7 @@ class MqttBroker bool compareString(const char* good, const char* str, uint8_t str_len) const; std::vector clients; - AsyncServer* server; + TcpServer* server; const char* auth_user = "guest"; const char* auth_password = "guest";