Compare commits
11 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d92aa1fe3c | ||
|
|
0d6e194560 | ||
|
|
7107da2cce | ||
|
|
28c8713415 | ||
|
|
70cf8137de | ||
|
|
5ab315e472 | ||
|
|
b96b36f10c | ||
|
|
ba831ea366 | ||
|
|
4020393f90 | ||
|
|
7b20e7deb5 | ||
|
|
efe6a05bbd |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1 +1,2 @@
|
||||
*~
|
||||
src/my_credentials.h
|
||||
|
||||
@@ -23,16 +23,16 @@ ESP 8266 is a small and very capable Mqtt Broker and Client
|
||||
* Implement zeroconf mode (needs async)
|
||||
* Add a max_clients in MqttBroker. Used with zeroconf, there will be
|
||||
no need for having tons of clients (also RAM is the problem with many clients)
|
||||
* Test what is the real max number of clients for broker. As far as I saw, 3k is needed per client which would make more than 10 clients critical.
|
||||
* MqttMessage uses a buffer 256 bytes which is usually far than needed.
|
||||
* MqttClient auto reconnection
|
||||
* Test what is the real max number of clients for broker. As far as I saw, 1k is needed per client which would make more than 30 clients critical.
|
||||
* ~~MqttMessage uses a buffer 256 bytes which is usually far than needed.~~
|
||||
* ~~MqttClient does not support more than one subscription at time~~
|
||||
* MqttClient auto re-subscribe
|
||||
* MqttClient auto reconnection
|
||||
* MqttClient does not callback payload...
|
||||
* MqttClient user/password
|
||||
|
||||
## Quickstart
|
||||
|
||||
* install [Streaming library](https://github.com/janelia-arduino/Streaming)
|
||||
* install [TinyMqtt library](https://github.com/hsaturn/TinyMqtt)
|
||||
* modify <libraries/TinyMqtt/src/my_credentials.h> (wifi setup)
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
|
||||
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
|
||||
|
||||
/**
|
||||
* Local broker that accept connections and two local clients
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
|
||||
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
|
||||
|
||||
/** TinyMQTT allows a disconnected mode:
|
||||
*
|
||||
@@ -15,10 +14,10 @@ MqttBroker broker(1883);
|
||||
MqttClient mqtt_a(&broker);
|
||||
MqttClient mqtt_b(&broker);
|
||||
|
||||
void onPublishA(const Topic& topic, const char* payload, size_t length)
|
||||
void onPublishA(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
||||
{ Serial << "--> A Received " << topic.c_str() << endl; }
|
||||
|
||||
void onPublishB(const Topic& topic, const char* payload, size_t length)
|
||||
void onPublishB(const MqttClient* srce, const Topic& topic, const char* payload, size_t length)
|
||||
{ Serial << "--> B Received " << topic.c_str() << endl; }
|
||||
|
||||
void setup()
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
|
||||
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
|
||||
|
||||
#include <my_credentials.h>
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
#include <ESP8266WiFi.h>
|
||||
#include "TinyMqtt.h" // https://github.com/hsaturn/TinyMqtt
|
||||
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
|
||||
|
||||
/** Simple Client
|
||||
*
|
||||
@@ -43,7 +42,11 @@ void loop()
|
||||
|
||||
delay(1000);
|
||||
|
||||
temp += (random(100)>50 ? 0.1 : -0.1);
|
||||
auto rnd=random(100);
|
||||
|
||||
if (rnd > 66) temp += 0.1;
|
||||
else if (rnd < 33) temp -= 0.1;
|
||||
|
||||
client.publish("sensor/temperature", String(temp));
|
||||
|
||||
}
|
||||
|
||||
11
examples/tinymqtt-test/commands.txt
Normal file
11
examples/tinymqtt-test/commands.txt
Normal file
@@ -0,0 +1,11 @@
|
||||
// vim: ts=30
|
||||
Exemple of commands that can be sent via the serial monitor to tinymqtt-test
|
||||
----------------------------------------------------------------------------
|
||||
|
||||
client a starts a client (not connected no internal broker)
|
||||
a.connect [server][port][alive] connects the client, default port=1883
|
||||
a.publish topic [payload] send a topic with a payload
|
||||
a.subscribe topic subscribes to a topic
|
||||
delete a destroy the client
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
#define TINY_MQTT_DEBUG
|
||||
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
|
||||
#include <Streaming.h> // https://github.com/janelia-arduino/Streaming
|
||||
#include <TinyMqtt.h> // https://github.com/hsaturn/TinyMqtt
|
||||
#include <MqttStreaming.h>
|
||||
#include <map>
|
||||
|
||||
/**
|
||||
@@ -54,15 +54,15 @@ void setup()
|
||||
brokers["broker"] = broker;
|
||||
}
|
||||
|
||||
int getint(std::string& str, const int if_empty=0, char sep=' ')
|
||||
int getint(std::string& str, const int if_empty=0)
|
||||
{
|
||||
std::string sword;
|
||||
while(str.length() && str[0]!=sep)
|
||||
while(str.length() && str[0]>='0' && str[0]<='9')
|
||||
{
|
||||
sword += str[0]; str.erase(0,1);
|
||||
}
|
||||
while(str[0]==sep) str.erase(0,1);
|
||||
if (if_empty and sword.length()==0) sword=if_empty;
|
||||
while(str[0]==' ') str.erase(0,1);
|
||||
if (if_empty and sword.length()==0) return if_empty;
|
||||
return atoi(sword.c_str());
|
||||
}
|
||||
|
||||
@@ -78,6 +78,7 @@ std::string getword(std::string& str, const char* if_empty=nullptr, char sep=' '
|
||||
return sword;
|
||||
}
|
||||
|
||||
// publish at regular interval
|
||||
class automatic
|
||||
{
|
||||
public:
|
||||
@@ -189,6 +190,7 @@ class automatic
|
||||
std::string topic_;
|
||||
bool bon=false;
|
||||
static std::map<MqttClient*, automatic*> autos;
|
||||
float temp=19;
|
||||
};
|
||||
std::map<MqttClient*, automatic*> automatic::autos;
|
||||
|
||||
@@ -225,6 +227,8 @@ void loop()
|
||||
last_cmd=cmd;
|
||||
while(cmd.length())
|
||||
{
|
||||
MqttError retval = MqttOk;
|
||||
|
||||
std::string s;
|
||||
MqttBroker* broker = nullptr;
|
||||
MqttClient* client = nullptr;
|
||||
@@ -309,16 +313,14 @@ void loop()
|
||||
{
|
||||
if (compare(s,"connect"))
|
||||
{
|
||||
client->connect(getword(cmd,"192.168.1.40").c_str(), 1883);
|
||||
client->connect(getword(cmd,"192.168.1.40").c_str(), getint(cmd, 1883), getint(cmd, 60));
|
||||
Serial << (client->connected() ? "connected." : "not connected") << endl;
|
||||
}
|
||||
else if (compare(s,"publish"))
|
||||
{
|
||||
auto ok=client->publish(getword(cmd, topic.c_str()));
|
||||
if (ok != MqttOk)
|
||||
{
|
||||
Serial << "## ERROR " << ok << endl;
|
||||
}
|
||||
while (cmd[0]==' ') cmd.erase(0,1);
|
||||
retval = client->publish(getword(cmd, topic.c_str()), cmd.c_str(), cmd.length());
|
||||
cmd=""; // remove payload
|
||||
}
|
||||
else if (compare(s,"subscribe"))
|
||||
{
|
||||
@@ -409,9 +411,9 @@ void loop()
|
||||
Serial << endl;
|
||||
Serial << " MqttClient:" << endl;
|
||||
Serial << " client {name} {parent broker} : create a client then" << endl;
|
||||
Serial << " name.connect [ip]" << endl;
|
||||
Serial << " name.connect [ip] [port] [alive]" << endl;
|
||||
Serial << " name.subscribe [topic]" << endl;
|
||||
Serial << " name.publish [topic]" << endl;
|
||||
Serial << " name.publish [topic][payload]" << endl;
|
||||
Serial << " name.view" << endl;
|
||||
Serial << " name.delete" << endl;
|
||||
|
||||
@@ -431,6 +433,11 @@ void loop()
|
||||
if (s.length())
|
||||
Serial << "Unknown command (" << s.c_str() << ")" << endl;
|
||||
}
|
||||
|
||||
if (retval != MqttOk)
|
||||
{
|
||||
Serial << "## ERROR " << retval << endl;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
412
src/MqttStreaming.h
Normal file
412
src/MqttStreaming.h
Normal file
@@ -0,0 +1,412 @@
|
||||
/* MqttStreaming.h - Fork of Streaming.h adding std::string and with some minor fixes
|
||||
* (I have to speek to the author in order to include my changes to his library if possible)
|
||||
**/
|
||||
|
||||
/*
|
||||
Streaming.h - Arduino library for supporting the << streaming operator
|
||||
Copyright (c) 2010-2012 Mikal Hart. All rights reserved.
|
||||
|
||||
This library is free software; you can redistribute it and/or
|
||||
modify it under the terms of the GNU Lesser General Public
|
||||
License as published by the Free Software Foundation; either
|
||||
version 2.1 of the License, or (at your option) any later version.
|
||||
|
||||
This library is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
||||
Lesser General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public
|
||||
License along with this library; if not, write to the Free Software
|
||||
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
||||
*/
|
||||
|
||||
/*
|
||||
Version 6 library changes
|
||||
Copyright (c) 2019 Gazoodle. All rights reserved.
|
||||
|
||||
1. _BASED moved to template to remove type conversion to long and
|
||||
sign changes which break int8_t and int16_t negative numbers.
|
||||
The print implementation still upscales to long for it's internal
|
||||
print routine.
|
||||
|
||||
2. _PAD added to allow padding & filling of characters to the stream
|
||||
|
||||
3. _WIDTH & _WIDTHZ added to allow width printing with space padding
|
||||
and zero padding for numerics
|
||||
|
||||
4. Simple _FMT mechanism ala printf, but without the typeunsafetyness
|
||||
and no internal buffers for replaceable stream printing
|
||||
*/
|
||||
|
||||
#ifndef ARDUINO_STREAMING
|
||||
#define ARDUINO_STREAMING
|
||||
|
||||
#if defined(ARDUINO) && ARDUINO >= 100
|
||||
#include "Arduino.h"
|
||||
#else
|
||||
#ifndef STREAMING_CONSOLE
|
||||
#include "WProgram.h"
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include <string>
|
||||
|
||||
#if defined(ARDUINO_ARCH_AVR) || defined(ARDUINO_ARCH_MEGAAVR)
|
||||
// No stl library, so need trivial version of std::is_signed ...
|
||||
namespace std {
|
||||
template<typename T>
|
||||
struct is_signed { static const bool value = false; };
|
||||
template<>
|
||||
struct is_signed<int8_t> { static const bool value = true; };
|
||||
template<>
|
||||
struct is_signed<int16_t> { static const bool value = true; };
|
||||
template<>
|
||||
struct is_signed<int32_t> { static const bool value = true; };
|
||||
};
|
||||
#else
|
||||
#include <type_traits>
|
||||
#endif
|
||||
|
||||
#define STREAMING_LIBRARY_VERSION 6
|
||||
|
||||
#if !defined(typeof)
|
||||
#define typeof(x) __typeof__(x)
|
||||
#endif
|
||||
|
||||
// PrintBuffer implementation of Print, a small buffer to print in
|
||||
// see its use with pad_float()
|
||||
template <size_t N>
|
||||
class PrintBuffer : public Print
|
||||
{
|
||||
size_t pos = 0;
|
||||
char str[N] {};
|
||||
public:
|
||||
inline const char *operator() ()
|
||||
{ return str; };
|
||||
|
||||
// inline void clear()
|
||||
// { pos = 0; str[0] = '\0'; };
|
||||
|
||||
inline size_t write(uint8_t c)
|
||||
{ return write(&c, 1); };
|
||||
|
||||
inline size_t write(const uint8_t *buffer, size_t size)
|
||||
{
|
||||
size_t s = std::min(size, N-1 - pos); // need a /0 left
|
||||
if (s)
|
||||
{
|
||||
memcpy(&str[pos], buffer, s);
|
||||
pos += s;
|
||||
}
|
||||
return s;
|
||||
};
|
||||
};
|
||||
|
||||
// Generic template
|
||||
template<class T>
|
||||
inline Print &operator <<(Print &stream, const T &arg)
|
||||
{ stream.print(arg); return stream; }
|
||||
|
||||
// TODO sfinae maybe could do the trick ?
|
||||
inline Print &operator <<(Print &stream, const std::string &str)
|
||||
{ stream.print(str.c_str()); return stream; }
|
||||
|
||||
template<typename T>
|
||||
struct _BASED
|
||||
{
|
||||
T val;
|
||||
int base;
|
||||
_BASED(T v, int b): val(v), base(b)
|
||||
{}
|
||||
};
|
||||
|
||||
#if ARDUINO >= 100
|
||||
|
||||
struct _BYTE_CODE
|
||||
{
|
||||
byte val;
|
||||
_BYTE_CODE(byte v) : val(v)
|
||||
{}
|
||||
};
|
||||
#define _BYTE(a) _BYTE_CODE(a)
|
||||
|
||||
inline Print &operator <<(Print &obj, const _BYTE_CODE &arg)
|
||||
{ obj.write(arg.val); return obj; }
|
||||
|
||||
#else
|
||||
|
||||
#define _BYTE(a) _BASED<typeof(a)>(a, BYTE)
|
||||
|
||||
#endif
|
||||
|
||||
#define _HEX(a) _BASED<typeof(a)>(a, HEX)
|
||||
#define _DEC(a) _BASED<typeof(a)>(a, DEC)
|
||||
#define _OCT(a) _BASED<typeof(a)>(a, OCT)
|
||||
#define _BIN(a) _BASED<typeof(a)>(a, BIN)
|
||||
|
||||
// Specialization for class _BASED
|
||||
// Thanks to Arduino forum user Ben Combee who suggested this
|
||||
// clever technique to allow for expressions like
|
||||
// Serial << _HEX(a);
|
||||
|
||||
template<typename T>
|
||||
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
|
||||
{ obj.print(arg.val, arg.base); return obj; }
|
||||
|
||||
#if ARDUINO >= 18
|
||||
// Specialization for class _FLOAT
|
||||
// Thanks to Michael Margolis for suggesting a way
|
||||
// to accommodate Arduino 0018's floating point precision
|
||||
// feature like this:
|
||||
// Serial << _FLOAT(gps_latitude, 6); // 6 digits of precision
|
||||
|
||||
struct _FLOAT
|
||||
{
|
||||
double val; // only Print::print(double)
|
||||
int digits;
|
||||
_FLOAT(double v, int d): val(v), digits(d)
|
||||
{}
|
||||
};
|
||||
|
||||
inline Print &operator <<(Print &obj, const _FLOAT &arg)
|
||||
{ obj.print(arg.val, arg.digits); return obj; }
|
||||
#endif
|
||||
|
||||
// Specialization for enum _EndLineCode
|
||||
// Thanks to Arduino forum user Paul V. who suggested this
|
||||
// clever technique to allow for expressions like
|
||||
// Serial << "Hello!" << endl;
|
||||
|
||||
enum _EndLineCode { endl };
|
||||
|
||||
inline Print &operator <<(Print &obj, _EndLineCode)
|
||||
{ obj.println(); return obj; }
|
||||
|
||||
// Specialization for padding & filling, mainly utilized
|
||||
// by the width printers
|
||||
//
|
||||
// Use like
|
||||
// Serial << _PAD(10,' '); // Will output 10 spaces
|
||||
// Serial << _PAD(4, '0'); // Will output 4 zeros
|
||||
struct _PAD
|
||||
{
|
||||
int8_t width;
|
||||
char chr;
|
||||
_PAD(int8_t w, char c) : width(w), chr(c) {}
|
||||
};
|
||||
|
||||
inline Print &operator <<(Print& stm, const _PAD &arg)
|
||||
{
|
||||
for(int8_t i = 0; i < arg.width; i++)
|
||||
stm.print(arg.chr);
|
||||
return stm;
|
||||
}
|
||||
|
||||
// Specialization for width printing
|
||||
//
|
||||
// Use like Result
|
||||
// -------- ------
|
||||
// Serial << _WIDTH(1,5) " 1"
|
||||
// Serial << _WIDTH(10,5) " 10"
|
||||
// Serial << _WIDTH(100,5) " 100"
|
||||
// Serial << _WIDTHZ(1,5) "00001"
|
||||
//
|
||||
// Great for times & dates, or hex dumps
|
||||
//
|
||||
// Serial << _WIDTHZ(hour,2) << ':' << _WIDTHZ(min,2) << ':' << _WIDTHZ(sec,2)
|
||||
//
|
||||
// for(int index=0; index<byte_array_size; index++)
|
||||
// Serial << _WIDTHZ(_HEX(byte_array[index]))
|
||||
|
||||
template<typename T>
|
||||
struct __WIDTH
|
||||
{
|
||||
const T val;
|
||||
int8_t width;
|
||||
char pad;
|
||||
__WIDTH(const T& v, int8_t w, char p) : val(v), width(w), pad(p) {}
|
||||
};
|
||||
|
||||
// Count digits in an integer of specific base
|
||||
template<typename T>
|
||||
inline uint8_t digits(T v, int8_t base = 10)
|
||||
{
|
||||
uint8_t digits = 0;
|
||||
if ( std::is_signed<T>::value )
|
||||
{
|
||||
if ( v < 0 )
|
||||
{
|
||||
digits++;
|
||||
v = -v; // v needs to be postive for the digits counter to work
|
||||
}
|
||||
}
|
||||
do
|
||||
{
|
||||
v /= base;
|
||||
digits++;
|
||||
} while( v > 0 );
|
||||
return digits;
|
||||
}
|
||||
|
||||
// Generic get the width of a value in base 10
|
||||
template<typename T>
|
||||
inline uint8_t get_value_width(T val)
|
||||
{ return digits(val); }
|
||||
|
||||
inline uint8_t get_value_width(const char * val)
|
||||
{ return strlen(val); }
|
||||
|
||||
#ifdef ARDUINO
|
||||
inline uint8_t get_value_width(const __FlashStringHelper * val)
|
||||
{ return strlen_P(reinterpret_cast<const char *>(val)); }
|
||||
#endif
|
||||
|
||||
// _BASED<T> get the width of a value
|
||||
template<typename T>
|
||||
inline uint8_t get_value_width(_BASED<T> b)
|
||||
{ return digits(b.val, b.base); }
|
||||
|
||||
// Constructor wrapper to allow automatic template parameter deduction
|
||||
template<typename T>
|
||||
__WIDTH<T> _WIDTH(T val, int8_t width) { return __WIDTH<T>(val, width, ' '); }
|
||||
template<typename T>
|
||||
__WIDTH<T> _WIDTHZ(T val, int8_t width) { return __WIDTH<T>(val, width, '0'); }
|
||||
|
||||
|
||||
// Operator overload to handle width printing.
|
||||
template<typename T>
|
||||
inline Print &operator <<(Print &stm, const __WIDTH<T> &arg)
|
||||
{ stm << _PAD(arg.width - get_value_width(arg.val), arg.pad) << arg.val; return stm; }
|
||||
|
||||
// explicit Operator overload to handle width printing of _FLOAT, double and float
|
||||
template<typename T>
|
||||
inline Print &pad_float(Print &stm, const __WIDTH<T> &arg, const double val, const int digits = 2) // see Print::print(double, int = 2)
|
||||
{
|
||||
PrintBuffer<32> buf; // it's only ~45B on the stack, no allocation, leak or fragmentation
|
||||
size_t size = buf.print(val, digits); // print in buf
|
||||
return stm << _PAD(arg.width - size, arg.pad) << buf(); // pad and concat what's in buf
|
||||
}
|
||||
|
||||
inline Print &operator <<(Print &stm, const __WIDTH<float> &arg)
|
||||
{ return pad_float(stm, arg, arg.val); }
|
||||
|
||||
inline Print &operator <<(Print &stm, const __WIDTH<double> &arg)
|
||||
{ return pad_float(stm, arg, arg.val); }
|
||||
|
||||
inline Print &operator <<(Print &stm, const __WIDTH<_FLOAT> &arg)
|
||||
{ auto& f = arg.val; return pad_float(stm, arg, f.val, f.digits); }
|
||||
|
||||
// a less verbose _FLOATW for _WIDTH(_FLOAT)
|
||||
#define _FLOATW(val, digits, width) _WIDTH<_FLOAT>(_FLOAT((val), (digits)), (width))
|
||||
|
||||
// Specialization for replacement formatting
|
||||
//
|
||||
// Designed to be similar to printf that everyone knows and loves/hates. But without
|
||||
// the internal buffers and type agnosticism. This version only has placeholders in
|
||||
// the format string, the actual values are supplied using the stream safe operators
|
||||
// defined in this library.
|
||||
//
|
||||
// Use like this:
|
||||
//
|
||||
// Serial << FMT(F("Replace % with %"), 1, 2 )
|
||||
// Serial << FMT("Time is %:%:%", _WIDTHZ(hours,2), _WIDTHZ(minutes,2), _WIDTHZ(seconds,2))
|
||||
// Serial << FMT("Your score is %\\%", score); // Note the \\ to escape the % sign
|
||||
|
||||
// Ok, hold your hats. This is a foray into C++11's variadic template engine ...
|
||||
|
||||
inline char get_next_format_char(const char *& format_string)
|
||||
{
|
||||
char format_char = *format_string;
|
||||
if ( format_char > 0 ) format_string++;
|
||||
return format_char;
|
||||
}
|
||||
|
||||
#ifdef ARDUINO
|
||||
inline char get_next_format_char(const __FlashStringHelper*& format_string)
|
||||
{
|
||||
char format_char = pgm_read_byte(format_string);
|
||||
if ( format_char > 0 ) format_string = reinterpret_cast<const __FlashStringHelper*>(reinterpret_cast<const char *>(format_string)+1);
|
||||
return format_char;
|
||||
}
|
||||
#endif
|
||||
|
||||
template<typename Ft>
|
||||
inline bool check_backslash(char& format_char, Ft& format_string)
|
||||
{
|
||||
if ( format_char == '\\')
|
||||
{
|
||||
format_char = get_next_format_char(format_string);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
// The template tail printer helper
|
||||
template<typename Ft, typename... Ts>
|
||||
struct __FMT
|
||||
{
|
||||
Ft format_string;
|
||||
__FMT(Ft f, Ts ... args) : format_string(f) {}
|
||||
inline void tstreamf(Print& stm, Ft format) const
|
||||
{
|
||||
while(char c = get_next_format_char(format))
|
||||
{
|
||||
check_backslash(c, format);
|
||||
if ( c )
|
||||
stm.print(c);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// The variadic template helper
|
||||
template<typename Ft, typename T, typename... Ts>
|
||||
struct __FMT<Ft, T, Ts...> : __FMT<Ft, Ts...>
|
||||
{
|
||||
T val;
|
||||
__FMT(Ft f, T t, Ts... ts) : __FMT<Ft, Ts...>(f, ts...), val(t) {}
|
||||
inline void tstreamf(Print& stm, Ft format) const
|
||||
{
|
||||
while(char c = get_next_format_char(format))
|
||||
{
|
||||
if (!check_backslash(c, format))
|
||||
{
|
||||
if ( c == '%')
|
||||
{
|
||||
stm << val;
|
||||
// Variadic recursion ... compiler rolls this out during
|
||||
// template argument pack expansion
|
||||
__FMT<Ft, Ts...>::tstreamf(stm, format);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (c)
|
||||
stm.print(c);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// The actual operator should you only instanciate the FMT
|
||||
// helper with a format string and no parameters
|
||||
template<typename Ft, typename... Ts>
|
||||
inline Print& operator <<(Print &stm, const __FMT<Ft, Ts...> &args)
|
||||
{
|
||||
args.tstreamf(stm, args.format_string);
|
||||
return stm;
|
||||
}
|
||||
|
||||
// The variadic stream helper
|
||||
template<typename Ft, typename T, typename... Ts>
|
||||
inline Print& operator <<(Print &stm, const __FMT<Ft, T, Ts...> &args)
|
||||
{
|
||||
args.tstreamf(stm, args.format_string);
|
||||
return stm;
|
||||
}
|
||||
|
||||
// As we don't have C++17, we can't get a constructor to use
|
||||
// automatic argument deduction, but ... this little trick gets
|
||||
// around that ...
|
||||
template<typename Ft, typename... Ts>
|
||||
__FMT<Ft, Ts...> _FMT(Ft format, Ts ... args) { return __FMT<Ft, Ts...>(format, args...); }
|
||||
|
||||
#endif
|
||||
@@ -2,7 +2,6 @@
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <string.h>
|
||||
// #include <Streaming.h>
|
||||
#include <ESP8266WiFi.h>
|
||||
|
||||
/***
|
||||
|
||||
159
src/TinyMqtt.cpp
159
src/TinyMqtt.cpp
@@ -1,6 +1,5 @@
|
||||
#include "TinyMqtt.h"
|
||||
#include <sstream>
|
||||
#include <Streaming.h>
|
||||
|
||||
void outstring(const char* prefix, const char*p, uint16_t len)
|
||||
{
|
||||
@@ -60,7 +59,7 @@ void MqttClient::close()
|
||||
}
|
||||
}
|
||||
|
||||
void MqttClient::connect(std::string broker, uint16_t port)
|
||||
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
||||
{
|
||||
debug("cnx: closing");
|
||||
close();
|
||||
@@ -75,13 +74,15 @@ void MqttClient::connect(std::string broker, uint16_t port)
|
||||
message.add(0x4); // Mqtt protocol version 3.1.1
|
||||
message.add(0x0); // Connect flags TODO user / name
|
||||
|
||||
keep_alive = 1; // TODO not configurable
|
||||
keep_alive = ka; // TODO not configurable
|
||||
message.add(0x00); // keep_alive
|
||||
message.add((char)keep_alive);
|
||||
message.add(clientId);
|
||||
debug("cnx: mqtt connecting");
|
||||
message.sendTo(this);
|
||||
message.reset();
|
||||
debug("cnx: mqtt sent " << (int32_t)parent);
|
||||
|
||||
clientAlive(0);
|
||||
}
|
||||
}
|
||||
@@ -146,16 +147,19 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
|
||||
{
|
||||
i++;
|
||||
Serial << "brk_" << (broker && broker->connected() ? "con" : "dis") <<
|
||||
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected();
|
||||
" srce=" << (source->isLocal() ? "loc" : "rem") << " clt#" << i << ", local=" << client->isLocal() << ", con=" << client->connected() << endl;
|
||||
bool doit = false;
|
||||
if (broker && broker->connected()) // Connected: R2 R3 R5 R6
|
||||
if (broker && broker->connected()) // Broker is connected
|
||||
{
|
||||
// ext broker -> clients or
|
||||
// or clients -> ext broker
|
||||
if (source == broker) // broker -> clients
|
||||
doit = true;
|
||||
else // clients -> broker
|
||||
retval=broker->publish(topic, msg);
|
||||
{
|
||||
MqttError ret = broker->publish(topic, msg);
|
||||
if (ret != MqttOk) retval = ret;
|
||||
}
|
||||
}
|
||||
else // Disconnected: R7
|
||||
{
|
||||
@@ -164,7 +168,7 @@ MqttError MqttBroker::publish(const MqttClient* source, const Topic& topic, Mqtt
|
||||
}
|
||||
Serial << ", doit=" << doit << ' ';
|
||||
|
||||
if (doit) client->publish(topic, msg);
|
||||
if (doit) retval = client->publish(topic, msg);
|
||||
debug("");
|
||||
}
|
||||
return retval;
|
||||
@@ -208,6 +212,7 @@ void MqttClient::loop()
|
||||
}
|
||||
else if (client && client->connected())
|
||||
{
|
||||
debug("pingreq");
|
||||
uint16_t pingreq = MqttMessage::Type::PingReq;
|
||||
client->write((uint8_t*)(&pingreq), 2);
|
||||
clientAlive(0);
|
||||
@@ -227,11 +232,40 @@ void MqttClient::loop()
|
||||
}
|
||||
}
|
||||
|
||||
MqttError MqttClient::subscribe(Topic topic, uint8_t qos)
|
||||
{
|
||||
debug("subsribe(" << topic.c_str() << ")");
|
||||
MqttError ret = MqttOk;
|
||||
|
||||
subscriptions.insert(topic);
|
||||
|
||||
if (parent==nullptr) // remote broker ?
|
||||
{
|
||||
debug("remote subscribe");
|
||||
MqttMessage msg(MqttMessage::Type::Subscribe, 2);
|
||||
|
||||
// TODO manage packet identifier
|
||||
msg.add(0);
|
||||
msg.add(0);
|
||||
|
||||
msg.add(topic.str());
|
||||
msg.add(qos);
|
||||
ret = msg.sendTo(this);
|
||||
|
||||
// TODO we should wait (state machine) for SUBACK
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
void MqttClient::processMessage()
|
||||
{
|
||||
std::string error;
|
||||
std::string s;
|
||||
// Serial << "---> INCOMING " << _HEX(message.type()) << ", mem=" << ESP.getFreeHeap() << endl;
|
||||
if (message.type() != MqttMessage::Type::PingReq && message.type() != MqttMessage::Type::PingResp)
|
||||
{
|
||||
Serial << "---> INCOMING " << _HEX(message.type()) << " client(" << (int)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
||||
message.hexdump("Incoming");
|
||||
}
|
||||
auto header = message.getVHeader();
|
||||
const char* payload;
|
||||
uint16_t len;
|
||||
@@ -312,6 +346,24 @@ void MqttClient::processMessage()
|
||||
message.sendTo(this);
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::Connack:
|
||||
// TODO what more on connack ?
|
||||
mqtt_connected = true;
|
||||
bclose = false;
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::Suback:
|
||||
case MqttMessage::Type::Puback:
|
||||
if (!mqtt_connected) break;
|
||||
// Ignore acks
|
||||
bclose = false;
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::PingResp:
|
||||
// TODO: no PingResp is suspicious (server dead)
|
||||
bclose = false;
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::PingReq:
|
||||
if (!mqtt_connected) break;
|
||||
if (client)
|
||||
@@ -327,14 +379,26 @@ void MqttClient::processMessage()
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::Subscribe:
|
||||
if (!mqtt_connected) break;
|
||||
payload = header+2;
|
||||
message.getString(payload, len); // Topic
|
||||
outstring("Subscribes", payload, len);
|
||||
{
|
||||
if (!mqtt_connected) break;
|
||||
payload = header+2;
|
||||
|
||||
subscribe(Topic(payload, len));
|
||||
bclose = false;
|
||||
// TODO SUBACK
|
||||
debug("subscribe loop");
|
||||
while(payload < message.end())
|
||||
{
|
||||
message.getString(payload, len); // Topic
|
||||
debug( " topic (" << std::string(payload, len) << ')');
|
||||
outstring("Subscribes", payload, len);
|
||||
// subscribe(Topic(payload, len));
|
||||
subscriptions.insert(Topic(payload, len));
|
||||
payload += len;
|
||||
uint8_t qos = *payload++;
|
||||
debug(" qos=" << qos);
|
||||
}
|
||||
debug("end loop");
|
||||
bclose = false;
|
||||
// TODO SUBACK
|
||||
}
|
||||
break;
|
||||
|
||||
case MqttMessage::Type::Publish:
|
||||
@@ -351,8 +415,15 @@ void MqttClient::processMessage()
|
||||
if (qos) payload+=2; // ignore packet identifier if any
|
||||
// TODO reset DUP
|
||||
// TODO reset RETAIN
|
||||
debug("publishing to parent");
|
||||
parent->publish(this, published, message);
|
||||
if (parent)
|
||||
{
|
||||
debug("publishing to parent");
|
||||
parent->publish(this, published, message);
|
||||
}
|
||||
else if (callback && subscriptions.find(published)!=subscriptions.end())
|
||||
{
|
||||
callback(this, published, nullptr, 0); // TODO send the real payload
|
||||
}
|
||||
// TODO should send PUBACK
|
||||
bclose = false;
|
||||
}
|
||||
@@ -376,7 +447,7 @@ void MqttClient::processMessage()
|
||||
}
|
||||
else
|
||||
{
|
||||
clientAlive(5);
|
||||
clientAlive(parent ? 5 : 0);
|
||||
}
|
||||
message.reset();
|
||||
}
|
||||
@@ -398,7 +469,7 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
|
||||
if (parent)
|
||||
return parent->publish(this, topic, msg);
|
||||
else if (client)
|
||||
msg.sendTo(this);
|
||||
return msg.sendTo(this);
|
||||
else
|
||||
return MqttNowhereToSend;
|
||||
}
|
||||
@@ -417,7 +488,7 @@ MqttError MqttClient::publish(const Topic& topic, MqttMessage& msg)
|
||||
Serial << " match/send";
|
||||
if (client)
|
||||
{
|
||||
msg.sendTo(this);
|
||||
retval = msg.sendTo(this);
|
||||
}
|
||||
else if (callback)
|
||||
{
|
||||
@@ -506,28 +577,58 @@ void MqttMessage::encodeLength(char* msb, int length)
|
||||
} while (length);
|
||||
};
|
||||
|
||||
void MqttMessage::sendTo(MqttClient* client)
|
||||
MqttError MqttMessage::sendTo(MqttClient* client)
|
||||
{
|
||||
if (buffer.size()>2)
|
||||
{
|
||||
debug("sending " << buffer.size() << " bytes");
|
||||
encodeLength(&buffer[1], buffer.size()-2);
|
||||
// hexdump("snd");
|
||||
hexdump("snd");
|
||||
client->write(&buffer[0], buffer.size());
|
||||
}
|
||||
else
|
||||
{
|
||||
Serial << "??? Invalid send" << endl;
|
||||
debug("??? Invalid send");
|
||||
return MqttInvalidMessage;
|
||||
}
|
||||
return MqttOk;
|
||||
}
|
||||
|
||||
void MqttMessage::hexdump(const char* prefix) const
|
||||
{
|
||||
if (prefix) Serial << prefix << ' ';
|
||||
Serial << "size(" << buffer.size() << ") : ";
|
||||
for(const char chr: buffer)
|
||||
uint16_t addr=0;
|
||||
const int bytes_per_row = 8;
|
||||
const char* hex_to_str = " | ";
|
||||
const char* separator = hex_to_str;
|
||||
const char* half_sep = " - ";
|
||||
std::string ascii;
|
||||
|
||||
Serial << prefix << " size(" << buffer.size() << "), state=" << state << endl;
|
||||
|
||||
for(const char chr: buffer)
|
||||
{
|
||||
if ((addr % bytes_per_row) == 0)
|
||||
{
|
||||
if (chr<16) Serial << '0';
|
||||
Serial << _HEX(chr) << ' ';
|
||||
if (ascii.length()) Serial << hex_to_str << ascii << separator << endl;
|
||||
if (prefix) Serial << prefix << separator;
|
||||
ascii.clear();
|
||||
}
|
||||
Serial << endl;
|
||||
addr++;
|
||||
if (chr<16) Serial << '0';
|
||||
Serial << _HEX(chr) << ' ';
|
||||
|
||||
ascii += (chr<32 ? '.' : chr);
|
||||
if (ascii.length() == (bytes_per_row/2)) ascii += half_sep;
|
||||
}
|
||||
if (ascii.length())
|
||||
{
|
||||
while(ascii.length() < bytes_per_row+strlen(half_sep))
|
||||
{
|
||||
Serial << " "; // spaces per hexa byte
|
||||
ascii += ' ';
|
||||
}
|
||||
Serial << hex_to_str << ascii << separator;
|
||||
}
|
||||
|
||||
Serial << endl;
|
||||
}
|
||||
|
||||
@@ -3,9 +3,11 @@
|
||||
#include <set>
|
||||
#include <string>
|
||||
#include "StringIndexer.h"
|
||||
#include <MqttStreaming.h>
|
||||
|
||||
#define TINY_MQTT_DEBUG
|
||||
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
#include <Streaming.h>
|
||||
#define debug(what) { Serial << __LINE__ << ' ' << what << endl; delay(100); }
|
||||
#else
|
||||
#define debug(what) {}
|
||||
@@ -15,6 +17,7 @@ enum MqttError
|
||||
{
|
||||
MqttOk = 0,
|
||||
MqttNowhereToSend=1,
|
||||
MqttInvalidMessage=2,
|
||||
};
|
||||
|
||||
class Topic : public IndexedString
|
||||
@@ -42,6 +45,7 @@ class MqttMessage
|
||||
Publish = 0x30,
|
||||
PubAck = 0x40,
|
||||
Subscribe = 0x80,
|
||||
Suback = 0x90,
|
||||
PingReq = 0xC0,
|
||||
PingResp = 0xD0,
|
||||
};
|
||||
@@ -57,7 +61,7 @@ class MqttMessage
|
||||
};
|
||||
|
||||
MqttMessage() { reset(); }
|
||||
MqttMessage(Type t) { create(t); }
|
||||
MqttMessage(Type t, uint8_t bits_d3_d0=0) { create(t); buffer[0] |= bits_d3_d0; }
|
||||
void incoming(char byte);
|
||||
void add(char byte) { incoming(byte); }
|
||||
void add(const char* p, size_t len, bool addLength=true );
|
||||
@@ -87,7 +91,7 @@ class MqttMessage
|
||||
size=0;
|
||||
state=Create;
|
||||
}
|
||||
void sendTo(MqttClient*);
|
||||
MqttError sendTo(MqttClient*);
|
||||
void hexdump(const char* prefix=nullptr) const;
|
||||
|
||||
private:
|
||||
@@ -120,7 +124,7 @@ class MqttClient
|
||||
~MqttClient();
|
||||
|
||||
void connect(MqttBroker* parent);
|
||||
void connect(std::string broker, uint16_t port);
|
||||
void connect(std::string broker, uint16_t port, uint16_t ka=10);
|
||||
|
||||
bool connected() { return
|
||||
(parent!=nullptr and client==nullptr) or
|
||||
@@ -141,8 +145,8 @@ class MqttClient
|
||||
MqttError publish(const Topic& t, const std::string& s) { return publish(t,s.c_str(),s.length());}
|
||||
MqttError publish(const Topic& t) { return publish(t, nullptr, 0);};
|
||||
|
||||
void subscribe(Topic topic) { subscriptions.insert(topic); }
|
||||
void unsubscribe(Topic& topic);
|
||||
MqttError subscribe(Topic topic, uint8_t qos=0);
|
||||
MqttError unsubscribe(Topic& topic);
|
||||
|
||||
// connected to local broker
|
||||
// TODO seems to be useless
|
||||
@@ -151,15 +155,21 @@ class MqttClient
|
||||
#ifdef TINY_MQTT_DEBUG
|
||||
void dump()
|
||||
{
|
||||
uint32_t ms=millis();
|
||||
Serial << "MqttClient (" << clientId.c_str() << ") p=" << (int32_t) parent
|
||||
<< " c=" << (int32_t)client << (connected() ? " ON " : " OFF");
|
||||
Serial << ", alive=" << (uint32_t)alive << '/' << ms << ", ka=" << keep_alive;
|
||||
Serial << " cnx " << (client && client->connected());
|
||||
Serial << " [";
|
||||
message.hexdump("entrant msg");
|
||||
bool c=false;
|
||||
for(auto s: subscriptions)
|
||||
{
|
||||
Serial << (c?", ": "")<< s.str().c_str();
|
||||
c=true;
|
||||
}
|
||||
|
||||
|
||||
Serial << "]" << endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user