Compare commits
7 Commits
0.7.8
...
issue_2_br
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d39c58d8f5 | ||
|
|
36dde2c063 | ||
|
|
64a05bb60b | ||
|
|
bb89fc5284 | ||
|
|
c1fd1bc907 | ||
|
|
d919188eb0 | ||
|
|
88c7d552cb |
@@ -6,7 +6,7 @@
|
|||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "https://github.com/hsaturn/TinyMqtt.git"
|
"url": "https://github.com/hsaturn/TinyMqtt.git"
|
||||||
},
|
},
|
||||||
"version": "0.7.7",
|
"version": "0.7.9",
|
||||||
"exclude": "",
|
"exclude": "",
|
||||||
"examples": "examples/*/*.ino",
|
"examples": "examples/*/*.ino",
|
||||||
"frameworks": "arduino",
|
"frameworks": "arduino",
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
name=TinyMqtt
|
name=TinyMqtt
|
||||||
version=0.7.7
|
version=0.7.9
|
||||||
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
author=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
maintainer=Francois BIOT, HSaturn, <hsaturn@gmail.com>
|
||||||
sentence=A tiny broker and client library for MQTT messaging.
|
sentence=A tiny broker and client library for MQTT messaging.
|
||||||
|
|||||||
@@ -42,7 +42,7 @@
|
|||||||
#ifndef ARDUINO_STREAMING
|
#ifndef ARDUINO_STREAMING
|
||||||
#define ARDUINO_STREAMING
|
#define ARDUINO_STREAMING
|
||||||
|
|
||||||
#if defined(ARDUINO) && ARDUINO >= 100
|
#if (defined(ARDUINO) && ARDUINO >= 100) || defined(EPOXY_DUINO)
|
||||||
#include "Arduino.h"
|
#include "Arduino.h"
|
||||||
#else
|
#else
|
||||||
#ifndef STREAMING_CONSOLE
|
#ifndef STREAMING_CONSOLE
|
||||||
@@ -154,7 +154,7 @@ template<typename T>
|
|||||||
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
|
inline Print &operator <<(Print &obj, const _BASED<T> &arg)
|
||||||
{ obj.print(arg.val, arg.base); return obj; }
|
{ obj.print(arg.val, arg.base); return obj; }
|
||||||
|
|
||||||
#if ARDUINO >= 18
|
#if ARDUINO >= 18 || defined(EPOXY_DUINO)
|
||||||
// Specialization for class _FLOAT
|
// Specialization for class _FLOAT
|
||||||
// Thanks to Michael Margolis for suggesting a way
|
// Thanks to Michael Margolis for suggesting a way
|
||||||
// to accommodate Arduino 0018's floating point precision
|
// to accommodate Arduino 0018's floating point precision
|
||||||
|
|||||||
@@ -59,7 +59,7 @@ void MqttClient::close(bool bSendDisconnect)
|
|||||||
{
|
{
|
||||||
debug("close " << id().c_str());
|
debug("close " << id().c_str());
|
||||||
mqtt_connected = false;
|
mqtt_connected = false;
|
||||||
if (client)
|
if (client) // connected to a remote broker
|
||||||
{
|
{
|
||||||
if (bSendDisconnect and client->connected())
|
if (bSendDisconnect and client->connected())
|
||||||
{
|
{
|
||||||
@@ -76,6 +76,12 @@ void MqttClient::close(bool bSendDisconnect)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void MqttClient::connect(MqttBroker* parentBroker)
|
||||||
|
{
|
||||||
|
close();
|
||||||
|
parent = parentBroker;
|
||||||
|
}
|
||||||
|
|
||||||
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
void MqttClient::connect(std::string broker, uint16_t port, uint16_t ka)
|
||||||
{
|
{
|
||||||
debug("cnx: closing");
|
debug("cnx: closing");
|
||||||
@@ -397,8 +403,13 @@ void MqttClient::processMessage(const MqttMessage* mesg)
|
|||||||
#ifdef TINY_MQTT_DEBUG
|
#ifdef TINY_MQTT_DEBUG
|
||||||
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
|
if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::Type::PingResp)
|
||||||
{
|
{
|
||||||
|
#ifdef NOT_ESP_CORE
|
||||||
|
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << " ESP.getFreeHeap() "<< endl;
|
||||||
|
#else
|
||||||
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
Serial << "---> INCOMING " << _HEX(mesg->type()) << " client(" << (dbg_ptr)client << ':' << clientId << ") mem=" << ESP.getFreeHeap() << endl;
|
||||||
|
#endif
|
||||||
// mesg->hexdump("Incoming");
|
// mesg->hexdump("Incoming");
|
||||||
|
mesg->hexdump("Incoming");
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
auto header = mesg->getVHeader();
|
auto header = mesg->getVHeader();
|
||||||
@@ -540,6 +551,9 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case MqttMessage::Type::Publish:
|
case MqttMessage::Type::Publish:
|
||||||
|
#ifdef TINY_MQTT_DEBUG
|
||||||
|
Serial << "publish " << mqtt_connected << '/' << (long) client << endl;
|
||||||
|
#endif
|
||||||
if (mqtt_connected or client == nullptr)
|
if (mqtt_connected or client == nullptr)
|
||||||
{
|
{
|
||||||
uint8_t qos = mesg->type() & 0x6;
|
uint8_t qos = mesg->type() & 0x6;
|
||||||
@@ -554,8 +568,12 @@ if (mesg->type() != MqttMessage::Type::PingReq && mesg->type() != MqttMessage::T
|
|||||||
// TODO reset DUP
|
// TODO reset DUP
|
||||||
// TODO reset RETAIN
|
// TODO reset RETAIN
|
||||||
|
|
||||||
if (client==nullptr) // internal MqttClient receives publish
|
if (parent==nullptr or client==nullptr) // internal MqttClient receives publish
|
||||||
{
|
{
|
||||||
|
#ifdef TINY_MQTT_DEBUG
|
||||||
|
Serial << (isSubscribedTo(published) ? "not" : "") << " subscribed.\n";
|
||||||
|
Serial << "has " << (callback ? "" : "no ") << " callback.\n";
|
||||||
|
#endif
|
||||||
if (callback and isSubscribedTo(published))
|
if (callback and isSubscribedTo(published))
|
||||||
{
|
{
|
||||||
callback(this, published, payload, len); // TODO send the real payload
|
callback(this, published, payload, len); // TODO send the real payload
|
||||||
@@ -609,7 +627,6 @@ MqttError MqttClient::publish(const Topic& topic, const char* payload, size_t pa
|
|||||||
MqttMessage msg(MqttMessage::Publish);
|
MqttMessage msg(MqttMessage::Publish);
|
||||||
msg.add(topic);
|
msg.add(topic);
|
||||||
msg.add(payload, pay_length, false);
|
msg.add(payload, pay_length, false);
|
||||||
msg.complete();
|
|
||||||
if (parent)
|
if (parent)
|
||||||
{
|
{
|
||||||
return parent->publish(this, topic, msg);
|
return parent->publish(this, topic, msg);
|
||||||
@@ -633,6 +650,10 @@ MqttError MqttClient::publishIfSubscribed(const Topic& topic, const MqttMessage&
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
processMessage(&msg);
|
processMessage(&msg);
|
||||||
|
|
||||||
|
#ifdef TINY_MQTT_DEBUG
|
||||||
|
Serial << "Should call the callback ?\n";
|
||||||
|
#endif
|
||||||
// callback(this, topic, nullptr, 0); // TODO Payload
|
// callback(this, topic, nullptr, 0); // TODO Payload
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -710,7 +731,7 @@ void MqttMessage::incoming(char in_byte)
|
|||||||
reset();
|
reset();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (buffer.length() > MaxBufferLength) // TODO magic 256 ?
|
if (buffer.length() > MaxBufferLength)
|
||||||
{
|
{
|
||||||
debug("Too long " << state);
|
debug("Too long " << state);
|
||||||
reset();
|
reset();
|
||||||
@@ -721,36 +742,47 @@ void MqttMessage::add(const char* p, size_t len, bool addLength)
|
|||||||
{
|
{
|
||||||
if (addLength)
|
if (addLength)
|
||||||
{
|
{
|
||||||
buffer.reserve(buffer.length()+addLength+2);
|
buffer.reserve(buffer.length()+2);
|
||||||
incoming(len>>8);
|
incoming(len>>8);
|
||||||
incoming(len & 0xFF);
|
incoming(len & 0xFF);
|
||||||
}
|
}
|
||||||
while(len--) incoming(*p++);
|
while(len--) incoming(*p++);
|
||||||
}
|
}
|
||||||
|
|
||||||
void MqttMessage::encodeLength(char* msb, int length) const
|
void MqttMessage::encodeLength() const
|
||||||
{
|
{
|
||||||
|
if (state != Complete)
|
||||||
|
{
|
||||||
|
int length = buffer.size()-2; // 1 byte for header, 1 byte for pre-reserved length field.
|
||||||
|
std::string::size_type ins=1;
|
||||||
do
|
do
|
||||||
{
|
{
|
||||||
uint8_t encoded(length & 0x7F);
|
uint8_t encoded(length & 0x7F);
|
||||||
length >>=7;
|
length >>=7;
|
||||||
if (length) encoded |= 0x80;
|
if (length) encoded |= 0x80;
|
||||||
*msb++ = encoded;
|
|
||||||
} while (length);
|
|
||||||
};
|
|
||||||
|
|
||||||
void MqttMessage::complete()
|
if (ins==1)
|
||||||
{
|
buffer[ins]=encoded;
|
||||||
encodeLength(&buffer[1], buffer.size()-2);
|
else
|
||||||
|
buffer.insert(ins, 1, encoded);
|
||||||
|
// On pourrait optimiser, cet insert est couteux, il faudrait en fait non pas
|
||||||
|
// insérer, mais réserver 4 octets pour les remplir
|
||||||
|
// plus tard avec ke fixed header et la taille.
|
||||||
|
// Cela changerait en revanche le début du message qui ne serait plus
|
||||||
|
// buffer[0], mais buffer[0..3] selon la taille du message.
|
||||||
|
|
||||||
|
++ins;
|
||||||
|
} while (length);
|
||||||
state = Complete;
|
state = Complete;
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
MqttError MqttMessage::sendTo(MqttClient* client) const
|
MqttError MqttMessage::sendTo(MqttClient* client) const
|
||||||
{
|
{
|
||||||
if (buffer.size())
|
if (buffer.size())
|
||||||
{
|
{
|
||||||
debug("sending " << buffer.size() << " bytes");
|
debug("sending " << buffer.size() << " bytes");
|
||||||
encodeLength(&buffer[1], buffer.size()-2);
|
encodeLength();
|
||||||
// hexdump("snd");
|
// hexdump("snd");
|
||||||
client->write(&buffer[0], buffer.size());
|
client->write(&buffer[0], buffer.size());
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -101,8 +101,6 @@ class MqttMessage
|
|||||||
void add(const Topic& t) { add(t.str()); }
|
void add(const Topic& t) { add(t.str()); }
|
||||||
const char* end() const { return &buffer[0]+buffer.size(); }
|
const char* end() const { return &buffer[0]+buffer.size(); }
|
||||||
const char* getVHeader() const { return &buffer[vheader]; }
|
const char* getVHeader() const { return &buffer[vheader]; }
|
||||||
uint16_t length() const { return buffer.size(); }
|
|
||||||
void complete();
|
|
||||||
|
|
||||||
void reset();
|
void reset();
|
||||||
|
|
||||||
@@ -127,12 +125,12 @@ class MqttMessage
|
|||||||
void hexdump(const char* prefix=nullptr) const;
|
void hexdump(const char* prefix=nullptr) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void encodeLength(char* msb, int length) const;
|
void encodeLength() const;
|
||||||
|
|
||||||
mutable std::string buffer; // mutable -> sendTo()
|
mutable std::string buffer; // mutable -> sendTo()
|
||||||
uint8_t vheader;
|
uint8_t vheader;
|
||||||
uint16_t size; // bytes left to receive
|
uint16_t size; // bytes left to receive
|
||||||
State state;
|
mutable State state; // mutable -> encodeLength()
|
||||||
};
|
};
|
||||||
|
|
||||||
class MqttBroker;
|
class MqttBroker;
|
||||||
@@ -172,7 +170,14 @@ class MqttClient
|
|||||||
/** Should be called in main loop() */
|
/** Should be called in main loop() */
|
||||||
void loop();
|
void loop();
|
||||||
void close(bool bSendDisconnect=true);
|
void close(bool bSendDisconnect=true);
|
||||||
void setCallback(CallBack fun) {callback=fun; };
|
void setCallback(CallBack fun)
|
||||||
|
{
|
||||||
|
callback=fun;
|
||||||
|
#ifdef TINY_MQTT_DEBUG
|
||||||
|
Serial << "Callback set to " << (long)fun << endl;
|
||||||
|
if (callback) callback(this, "test/topic", "value", 5);
|
||||||
|
#endif
|
||||||
|
};
|
||||||
|
|
||||||
// Publish from client to the world
|
// Publish from client to the world
|
||||||
MqttError publish(const Topic&, const char* payload, size_t pay_length);
|
MqttError publish(const Topic&, const char* payload, size_t pay_length);
|
||||||
@@ -214,6 +219,8 @@ class MqttClient
|
|||||||
static long counter;
|
static long counter;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
|
// event when tcp/ip link established (real or fake)
|
||||||
static void onConnect(void * client_ptr, TcpClient*);
|
static void onConnect(void * client_ptr, TcpClient*);
|
||||||
#ifdef TCP_ASYNC
|
#ifdef TCP_ASYNC
|
||||||
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
|
static void onData(void* client_ptr, TcpClient*, void* data, size_t len);
|
||||||
@@ -240,7 +247,7 @@ class MqttClient
|
|||||||
// (this is the case when MqttBroker isn't used except here)
|
// (this is the case when MqttBroker isn't used except here)
|
||||||
MqttBroker* parent=nullptr; // connection to local broker
|
MqttBroker* parent=nullptr; // connection to local broker
|
||||||
|
|
||||||
TcpClient* client=nullptr; // connection to mqtt client or to remote broker
|
TcpClient* client=nullptr; // connection to remote broker
|
||||||
std::set<Topic> subscriptions;
|
std::set<Topic> subscriptions;
|
||||||
std::string clientId;
|
std::string clientId;
|
||||||
CallBack callback = nullptr;
|
CallBack callback = nullptr;
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#include <Arduino.h>
|
||||||
#include <AUnit.h>
|
#include <AUnit.h>
|
||||||
#include <TinyMqtt.h>
|
#include <TinyMqtt.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#include <Arduino.h>
|
||||||
#include <AUnit.h>
|
#include <AUnit.h>
|
||||||
#include <TinyMqtt.h>
|
#include <TinyMqtt.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
#include <Arduino.h>
|
||||||
#include <AUnit.h>
|
#include <AUnit.h>
|
||||||
#include <StringIndexer.h>
|
#include <StringIndexer.h>
|
||||||
#include <map>
|
#include <map>
|
||||||
|
|||||||
Reference in New Issue
Block a user