
Optimising Communication

This chapter explains how mqtt_client optimises its communication with the Broker: multiflight mode for handling several messages simultaneously, and packet queuing strategies for efficient bandwidth usage. These techniques help you get the most out of MQTT in scenarios that demand fast and dependable message delivery, while still meeting the protocol's quality of service requirements.

The Async.MQTT5 library introduces a multiflight feature. This allows the initiation of multiple asynchronous requests simultaneously, without waiting for the completion of the previous requests. With this feature, you can repeatedly call mqtt_client::async_publish or any similar async_xxx function without waiting for the handler invocation of the previous async_xxx calls.

This feature is particularly helpful when using mqtt_client with callbacks, as it allows you to quickly dispatch multiple requests one after the other, instead of nesting them in callbacks.

In the example below, mqtt_client::async_publish with QoS 2 is called five times in a for loop. QoS level 2 ensures that each message is delivered exactly once and involves a four-step communication process: sending a PUBLISH packet, receiving a PUBREC acknowledgement from the Broker, transmitting a PUBREL packet, and finally receiving a PUBCOMP packet, which confirms successful message delivery.

Even when several such exchanges are initiated back to back, the mqtt_client correctly manages all intermediate packet exchanges with the Broker and completes each message delivery.

It is important to note that there is no guarantee that the final handlers will be invoked in the same order as the corresponding async_xxx calls were initiated.

Source: multiflight_client.cpp

#include <csignal>
#include <cstdint>
#include <iostream>
#include <string>
#include <variant>

#include <boost/asio/io_context.hpp>
#include <boost/asio/detached.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/signal_set.hpp>

#include <async_mqtt5/logger.hpp>
#include <async_mqtt5/mqtt_client.hpp>
#include <async_mqtt5/reason_codes.hpp>
#include <async_mqtt5/types.hpp>

struct config {
	std::string brokers = "broker.hivemq.com";
	uint16_t port = 1883;
	std::string client_id = "async_mqtt5_tester";
};

int main(int argc, char** argv) {
	config cfg;

	if (argc == 4) {
		cfg.brokers = argv[1];
		cfg.port = uint16_t(std::stoi(argv[2]));
		cfg.client_id = argv[3];
	}

	boost::asio::io_context ioc;

	// Construct the Client with boost::asio::ip::tcp::socket as the underlying stream and enabled logging.
	// Since we are not establishing a secure connection, set the TlsContext template parameter to std::monostate.
	async_mqtt5::mqtt_client<
		boost::asio::ip::tcp::socket, std::monostate /* TlsContext */, async_mqtt5::logger
	> client(ioc, {} /* tls_context */, async_mqtt5::logger(async_mqtt5::log_level::info));

	client.brokers(cfg.brokers, cfg.port) // Broker that we want to connect to.
		.credentials(cfg.client_id) // Set the Client Identifier. (optional)
		.async_run(boost::asio::detached); // Start the client.

	// Publish with QoS 2 five times in a row without waiting for the handler
	// of the previous async_publish call to be invoked.
	for (auto i = 1; i <= 5; ++i)
		client.async_publish<async_mqtt5::qos_e::exactly_once>(
			"async-mqtt5/test", "Hello world!",
			async_mqtt5::retain_e::no, async_mqtt5::publish_props {},
			[i](async_mqtt5::error_code ec, async_mqtt5::reason_code rc, async_mqtt5::pubcomp_props) {
				std::cout << "Publish number " << i << " completed with: " << std::endl;
				std::cout << "\t ec: " << ec.message() << std::endl;
				std::cout << "\t rc: " << rc.message() << std::endl;
			}
		);

	// We can stop the Client by using signals.
	boost::asio::signal_set signals(ioc, SIGINT, SIGTERM);
	signals.async_wait([&client](async_mqtt5::error_code, int) {
		client.async_disconnect(boost::asio::detached);
	});

	ioc.run();
}
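
The four-step QoS 2 exchange described before the listing can be pictured as a small state machine on the sending side. The sketch below is only an illustration under that assumption: the names packet, qos2_state, qos2_sender and run_happy_path are hypothetical and are not part of Async.MQTT5, which handles these steps internally.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical model of the sender side of one QoS 2 exchange.
// These names are illustrative and are not part of Async.MQTT5.
enum class packet { publish, pubrec, pubrel, pubcomp };

enum class qos2_state { idle, awaiting_pubrec, awaiting_pubcomp, completed };

class qos2_sender {
	qos2_state _state = qos2_state::idle;
public:
	qos2_state state() const { return _state; }

	// Step 1: send PUBLISH, then wait for PUBREC.
	packet start() {
		if (_state != qos2_state::idle)
			throw std::logic_error("exchange already started");
		_state = qos2_state::awaiting_pubrec;
		return packet::publish;
	}

	// Steps 2-4: react to a packet received from the Broker.
	// Returns true if a PUBREL must be sent in response.
	bool on_receive(packet p) {
		if (_state == qos2_state::awaiting_pubrec && p == packet::pubrec) {
			_state = qos2_state::awaiting_pubcomp;
			return true; // Step 3: reply with PUBREL.
		}
		if (_state == qos2_state::awaiting_pubcomp && p == packet::pubcomp) {
			_state = qos2_state::completed; // Step 4: delivery confirmed.
			return false;
		}
		throw std::logic_error("unexpected packet for current state");
	}
};

// Drives one complete exchange and returns the number of packets
// the sender had to transmit (PUBLISH and PUBREL).
int run_happy_path() {
	qos2_sender s;
	int sent = 0;
	s.start();
	++sent;
	if (s.on_receive(packet::pubrec))
		++sent;
	s.on_receive(packet::pubcomp);
	assert(s.state() == qos2_state::completed);
	return sent;
}
```

In multiflight mode, several such exchanges are in progress at once, each advancing independently as its acknowledgements arrive.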

The mqtt_client employs a queuing mechanism that optimises network usage and performance for the user's requests. Whenever feasible, this mechanism bundles multiple MQTT packets for transmission within a single TCP packet [2]. Bundling reduces per-packet overhead, increases throughput, and lowers the latency associated with transmitting packets individually.
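
The bundling idea can be sketched with a simplified writer that coalesces everything queued while a write is in progress into one buffer. This is a minimal model, not the library's implementation: bundling_writer and its members are hypothetical names, packets are represented as plain strings, and the "transport" is just a record of the buffers that would be written.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of write bundling; not the library's implementation.
class bundling_writer {
	std::vector<std::string> _pending; // serialized packets waiting to be written
	bool _writing = false;             // true while a write is in progress
	std::vector<std::string> _writes;  // one entry per write issued to the "transport"

	void start_write(std::string buffer) {
		_writing = true;
		_writes.push_back(std::move(buffer));
	}
public:
	// Queue a serialized packet. It is written at once if the transport is idle.
	void enqueue(std::string packet) {
		if (_writing)
			_pending.push_back(std::move(packet));
		else
			start_write(std::move(packet));
	}

	// Called when the transport finishes the current write.
	// Everything queued in the meantime goes out as a single buffer.
	void on_write_complete() {
		assert(_writing);
		_writing = false;
		if (_pending.empty())
			return;
		std::string bundle;
		for (const auto& p : _pending)
			bundle += p;
		_pending.clear();
		start_write(std::move(bundle));
	}

	const std::vector<std::string>& writes() const { return _writes; }
};
```

In this model, enqueueing "A", "B" and "C" before the first write completes results in two writes: "A" on its own, followed by "BC" as one bundle. Whether one write ends up in exactly one TCP packet depends on the network stack, which the sketch does not model.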

Additionally, the queuing mechanism ensures that mqtt_client complies with the Receive Maximum value set by the Broker. This value is used to implement a send quota that restricts the number of PUBLISH packets with QoS > 0 that have not yet been acknowledged (PUBACK for QoS 1 and PUBCOMP for QoS 2) (see Flow Control). When mqtt_client::async_publish with QoS > 0 is invoked, mqtt_client compares the current count of unacknowledged PUBLISH packets with the Broker's Receive Maximum threshold. If the count is below this threshold, mqtt_client dispatches the PUBLISH packet. Otherwise, the packet remains in the queue until the count drops below the threshold.
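
The send quota described above can be sketched as a counter of unacknowledged packets plus a waiting queue. The class below is a simplified model under that assumption; send_quota and its member names are hypothetical and do not correspond to anything in the Async.MQTT5 API.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical model of a Receive Maximum send quota; not the library's code.
class send_quota {
	std::uint16_t _receive_maximum;
	std::uint16_t _unacked = 0;       // QoS > 0 PUBLISH packets awaiting PUBACK/PUBCOMP
	std::deque<std::string> _waiting; // payloads held back by the quota
	std::vector<std::string> _sent;   // payloads dispatched so far, in order

	void dispatch(std::string payload) {
		++_unacked;
		_sent.push_back(std::move(payload));
	}
public:
	explicit send_quota(std::uint16_t receive_maximum)
		: _receive_maximum(receive_maximum) {}

	// Request a QoS > 0 publish: send now if below the threshold, else queue.
	void publish(std::string payload) {
		if (_unacked < _receive_maximum)
			dispatch(std::move(payload));
		else
			_waiting.push_back(std::move(payload));
	}

	// A PUBACK (QoS 1) or PUBCOMP (QoS 2) arrived: free one slot
	// and dispatch the oldest waiting payload, if any.
	void on_ack() {
		assert(_unacked > 0);
		--_unacked;
		if (!_waiting.empty()) {
			std::string next = std::move(_waiting.front());
			_waiting.pop_front();
			dispatch(std::move(next));
		}
	}

	const std::vector<std::string>& sent() const { return _sent; }
	std::size_t waiting() const { return _waiting.size(); }
};
```

With a Receive Maximum of 2, five consecutive publish calls dispatch the first two payloads and hold back the remaining three; each acknowledgement then releases the next one in the original order.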

As a result, in the multiflight_client.cpp example, the mqtt_client will transmit all 5 PUBLISH packets in a single TCP packet if possible [3].

The mqtt_client uses a packet ordering mechanism to manage the queued packets pending dispatch to the Broker. The most important ordering rules are:

- The PUBLISH packets are transmitted in the order in which they were initiated through mqtt_client::async_publish calls. This order is preserved even when packets must be retransmitted, which keeps the message delivery order consistent. However, the order is not preserved between QoS 0 and QoS > 0 packets when the Broker sets a Receive Maximum value. A Broker can set this value to limit the number of simultaneous QoS > 0 messages it can process, so QoS 0 messages may be transmitted ahead of QoS > 0 messages that are still waiting for the send quota.

- The DISCONNECT packet is sent in a single TCP packet before any other packets in the queue. See Disconnecting the client for more information about disconnecting.
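
The two ordering rules above can be illustrated with a small queue model (C++17, for std::optional). It is a sketch under simplifying assumptions, not the library's implementation: kind, queued_packet and ordered_queue are hypothetical names, and the send quota is reduced to a single flag passed by the caller.

```cpp
#include <cassert>
#include <deque>
#include <optional>
#include <string>
#include <utility>

// Hypothetical model of the ordering rules; not the library's implementation.
enum class kind { publish, disconnect };

struct queued_packet {
	kind type;
	int qos;           // 0, 1 or 2; ignored for DISCONNECT
	std::string label; // illustrative identifier
};

class ordered_queue {
	std::deque<queued_packet> _queue;
public:
	void push(queued_packet p) {
		assert(p.qos >= 0 && p.qos <= 2);
		// DISCONNECT jumps ahead of everything already queued.
		if (p.type == kind::disconnect)
			_queue.push_front(std::move(p));
		else
			_queue.push_back(std::move(p));
	}

	// Remove and return the next packet to send, or nothing if none is sendable.
	// `quota_available` says whether another QoS > 0 PUBLISH may be sent.
	std::optional<queued_packet> pop_next(bool quota_available) {
		for (auto it = _queue.begin(); it != _queue.end(); ++it) {
			bool blocked = it->type == kind::publish
				&& it->qos > 0 && !quota_available;
			if (blocked)
				continue; // Held back by Receive Maximum; a later QoS 0 packet may pass.
			queued_packet p = std::move(*it);
			_queue.erase(it);
			return p;
		}
		return std::nullopt;
	}
};
```

In this model, a QoS 0 packet queued behind a QoS 1 packet is returned first while the quota is exhausted, and a DISCONNECT pushed later is returned before any remaining PUBLISH packets.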



[2] Requests are queued and bundled whenever the mqtt_client is in the process of writing previous request(s) to the transport.

[3] Provided that the Broker's Receive Maximum is equal to or greater than 5.

