[Retain] Fix bug in last retained msg and add retain commands to tinytests
This commit is contained in:
@@ -448,6 +448,10 @@ void eval(string &cmd)
|
||||
Console << " " << fb.first << " : " << fb.second.url << ":" << fb.second.port << endl;
|
||||
}
|
||||
}
|
||||
else if (compare(s, "free"))
|
||||
{
|
||||
Console << "Free memory: " << ESP.getFreeHeap() << endl;
|
||||
}
|
||||
else if (compare(s, "delete"))
|
||||
{
|
||||
if (client == nullptr && broker == nullptr)
|
||||
@@ -497,7 +501,22 @@ void eval(string &cmd)
|
||||
{
|
||||
if (compare(s, "connect"))
|
||||
{
|
||||
Console << "NYI" << endl;
|
||||
string remote = getword(cmd);
|
||||
int port = getint(cmd);
|
||||
if (port == 0) port=1883;
|
||||
broker->connect(remote, port);
|
||||
if (broker->connected())
|
||||
Console << "Broker connected";
|
||||
else
|
||||
Console << red << "Unable to connect";
|
||||
|
||||
Console << " to " << remote << ':' << port << white << endl;
|
||||
}
|
||||
else if (compare(s, "retain"))
|
||||
{
|
||||
if (cmd.size())
|
||||
broker->retain(getint(cmd));
|
||||
Console << "retain=" << broker->retain() << ", retained msg=" << broker->retainCount() << endl;
|
||||
}
|
||||
else if (compare(s, "view"))
|
||||
{
|
||||
@@ -515,6 +534,10 @@ void eval(string &cmd)
|
||||
{
|
||||
clientConnect(client, cmd);
|
||||
}
|
||||
else if (compare(s, "rpublish"))
|
||||
{
|
||||
retval = client->publish(getword(cmd), getword(cmd), true);
|
||||
}
|
||||
else if (compare(s, "publish"))
|
||||
{
|
||||
retval = client->publish(getword(cmd), getword(cmd));
|
||||
@@ -760,7 +783,8 @@ void eval(string &cmd)
|
||||
Console << save_cursor << magenta;
|
||||
Console.gotoxy(1, 1);
|
||||
}
|
||||
Console << "--< " << '/' << clients.size() << " client/s. >--" << erase_to_end << endl;
|
||||
Console << "--< " << '/' << clients.size() << " client/s. >--" << erase_to_end;
|
||||
Console << " (FreeMem: " << ESP.getFreeHeap() << ')' << endl;
|
||||
for (auto it : clients)
|
||||
{
|
||||
it.second->dump(" ");
|
||||
@@ -786,19 +810,22 @@ void eval(string &cmd)
|
||||
{
|
||||
Console << "syntax:" << endl;
|
||||
Console << " MqttBroker:" << endl;
|
||||
Console << " broker {broker_name} {port} : create a new broker" << endl;
|
||||
Console << " broker_name can be one of 'list'" << endl;
|
||||
Console << " broker_name.delete : delete a broker (buggy)" << endl;
|
||||
Console << " broker_name.view : dump a broker" << endl;
|
||||
Console << " broker {name} {port} : create a new broker" << endl;
|
||||
Console << " name can be one of 'list'" << endl;
|
||||
Console << " name.delete : delete a broker (buggy)" << endl;
|
||||
Console << " name.retain [#] : show/set retain value" << endl;
|
||||
Console << " name.view : dump a broker" << endl;
|
||||
Console << endl;
|
||||
Console << " MqttClient:" << endl;
|
||||
Console << " client {name} {broker} : create a client then" << endl;
|
||||
Console << " name.connect [ip] [port] [alive]" << endl;
|
||||
Console << " name.[un]subscribe topic" << endl;
|
||||
Console << " name.publish topic [payload]" << endl;
|
||||
Console << " name.rpublish topic [payload] : publish a retained message" << endl;
|
||||
Console << " name.view" << endl;
|
||||
Console << " name.delete" << endl;
|
||||
Console << endl;
|
||||
Console << " free : view free mem" << endl;
|
||||
Console << " list : list of free brokers (debug 1 advised)" << endl;
|
||||
Console << " debug #" << endl;
|
||||
Console << " list : get list of free brokers" << endl;
|
||||
|
||||
@@ -957,14 +957,19 @@ void MqttBroker::retainDrop()
|
||||
|
||||
void MqttBroker::retain(const Topic& topic, const MqttMessage& msg)
|
||||
{
|
||||
debug("MqttBroker::retain msg_type=" << _HEX(msg.type()));
|
||||
debug("MqttBroker::retain msg_type=" << _HEX(msg.type()) << ", retain_size=" << retain_size);
|
||||
if (retain_size==0 or msg.type() != MqttMessage::Publish) return;
|
||||
if (msg.flags() & 1) // flag RETAIN
|
||||
{
|
||||
debug(" retaining " << topic.str());
|
||||
if (retained.find(topic) == retained.end()) retainDrop();
|
||||
auto old = retained.find(topic);
|
||||
if (old == retained.end())
|
||||
retainDrop();
|
||||
else
|
||||
retained.erase(old);
|
||||
// FIXME if payload size == 0 remove message from retained
|
||||
Retain r(micros(), msg);
|
||||
r.msg.retained();
|
||||
retained.insert({ topic, std::move(r)});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -133,6 +133,7 @@ class MqttMessage
|
||||
const char* end() const { return &buffer[0]+buffer.size(); }
|
||||
const char* getVHeader() const { return &buffer[vheader]; }
|
||||
void complete() { encodeLength(); }
|
||||
void retained() { if ((buffer[0] & 0xF)==Publish) buffer[0] |= 1; }
|
||||
|
||||
void reset();
|
||||
|
||||
@@ -346,7 +347,9 @@ class MqttBroker
|
||||
bool connected() const { return remote_broker ? remote_broker->connected() : false; }
|
||||
|
||||
size_t clientsCount() const { return clients.size(); }
|
||||
void retain(uint8_t size) { retain_size = size; }
|
||||
uint8_t retain() { return retain_size; }
|
||||
void retain(uint8_t size) { retain_size = size; if (size==0) retained.clear(); }
|
||||
uint8_t retainCount() const { return retained.size(); }
|
||||
|
||||
void dump(string indent="")
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user