Compare commits

...

11 Commits
0.4.0 ... 0.5.0

Author SHA1 Message Date
hsaturn
d92aa1fe3c Minor change to publish 2021-03-24 21:20:07 +01:00
hsaturn
0d6e194560 test client enhancements 2021-03-24 21:19:44 +01:00
hsaturn
7107da2cce Client supports (does not disconnect) Suback / Puback 2021-03-24 21:18:27 +01:00
hsaturn
28c8713415 Client keep_alive is now parameterized 2021-03-24 21:17:08 +01:00
hsaturn
70cf8137de Fixed build of client-without-wifi 2021-03-24 18:35:57 +01:00
hsaturn
5ab315e472 Removed dependency with Streaming.h 2021-03-24 18:35:11 +01:00
hsaturn
b96b36f10c README update 2021-03-24 01:33:45 +01:00
hsaturn
ba831ea366 README update 2021-03-24 01:32:47 +01:00
hsaturn
4020393f90 MqttClient can subscribe and receive publishes from distant broker 2021-03-24 01:30:56 +01:00
hsaturn
7b20e7deb5 Supports multiple subscriptions 2021-03-23 23:51:33 +01:00
hsaturn
efe6a05bbd MqttStreaming.h, streaming with fixes 2021-03-23 23:41:00 +01:00
12 changed files with 603 additions and 62 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
*~ *~
src/my_credentials.h

View File

@@ -23,16 +23,16 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
* Implement zeroconf mode (needs async) * Implement zeroconf mode (needs async)
* Add a max_clients in MqttBroker. Used with zeroconf, there will be * Add a max_clients in MqttBroker. Used with zeroconf, there will be
no need for having tons of clients (also RAM is the problem with many clients) no need for having tons of clients (also RAM is the problem with many clients)
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical. * Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
* MqttMessage uses a buffer 256 bytes which is usually far than needed. * ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
* MqttClient auto reconnection * ~~MqttClient does not support more than one subscription at time~~
* MqttClient auto re-subscribe * MqttClient auto re-subscribe
* MqttClient auto reconnection
* MqttClient does not callback payload... * MqttClient does not callback payload...
* MqttClient user/password * MqttClient user/password
## Quickstart ## Quickstart
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt) * install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup) * modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)

View File

