This chapter provides a detailed breakdown of how mqtt_client optimises its
communications with the Broker: multiflight mode for handling multiple messages
simultaneously, and strategies for efficient bandwidth usage. These techniques are
key to getting the most out of MQTT in scenarios demanding fast and dependable
message delivery, while still meeting the protocol's quality of service requirements
and network efficiency standards.
The Async.MQTT5 library introduces a multiflight feature. This allows the
initiation of multiple asynchronous requests simultaneously, without waiting
for the completion of the previous requests. With this feature, you can
repeatedly call mqtt_client::async_publish or any similar async_xxx function
without waiting for the handler invocation of the previous async_xxx calls.
This feature is particularly helpful when using mqtt_client with callbacks,
as it allows you to quickly dispatch multiple requests one after the other,
instead of nesting them in callbacks.
Consider the example below, in which mqtt_client::async_publish with QoS 2 is
called 5 times in a for loop. QoS level 2 ensures that each message is delivered
exactly once and involves a four-step communication process: sending a PUBLISH
packet, receiving a PUBREC acknowledgement from the Broker, transmitting a PUBREL
packet, and finally receiving a PUBCOMP packet, confirming successful message
delivery.
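The four-step QoS 2 exchange described above can be pictured as a small sender-side state machine. The following is a minimal sketch only: qos2_sender, packet and qos2_state are hypothetical names introduced for illustration and are not part of Async.MQTT5, which handles this exchange internally.

```cpp
#include <optional>

// Packet types that take part in a QoS 2 exchange, as seen by the sender.
enum class packet { publish, pubrec, pubrel, pubcomp };

// Sender-side progress of a single QoS 2 message.
enum class qos2_state { idle, wait_pubrec, wait_pubcomp, done };

// Hypothetical helper (not an Async.MQTT5 type): tracks one QoS 2 delivery.
class qos2_sender {
    qos2_state state_ = qos2_state::idle;

public:
    qos2_state state() const { return state_; }

    // Step 1: the sender transmits PUBLISH and waits for PUBREC.
    packet start() {
        state_ = qos2_state::wait_pubrec;
        return packet::publish;
    }

    // Feed a packet received from the Broker.
    // Returns the packet the sender must transmit in response, if any.
    std::optional<packet> on_receive(packet p) {
        if (state_ == qos2_state::wait_pubrec && p == packet::pubrec) {
            // Steps 2 and 3: PUBREC received, answer with PUBREL.
            state_ = qos2_state::wait_pubcomp;
            return packet::pubrel;
        }
        if (state_ == qos2_state::wait_pubcomp && p == packet::pubcomp) {
            // Step 4: PUBCOMP received, the delivery is complete.
            state_ = qos2_state::done;
        }
        // Packets that do not match the current state are ignored in this sketch.
        return std::nullopt;
    }
};
```

With multiflight, several such exchanges are in progress at the same time, one per outstanding QoS 2 publish.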
Despite the complexity of initiating several such message exchange sequences
consecutively, the mqtt_client will manage all intermediate packet exchanges
between itself and the Broker correctly and complete the message delivery.
It is important to note that there is no guarantee that the final handlers
will be invoked in the same order as the corresponding async_xxx calls were
initiated.
Source: multiflight_client.cpp
#include <csignal>
#include <cstdint>
#include <iostream>
#include <string>
#include <variant>

#include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>

#include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>

struct config {
    std::string brokers = "broker.hivemq.com";
    uint16_t port = 1883;
    std::string client_id = "async_mqtt5_tester";
};

int main(int argc, char** argv) {
    config cfg;

    // Optional command line arguments: <brokers> <port> <client_id>.
    if (argc == 4) {
        cfg.brokers = argv[1];
        cfg.port = uint16_t(std::stoi(argv[2]));
        cfg.client_id = argv[3];
    }

    boost::asio::io_context ioc;

    // Construct the Client with boost::asio::ip::tcp::socket as the underlying stream and enabled logging.
    // Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
    async_mqtt5::mqtt_client<
        boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
    > client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));

    client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
        .credentials(cfg.client_id) // Set the Client Identifier. (optional)
        .async_run(boost::asio::detached); // Start the client.

    // Publish with QoS 2 five times in a row without waiting for the handler
    // of the previous async_publish call to be invoked.
    for (auto i = 1; i <= 5; ++i)
        client.async_publish<async_mqtt5::qos_e::exactly_once>(
            "async-mqtt5/test", "Hello world!",
            async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
            [i](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::pubcomp_props) {
                std::cout << "Publish number " << i << " completed with: " << std::endl;
                std::cout << "\t ec: " << ec.message() << std::endl;
                std::cout << "\t rc: " << rc.message() << std::endl;
            }
        );

    // We can stop the Client by using signals.
    boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
    signals.async_wait([&client](async_mqtt5::error_code, int) {
        client.async_disconnect(boost::asio::detached);
    });

    ioc.run();
}
The mqtt_client employs a queuing mechanism that optimises network usage and
performance for the user's requests. This mechanism bundles multiple MQTT
packets for transmission within a single TCP packet whenever feasible [2].
Bundling reduces per-packet overhead, increases throughput, and lowers the
latency associated with transmitting each packet individually.
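The bundling behaviour can be illustrated with a small model: requests that arrive while a write is in progress are queued and then sent together in the next write. This is a sketch under that assumption only. bundling_writer is a hypothetical class, not the Async.MQTT5 implementation, and each entry of writes() merely stands in for one write to the transport.

```cpp
#include <string>
#include <utility>
#include <vector>

// Hypothetical write queue (not the Async.MQTT5 implementation).
class bundling_writer {
    std::vector<std::string> pending_; // serialized packets waiting for the transport
    std::vector<std::string> writes_;  // each element models one write to the transport
    bool write_in_progress_ = false;

    // Concatenate everything that is pending and hand it over as a single write.
    void flush() {
        if (pending_.empty()) return;
        std::string batch;
        for (const auto& p : pending_) batch += p;
        pending_.clear();
        write_in_progress_ = true;
        writes_.push_back(std::move(batch));
    }

public:
    // Queue a serialized packet; start a write right away if the transport is idle.
    void send(std::string packet) {
        pending_.push_back(std::move(packet));
        if (!write_in_progress_) flush();
    }

    // Called when the transport has finished the current write.
    void on_write_complete() {
        write_in_progress_ = false;
        flush();
    }

    const std::vector<std::string>& writes() const { return writes_; }
};
```

In this model, a packet sent on an idle transport goes out alone, while packets queued during that write are merged into the following one.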
Additionally, the queuing mechanism ensures that mqtt_client complies with
the Receive Maximum value set by the Broker. This value is used to implement
a send quota that restricts the number of PUBLISH packets with QoS > 0 that
have not yet received an acknowledgement (PUBACK for QoS 1 and PUBCOMP for
QoS 2) (see Flow Control).
When mqtt_client::async_publish with QoS > 0 is invoked, mqtt_client evaluates
the current count of unacknowledged PUBLISH packets against the Broker's
Receive Maximum threshold. If the count is below this threshold, mqtt_client
dispatches the PUBLISH packet. Otherwise, the packet remains in the queue until
the count decreases below the threshold.
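The send quota check described above can be sketched as a counter compared against Receive Maximum. This is an illustrative model only: send_quota_queue and its members are hypothetical names, integer ids stand in for PUBLISH packets, and the real mqtt_client performs this bookkeeping internally.

```cpp
#include <cstddef>
#include <cstdint>
#include <deque>
#include <vector>

// Hypothetical model of a Receive Maximum send quota (not an Async.MQTT5 type).
class send_quota_queue {
    std::uint16_t receive_maximum_; // limit announced by the Broker
    std::size_t in_flight_ = 0;     // QoS > 0 PUBLISH packets awaiting acknowledgement
    std::deque<int> waiting_;       // ids of PUBLISH packets held back by the quota
    std::vector<int> sent_;         // ids in the order they were dispatched

    // Dispatch queued packets for as long as the quota allows.
    void drain() {
        while (!waiting_.empty() && in_flight_ < receive_maximum_) {
            sent_.push_back(waiting_.front());
            waiting_.pop_front();
            ++in_flight_;
        }
    }

public:
    explicit send_quota_queue(std::uint16_t receive_maximum)
        : receive_maximum_(receive_maximum) {}

    // A QoS > 0 publish request: dispatched at once if the quota allows, queued otherwise.
    void publish(int id) {
        waiting_.push_back(id);
        drain();
    }

    // A PUBACK (QoS 1) or PUBCOMP (QoS 2) arrived: one slot of the quota is freed.
    void on_ack() {
        if (in_flight_ > 0) --in_flight_;
        drain();
    }

    const std::vector<int>& sent() const { return sent_; }
    std::size_t queued() const { return waiting_.size(); }
};
```

For example, with a Receive Maximum of 2 and five publish requests, only the first two are dispatched immediately; each acknowledgement then releases the next queued packet.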
As a result, in the multiflight_client.cpp example, the mqtt_client will
transmit all 5 PUBLISH packets in a single TCP packet if possible [3].
The mqtt_client uses a packet ordering mechanism to manage the queued packets
pending dispatch to the Broker. The most important ordering rules are:
- The PUBLISH packets are transmitted in the order they were initiated through
mqtt_client::async_publish calls. This order is preserved even when packets
must be retransmitted, ensuring consistency in message delivery order. However,
the order is not preserved between QoS 0 and QoS > 0 packets when the Broker
sets the Receive Maximum value. A Broker can set this value to limit the number
of simultaneous QoS > 0 messages it can process, potentially causing QoS 0
messages to be transmitted ahead of QoS > 0 messages in the delivery order.
- The DISCONNECT packet is sent in a single TCP packet before any other packets
in the queue. See Disconnecting the client for more information.
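The two ordering rules can be sketched as a selection function over the queue. This is a simplified illustration, not the library's scheduler: select_for_next_write, queued_packet and kind are hypothetical names, and quota stands for the number of QoS > 0 PUBLISH packets that Receive Maximum still allows.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Kinds of queued packets that matter for the ordering rules.
enum class kind { publish_qos0, publish_qos1_or_2, disconnect };

// Hypothetical queue entry; the label only identifies the packet in this sketch.
struct queued_packet {
    kind type;
    std::string label;
};

// Returns the labels of the packets chosen for the next write, in send order.
std::vector<std::string> select_for_next_write(
    const std::vector<queued_packet>& queue, std::size_t quota
) {
    // A queued DISCONNECT goes out alone, ahead of everything else.
    for (const auto& p : queue)
        if (p.type == kind::disconnect)
            return { p.label };

    // Otherwise keep initiation order. QoS > 0 packets are held back once the
    // quota is used up, which lets later QoS 0 packets overtake them.
    std::vector<std::string> out;
    for (const auto& p : queue) {
        if (p.type == kind::publish_qos0) {
            out.push_back(p.label);
        } else if (quota > 0) {
            out.push_back(p.label);
            --quota;
        }
    }
    return out;
}
```

With a quota of 1 and a queue holding a QoS 1, a QoS 0 and another QoS 1 packet (in that order), the second QoS 1 packet stays queued while the QoS 0 packet is sent.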
[2] Requests are queued and bundled whenever the mqtt_client is in the process
of writing previous request(s) to the transport.
[3] This holds when the Broker's Receive Maximum is equal to or greater than 5.