@@ -1,5 +1,4 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** /**
* Local broker that accept connections and two local clients * Local broker that accept connections and two local clients

View File

@@ -1,5 +1,4 @@
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** TinyMQTT allows a disconnected mode: /** TinyMQTT allows a disconnected mode:
* *
@@ -15,10 +14,10 @@ MqttBroker broker(1883);
MqttClient mqtt_a(&broker); MqttClient mqtt_a(&broker);
MqttClient mqtt_b(&broker); MqttClient mqtt_b(&broker);
void onPublishA(const Topic& topic, const char* payload, size_t length) void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> A Received " << topic.c_str() << endl; } { Serial << "--> A Received " << topic.c_str() << endl; }
void onPublishB(const Topic& topic, const char* payload, size_t length) void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
{ Serial << "--> B Received " << topic.c_str() << endl; } { Serial << "--> B Received " << topic.c_str() << endl; }
void setup() void setup()

View File

@@ -1,5 +1,4 @@
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
#include <my_credentials.h> #include <my_credentials.h>

View File

@@ -1,6 +1,5 @@
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt #include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
/** Simple Client /** Simple Client
* *
@@ -43,7 +42,11 @@ void loop()
delay(1000); delay(1000);
temp += (random(100)>50 ? 0.1 : -0.1); auto rnd=random(100);
if (rnd > 66) temp += 0.1;
else if (rnd < 33) temp -= 0.1;
client.publish("sensor/temperature", String(temp)); client.publish("sensor/temperature", String(temp));
} }

View File

@@ -0,0 +1,11 @@
// vim: ts=30
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
----------------------------------------------------------------------------
client a starts a client (not connected no internal broker)
a.connect [server][port][alive] connects the client, default port=1883
a.publish topic [payload] send a topic with a payload
a.subscribe topic subscribes to a topic
delete a destroy the client

View File

@@ -1,6 +1,6 @@
#define TINY_MQTT_DEBUG #define TINY_MQTT_DEBUG
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt #include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming #include <MqttStreaming.h>
#include <map> #include <map>
/** /**
@@ -54,15 +54,15 @@ void setup()
brokers["broker"] = broker; brokers["broker"] = broker;
} }
int getint(std::string& str, const int if_empty=0, char sep=' ') int getint(std::string& str, const int if_empty=0)
{ {
std::string sword; std::string sword;
while(str.length() && str[0]!=sep) while(str.length() && str[0]>='0' && str[0]<='9')
{ {
sword += str[0]; str.erase(0,1); sword += str[0]; str.erase(0,1);
} }
while(str[0]==sep) str.erase(0,1); while(str[0]==' ') str.erase(0,1);
if (if_empty and sword.length()==0) sword=if_empty; if (if_empty and sword.length()==0) return if_empty;
return atoi(sword.c_str()); return atoi(sword.c_str());
} }
@@ -78,6 +78,7 @@ std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' '
return sword; return sword;
} }
// publish at regular interval
class automatic class automatic
{ {
public: public:
@@ -189,6 +190,7 @@ class automatic
std::string topic_; std::string topic_;
bool bon=false; bool bon=false;
static std::map<MqttClient*, automatic*> autos; static std::map<MqttClient*, automatic*> autos;
float temp=19;
}; };
std::map<MqttClient*, automatic*> automatic::autos; std::map<MqttClient*, automatic*> automatic::autos;
@@ -225,6 +227,8 @@ void loop()
last_cmd=cmd; last_cmd=cmd;
while(cmd.length()) while(cmd.length())
{ {
MqttError retval = MqttOk;
std::string s; std::string s;
MqttBroker* broker = nullptr; MqttBroker* broker = nullptr;
MqttClient* client = nullptr; MqttClient* client = nullptr;
@@ -309,16 +313,14 @@ void loop()
{ {
if (compare(s,"connect")) if (compare(s,"connect"))
{ {
client->connect(getword(cmd,"192.168.1.40").c_str(), 1883); client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
Serial << (client->connected() ? "connected." : "not connected") << endl; Serial << (client->connected() ? "connected." : "not connected") << endl;
} }
else if (compare(s,"publish")) else if (compare(s,"publish"))
{ {
auto ok=client->publish(getword(cmd, topic.c_str())); while (cmd[0]==' ') cmd.erase(0,1);
if (ok != MqttOk) retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
{ cmd=""; // remove payload
Serial << "## ERROR " << ok << endl;
}
} }
else if (compare(s,"subscribe")) else if (compare(s,"subscribe"))
{ {
@@ -409,9 +411,9 @@ void loop()
Serial << endl; Serial << endl;
Serial << " MqttClient:" << endl; Serial << " MqttClient:" << endl;
Serial << " client {name} {parent broker} : create a client then" << endl; Serial << " client {name} {parent broker} : create a client then" << endl;
Serial << " name.connect [ip]" << endl; Serial << " name.connect [ip] [port] [alive]" << endl;
Serial << " name.subscribe [topic]" << endl; Serial << " name.subscribe [topic]" << endl;
Serial << " name.publish [topic]" << endl; Serial << " name.publish [topic][payload]" << endl;
Serial << " name.view" << endl; Serial << " name.view" << endl;
Serial << " name.delete" << endl; Serial << " name.delete" << endl;
@@ -431,6 +433,11 @@ void loop()
if (s.length()) if (s.length())
Serial << "Unknown command (" << s.c_str() << ")" << endl; Serial << "Unknown command (" << s.c_str() << ")" << endl;
} }
if (retval != MqttOk)
{
Serial << "## ERROR " << retval << endl;
}
} }
} }
else else

412
src/MqttStreaming.h Normal file
View File

@@ -0,0 +1,412 @@
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
* (I have to speek to the author in order to include my changes to his library if possible)
**/
/*
Streaming.h - Arduino library for supporting the << streaming operator
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
Version 6 library changes
Copyright (c) 2019 Gazoodle. All rights reserved.
1. _BASED moved to template to remove type conversion to long and
sign changes which break int8_t and int16_t negative numbers.
The print implementation still upscales to long for it's internal
print routine.
2. _PAD added to allow padding & filling of characters to the stream
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
and zero padding for numerics
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
and no internal buffers for replaceable stream printing
*/
#ifndef ARDUINO_STREAMING
#define ARDUINO_STREAMING
#if defined(ARDUINO) && ARDUINO >= 100
#include "Arduino.h"
#else
#ifndef STREAMING_CONSOLE
#include "WProgram.h"
#endif
#endif
#include <string>
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
// No stl library, so need trivial version of std::is_signed ...
namespace std {
template<typename T>
struct is_signed { static const bool value = false; };
template<>
struct is_signed<int8_t> { static const bool value = true; };
template<>
struct is_signed<int16_t> { static const bool value = true; };
template<>
struct is_signed<int32_t> { static const bool value = true; };
};
#else
#include <type_traits>
#endif
#define STREAMING_LIBRARY_VERSION 6
#if !defined(typeof)
#define typeof(x) __typeof__(x)
#endif
// PrintBuffer implementation of Print, a small buffer to print in
// see its use with pad_float()
template <size_t N>
class PrintBuffer : public Print
{
size_t pos = 0;
char str[N] {};
public:
inline const char *operator() ()
{ return str; };
// inline void clear()
// { pos = 0; str[0] = '\0'; };
inline size_t write(uint8_t c)
{ return write(&c, 1); };
inline size_t write(const uint8_t *buffer, size_t size)
{
size_t s = std::min(size, N-1 - pos); // need a /0 left
if (s)
{
memcpy(&str[pos], buffer, s);
pos += s;
}
return s;
};
};
// Generic template
template<class T>
inline Print &operator <<(Print &stream, const T &arg)
{ stream.print(arg); return stream; }
// TODO sfinae maybe could do the trick ?
inline Print &operator <<(Print &stream, const std::string &str)
{ stream.print(str.c_str()); return stream; }
template<typename T>
struct _BASED
{
T val;
int base;
_BASED(T v, int b): val(v), base(b)
{}
};
#if ARDUINO >= 100
struct _BYTE_CODE
{
byte val;
_BYTE_CODE(byte v) : val(v)
{}
};
#define _BYTE(a) _BYTE_CODE(a)
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
{ obj.write(arg.val); return obj; }
#else
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
#endif
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
// Specialization for class _BASED
// Thanks to Arduino forum user Ben Combee who suggested this
// clever technique to allow for expressions like
// Serial << _HEX(a);
template<typename T>
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
{ obj.print(arg.val, arg.base); return obj; }
#if ARDUINO >= 18
// Specialization for class _FLOAT
// Thanks to Michael Margolis for suggesting a way
// to accommodate Arduino 0018's floating point precision
// feature like this:
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
struct _FLOAT
{
double val; // only Print::print(double)
int digits;
_FLOAT(double v, int d): val(v), digits(d)
{}
};
inline Print &operator <<(Print &obj, const _FLOAT &arg)
{ obj.print(arg.val, arg.digits); return obj; }
#endif
// Specialization for enum _EndLineCode
// Thanks to Arduino forum user Paul V. who suggested this
// clever technique to allow for expressions like
// Serial << "Hello!" << endl;
enum _EndLineCode { endl };
inline Print &operator <<(Print &obj, _EndLineCode)
{ obj.println(); return obj; }
// Specialization for padding & filling, mainly utilized
// by the width printers
//
// Use like
// Serial << _PAD(10,' '); // Will output 10 spaces
// Serial << _PAD(4, '0'); // Will output 4 zeros
struct _PAD
{
int8_t width;
char chr;
_PAD(int8_t w, char c) : width(w), chr(c) {}
};
inline Print &operator <<(Print& stm, const _PAD &arg)
{
for(int8_t i = 0; i < arg.width; i++)
stm.print(arg.chr);
return stm;
}
// Specialization for width printing
//
// Use like Result
// -------- ------
// Serial << _WIDTH(1,5) " 1"
// Serial << _WIDTH(10,5) " 10"
// Serial << _WIDTH(100,5) " 100"
// Serial << _WIDTHZ(1,5) "00001"
//
// Great for times & dates, or hex dumps
//
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
//
// for(int index=0; index<byte_array_size; index++)
// Serial << _WIDTHZ(_HEX(byte_array[index]))
template<typename T>
struct __WIDTH
{
const T val;
int8_t width;
char pad;
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
};
// Count digits in an integer of specific base
template<typename T>
inline uint8_t digits(T v, int8_t base = 10)
{
uint8_t digits = 0;
if ( std::is_signed<T>::value )
{
if ( v < 0 )
{
digits++;
v = -v; // v needs to be postive for the digits counter to work
}
}
do
{
v /= base;
digits++;
} while( v > 0 );
return digits;
}
// Generic get the width of a value in base 10
template<typename T>
inline uint8_t get_value_width(T val)
{ return digits(val); }
inline uint8_t get_value_width(const char * val)
{ return strlen(val); }
#ifdef ARDUINO
inline uint8_t get_value_width(const __FlashStringHelper * val)
{ return strlen_P(reinterpret_cast<const char *>(val)); }
#endif
// _BASED<T> get the width of a value
template<typename T>
inline uint8_t get_value_width(_BASED<T> b)
{ return digits(b.val, b.base); }
// Constructor wrapper to allow automatic template parameter deduction
template<typename T>
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
template<typename T>
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
// Operator overload to handle width printing.
template<typename T>
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
// explicit Operator overload to handle width printing of _FLOAT, double and float
template<typename T>
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
{
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
size_t size = buf.print(val, digits); // print in buf
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
}
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
{ return pad_float(stm, arg, arg.val); }
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
// a less verbose _FLOATW for _WIDTH(_FLOAT)
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
// Specialization for replacement formatting
//
// Designed to be similar to printf that everyone knows and loves/hates. But without
// the internal buffers and type agnosticism. This version only has placeholders in
// the format string, the actual values are supplied using the stream safe operators
// defined in this library.
//
// Use like this:
//
// Serial << FMT(F("Replace % with %"), 1, 2 )
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
inline char get_next_format_char(const char *& format_string)
{
char format_char = *format_string;
if ( format_char > 0 ) format_string++;
return format_char;
}
#ifdef ARDUINO
inline char get_next_format_char(const __FlashStringHelper*& format_string)
{
char format_char = pgm_read_byte(format_string);
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
return format_char;
}
#endif
template<typename Ft>
inline bool check_backslash(char& format_char, Ft& format_string)
{
if ( format_char == '\\')
{
format_char = get_next_format_char(format_string);
return true;
}
return false;
}
// The template tail printer helper
template<typename Ft, typename... Ts>
struct __FMT
{
Ft format_string;
__FMT(Ft f, Ts ... args) : format_string(f) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
check_backslash(c, format);
if ( c )
stm.print(c);
}
}
};
// The variadic template helper
template<typename Ft, typename T, typename... Ts>
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
{
T val;
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
inline void tstreamf(Print& stm, Ft format) const
{
while(char c = get_next_format_char(format))
{
if (!check_backslash(c, format))
{
if ( c == '%')
{
stm << val;
// Variadic recursion ... compiler rolls this out during
// template argument pack expansion
__FMT<Ft, Ts...>::tstreamf(stm, format);
return;
}
}
if (c)
stm.print(c);
}
}
};
// The actual operator should you only instanciate the FMT
// helper with a format string and no parameters
template<typename Ft, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// The variadic stream helper
template<typename Ft, typename T, typename... Ts>
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
{
args.tstreamf(stm, args.format_string);
return stm;
}
// As we don't have C++17, we can't get a constructor to use
// automatic argument deduction, but ... this little trick gets
// around that ...
template<typename Ft, typename... Ts>
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
#endif

View File

@@ -2,7 +2,6 @@
#include <map> #include <map>
#include <string> #include <string>
#include <string.h> #include <string.h>
// #include <Streaming.h>
#include <ESP8266WiFi.h> #include <ESP8266WiFi.h>
/*** /***

View File

@@ -1,6 +1,5 @@
#include "TinyMqtt.h" #include "TinyMqtt.h"
#include <sstream> #include <sstream>
#include <Streaming.h>
void outstring(const char* prefix, const char*p, uint16_t len) void outstring(const char* prefix, const char*p, uint16_t len)
{ {
@@ -60,7 +59,7 @@ void MqttClient::close()
} }
} }
void MqttClient::connect(std::string broker, uint16_t port) void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
{ {
debug("cnx: closing"); debug("cnx: closing");
close(); close();
@@ -75,13 +74,15 @@ void MqttClient::connect(std::string broker, uint16_t port)
message.add(0x4); // Mqtt protocol version 3.1.1 message.add(0x4); // Mqtt protocol version 3.1.1
message.add(0x0); // Connect flags TODO user / name message.add(0x0); // Connect flags TODO user / name
keep_alive = 1; // TODO not configurable keep_alive = ka; // TODO not configurable
message.add(0x00); // keep_alive message.add(0x00); // keep_alive
message.add((char)keep_alive); message.add((char)keep_alive);
message.add(clientId); message.add(clientId);
debug("cnx: mqtt connecting"); debug("cnx: mqtt connecting");
message.sendTo(this); message.sendTo(this);
message.reset();
debug("cnx: mqtt sent " << (int32_t)parent); debug("cnx: mqtt sent " << (int32_t)parent);
clientAlive(0); clientAlive(0);
} }
} }
@@ -146,16 +147,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
{ {
i++; i++;
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") << Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected(); " srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
bool doit = false; bool doit = false;
if (broker && broker->connected()) // Connected: R2 R3 R5 R6 if (broker && broker->connected()) // Broker is connected
{ {
// ext broker -> clients or // ext broker -> clients or
// or clients -> ext broker // or clients -> ext broker
if (source == broker) // broker -> clients if (source == broker) // broker -> clients
doit = true; doit = true;
else // clients -> broker else // clients -> broker
retval=broker->publish(topic, msg); {
MqttError ret = broker->publish(topic, msg);
if (ret != MqttOk) retval = ret;
}
} }
else // Disconnected: R7 else // Disconnected: R7
{ {
@@ -164,7 +168,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
} }
Serial << ", doit=" << doit << ' '; Serial << ", doit=" << doit << ' ';
if (doit) client->publish(topic, msg); if (doit) retval = client->publish(topic, msg);
debug(""); debug("");
} }
return retval; return retval;
@@ -208,6 +212,7 @@ void MqttClient::loop()
} }
else if (client && client->connected()) else if (client && client->connected())
{ {
debug("pingreq");
uint16_t pingreq = MqttMessage::Type::PingReq; uint16_t pingreq = MqttMessage::Type::PingReq;
client->write((uint8_t*)(&pingreq), 2); client->write((uint8_t*)(&pingreq), 2);
clientAlive(0); clientAlive(0);
@@ -227,11 +232,40 @@ void MqttClient::loop()
} }
} }
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
{
debug("subsribe(" << topic.c_str() << ")");
MqttError ret = MqttOk;
subscriptions.insert(topic);
if (parent==nullptr) // remote broker ?
{
debug("remote subscribe");
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
// TODO manage packet identifier
msg.add(0);
msg.add(0);
msg.add(topic.str());
msg.add(qos);
ret = msg.sendTo(this);
// TODO we should wait (state machine) for SUBACK
}
return ret;
}
void MqttClient::processMessage() void MqttClient::processMessage()
{ {
std::string error; std::string error;
std::string s; std::string s;
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl; if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
{
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
message.hexdump("Incoming");
}
auto header = message.getVHeader(); auto header = message.getVHeader();
const char* payload; const char* payload;
uint16_t len; uint16_t len;
@@ -312,6 +346,24 @@ void MqttClient::processMessage()
message.sendTo(this); message.sendTo(this);
break; break;
case MqttMessage::Type::Connack:
// TODO what more on connack ?
mqtt_connected = true;
bclose = false;
break;
case MqttMessage::Type::Suback:
case MqttMessage::Type::Puback:
if (!mqtt_connected) break;
// Ignore acks
bclose = false;
break;
case MqttMessage::Type::PingResp:
// TODO: no PingResp is suspicious (server dead)
bclose = false;
break;
case MqttMessage::Type::PingReq: case MqttMessage::Type::PingReq:
if (!mqtt_connected) break; if (!mqtt_connected) break;
if (client) if (client)
@@ -327,14 +379,26 @@ void MqttClient::processMessage()
break; break;
case MqttMessage::Type::Subscribe: case MqttMessage::Type::Subscribe:
if (!mqtt_connected) break; {
payload = header+2; if (!mqtt_connected) break;
message.getString(payload, len); // Topic payload = header+2;
outstring("Subscribes", payload, len);
debug("subscribe loop");
subscribe(Topic(payload, len)); while(payload < message.end())
bclose = false; {
// TODO SUBACK message.getString(payload, len); // Topic
debug( " topic (" << std::string(payload, len) << ')');
outstring("Subscribes", payload, len);
// subscribe(Topic(payload, len));
subscriptions.insert(Topic(payload, len));
payload += len;
uint8_t qos = *payload++;
debug(" qos=" << qos);
}
debug("end loop");
bclose = false;
// TODO SUBACK
}
break; break;
case MqttMessage::Type::Publish: case MqttMessage::Type::Publish:
@@ -351,8 +415,15 @@ void MqttClient::processMessage()
if (qos) payload+=2; // ignore packet identifier if any if (qos) payload+=2; // ignore packet identifier if any
// TODO reset DUP // TODO reset DUP
// TODO reset RETAIN // TODO reset RETAIN
debug("publishing to parent"); if (parent)
parent->publish(this, published, message); {
debug("publishing to parent");
parent->publish(this, published, message);
}
else if (callback && subscriptions.find(published)!=subscriptions.end())
{
callback(this, published, nullptr, 0); // TODO send the real payload
}
// TODO should send PUBACK // TODO should send PUBACK
bclose = false; bclose = false;
} }
@@ -376,7 +447,7 @@ void MqttClient::processMessage()
} }
else else
{ {
clientAlive(5); clientAlive(parent ? 5 : 0);
} }
message.reset(); message.reset();
} }
@@ -398,7 +469,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
if (parent) if (parent)
return parent->publish(this, topic, msg); return parent->publish(this, topic, msg);
else if (client) else if (client)
msg.sendTo(this); return msg.sendTo(this);
else else
return MqttNowhereToSend; return MqttNowhereToSend;
} }
@@ -417,7 +488,7 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
Serial << " match/send"; Serial << " match/send";
if (client) if (client)
{ {
msg.sendTo(this); retval = msg.sendTo(this);
} }
else if (callback) else if (callback)
{ {
@@ -506,28 +577,58 @@ void MqttMessage::encodeLength(char* msb, int length)
} while (length); } while (length);
}; };
void MqttMessage::sendTo(MqttClient* client) MqttError MqttMessage::sendTo(MqttClient* client)
{ {
if (buffer.size()>2) if (buffer.size()>2)
{ {
debug("sending " << buffer.size() << " bytes");
encodeLength(&buffer[1], buffer.size()-2); encodeLength(&buffer[1], buffer.size()-2);
// hexdump("snd"); hexdump("snd");
client->write(&buffer[0], buffer.size()); client->write(&buffer[0], buffer.size());
} }
else else
{ {
Serial << "??? Invalid send" << endl; debug("??? Invalid send");
return MqttInvalidMessage;
} }
return MqttOk;
} }
void MqttMessage::hexdump(const char* prefix) const void MqttMessage::hexdump(const char* prefix) const
{ {
if (prefix) Serial << prefix << ' '; uint16_t addr=0;
Serial << "size(" << buffer.size() << ") : "; const int bytes_per_row = 8;
for(const char chr: buffer) const char* hex_to_str = " | ";
const char* separator = hex_to_str;
const char* half_sep = " - ";
std::string ascii;
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
for(const char chr: buffer)
{
if ((addr % bytes_per_row) == 0)
{ {
if (chr<16) Serial << '0'; if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
Serial << _HEX(chr) << ' '; if (prefix) Serial << prefix << separator;
ascii.clear();
} }
Serial << endl; addr++;
if (chr<16) Serial << '0';
Serial << _HEX(chr) << ' ';
ascii += (chr<32 ? '.' : chr);
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
}
if (ascii.length())
{
while(ascii.length() < bytes_per_row+strlen(half_sep))
{
Serial << " "; // spaces per hexa byte
ascii += ' ';
}
Serial << hex_to_str << ascii << separator;
}
Serial << endl;
} }

View File

@@ -3,9 +3,11 @@
#include <set> #include <set>
#include <string> #include <string>
#include "StringIndexer.h" #include "StringIndexer.h"
#include <MqttStreaming.h>
#define TINY_MQTT_DEBUG
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
#include <Streaming.h>
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); } #define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
#else #else
#define debug(what) {} #define debug(what) {}
@@ -15,6 +17,7 @@ enum MqttError
{ {
MqttOk = 0, MqttOk = 0,
MqttNowhereToSend=1, MqttNowhereToSend=1,
MqttInvalidMessage=2,
}; };
class Topic : public IndexedString class Topic : public IndexedString
@@ -42,6 +45,7 @@ class MqttMessage
Publish = 0x30, Publish = 0x30,
PubAck = 0x40, PubAck = 0x40,
Subscribe = 0x80, Subscribe = 0x80,
Suback = 0x90,
PingReq = 0xC0, PingReq = 0xC0,
PingResp = 0xD0, PingResp = 0xD0,
}; };
@@ -57,7 +61,7 @@ class MqttMessage
}; };
MqttMessage() { reset(); } MqttMessage() { reset(); }
MqttMessage(Type t) { create(t); } MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
void incoming(char byte); void incoming(char byte);
void add(char byte) { incoming(byte); } void add(char byte) { incoming(byte); }
void add(const char* p, size_t len, bool addLength=true ); void add(const char* p, size_t len, bool addLength=true );
@@ -87,7 +91,7 @@ class MqttMessage
size=0; size=0;
state=Create; state=Create;
} }
void sendTo(MqttClient*); MqttError sendTo(MqttClient*);
void hexdump(const char* prefix=nullptr) const; void hexdump(const char* prefix=nullptr) const;
private: private:
@@ -120,7 +124,7 @@ class MqttClient
~MqttClient(); ~MqttClient();
void connect(MqttBroker* parent); void connect(MqttBroker* parent);
void connect(std::string broker, uint16_t port); void connect(std::string broker, uint16_t port, uint16_t ka=10);
bool connected() { return bool connected() { return
(parent!=nullptr and client==nullptr) or (parent!=nullptr and client==nullptr) or
@@ -141,8 +145,8 @@ class MqttClient
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());} MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);}; MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
void subscribe(Topic topic) { subscriptions.insert(topic); } MqttError subscribe(Topic topic, uint8_t qos=0);
void unsubscribe(Topic& topic); MqttError unsubscribe(Topic& topic);
// connected to local broker // connected to local broker
// TODO seems to be useless // TODO seems to be useless
@@ -151,15 +155,21 @@ class MqttClient
#ifdef TINY_MQTT_DEBUG #ifdef TINY_MQTT_DEBUG
void dump() void dump()
{ {
uint32_t ms=millis();
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF"); << " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
Serial << " cnx " << (client && client->connected());
Serial << " ["; Serial << " [";
message.hexdump("entrant msg");
bool c=false; bool c=false;
for(auto s: subscriptions) for(auto s: subscriptions)
{ {
Serial << (c?", ": "")<< s.str().c_str(); Serial << (c?", ": "")<< s.str().c_str();
c=true; c=true;
} }
Serial << "]" << endl; Serial << "]" << endl;
} }
#endif #endif