mod_pubsub_mqtt: Update to MQTT 3.1.1
authorMatthew Wild <mwild1@gmail.com>
Wed, 07 Feb 2024 11:57:30 +0000
changeset 5837 58df53eefa28
parent 5836 5afc8273c5ef
child 5838 866a49f5aa61
mod_pubsub_mqtt: Update to MQTT 3.1.1
mod_pubsub_mqtt/README.markdown
mod_pubsub_mqtt/mod_pubsub_mqtt.lua
mod_pubsub_mqtt/mqtt.lib.lua
--- a/mod_pubsub_mqtt/README.markdown	Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/README.markdown	Wed Feb 07 11:57:30 2024 +0000
@@ -11,35 +11,38 @@
 to embedded devices. This module provides a way for MQTT clients to
 connect to Prosody and publish or subscribe to local pubsub nodes.
 
+The module currently implements MQTT version 3.1.1.
+
 Details
 -------
 
 MQTT has the concept of 'topics' (similar to XMPP's pubsub 'nodes').
 mod\_pubsub\_mqtt maps pubsub nodes to MQTT topics of the form
-`HOST/NODE`, e.g.`pubsub.example.org/mynode`.
+`<HOST>/<TYPE>/<NODE>`, e.g.`pubsub.example.org/json/mynode`.
+
+The 'TYPE' parameter in the topic allows the client to choose the payload
+format it will send/receive. For the supported values of 'TYPE' see the
+'Payloads' section below.
 
 ### Limitations
 
 The current implementation is quite basic, and in particular:
 
 -   Authentication is not supported
--   SSL/TLS is not supported
 -   Only QoS level 0 is supported
 
 ### Payloads
 
 XMPP payloads are always XML, but MQTT does not define a payload format.
-Therefore mod\_pubsub\_mqtt will attempt to convert data of certain
-recognised payload types. Currently supported:
+Therefore mod\_pubsub\_mqtt has some built-in data format translators.
+
+Currently supported data types:
 
--   JSON (see [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for
-    the format)
--   Plain UTF-8 text (wrapped inside
+-   `json`: See [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for
+    the format.
+-   `utf8`: Plain UTF-8 text (wrapped inside
     `<data xmlns="https://prosody.im/protocol/mqtt"/>`)
-
-All other XMPP payload types are sent to the client directly as XML.
-Data published by MQTT clients is currently never translated, and always
-treated as UTF-8 text.
+-   `atom_title`: Returns the title of an Atom entry as UTF-8 data
 
 Configuration
 -------------
@@ -51,16 +54,15 @@
         modules_enabled = { "pubsub_mqtt" }
 
 You may also configure which port(s) mod\_pubsub\_mqtt listens on using
-Prosody's standard config directives, such as `mqtt_ports`. Network
-settings **must** be specified in the global section of the config file,
-not under any particular pubsub component. The default port is 1883
-(MQTT's standard port number).
+Prosody's standard config directives, such as `mqtt_ports` and
+`mqtt_tls_ports`. Network settings **must** be specified in the global section
+of the config file, not under any particular pubsub component. The default
+port is 1883 (MQTT's standard port number) and 8883 for TLS connections.
 
 Compatibility
 -------------
 
   ------- --------------
   trunk   Works
-  0.9     Works
-  0.8     Doesn't work
+  0.12    Works
   ------- --------------
--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Wed Feb 07 11:57:30 2024 +0000
@@ -59,6 +59,15 @@
 end
 
 function packet_handlers.connect(session, packet)
+	module:log("info", "MQTT client connected (sending connack)");
+	module:log("debug", "MQTT version: %02x", packet.version);
+	if packet.version ~= 0x04 then -- Version mismatch
+		session.conn:write(mqtt.serialize_packet{
+			type = "connack";
+			data = string.char(0x00, 0x01);
+		});
+		return;
+	end
 	session.conn:write(mqtt.serialize_packet{
 		type = "connack";
 		data = string.char(0x00, 0x00);
@@ -96,27 +105,33 @@
 end
 
 function packet_handlers.subscribe(session, packet)
-	for _, topic in ipairs(packet.topics) do
+	local results = {};
+	for i, topic in ipairs(packet.topics) do
 		module:log("info", "SUBSCRIBE to %s", topic);
 		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
 		if not host then
 			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
-			return;
-		end
-		local pubsub = pubsub_subscribers[host];
-		if not pubsub then
-			module:log("warn", "Unable to locate host/node: %s", topic);
-			return;
+			results[i] = 0x80; -- Failure
+		else
+			local pubsub = pubsub_subscribers[host];
+			if not pubsub then
+				module:log("warn", "Unable to locate host/node: %s", topic);
+				results[i] = 0x80; -- Failure
+			else
+				local node_subs = pubsub[node];
+				if not node_subs then
+					node_subs = {};
+					pubsub[node] = node_subs;
+				end
+				session.subscriptions[topic] = payload_type;
+				node_subs[session] = payload_type;
+				module:log("debug", "Successfully subscribed to %s", topic);
+				results[i] = 0x00; -- Success
+			end
 		end
-		local node_subs = pubsub[node];
-		if not node_subs then
-			node_subs = {};
-			pubsub[node] = node_subs;
-		end
-		session.subscriptions[topic] = payload_type;
-		node_subs[session] = payload_type;
 	end
-
+	local ack = mqtt.serialize_packet{ type = "suback", id = packet.id, results = results };
+	session.conn:write(ack);
 end
 
 function packet_handlers.pingreq(session, packet)
@@ -191,7 +206,7 @@
 						topic = module.host.."/"..payload_type.."/"..event.node;
 						data = data_translators[payload_type].from_item(event.item) or "";
 					};
-					rawset(self, packet);
+					rawset(self, payload_type, packet);
 					return packet;
 				end;
 			});
--- a/mod_pubsub_mqtt/mqtt.lib.lua	Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/mqtt.lib.lua	Wed Feb 07 11:57:30 2024 +0000
@@ -1,4 +1,4 @@
-local bit = require "bit";
+local bit = require "util.bitcompat";
 
 local stream_mt = {};
 stream_mt.__index = stream_mt;
@@ -29,10 +29,25 @@
 	return self:read_bytes(len), len+2;
 end
 
+function stream_mt:read_word()
+	local len1, len2 = self:read_bytes(2):byte(1,2);
+	local result = bit.lshift(len1, 8) + len2;
+	module:log("debug", "read_word(%02x, %02x) = %04x (%d)", len1, len2, result, result);
+	return result;
+end
+
+local function hasbit(byte, n_bit)
+	return bit.band(byte, 2^n_bit) ~= 0;
+end
+
+local function encode_string(str)
+	return string.char(bit.band(#str, 0xff00), bit.band(#str, 0x00ff))..str;
+end
+
 local packet_type_codes = {
 	"connect", "connack",
 	"publish", "puback", "pubrec", "pubrel", "pubcomp",
-	"subscribe", "subak", "unsubscribe", "unsuback",
+	"subscribe", "suback", "unsubscribe", "unsuback",
 	"pingreq", "pingresp",
 	"disconnect"
 };
@@ -59,9 +74,46 @@
 			packet.type = nil; -- Invalid packet
 		else
 			packet.version = self:read_bytes(1):byte();
-			packet.connect_flags = self:read_bytes(1):byte();
-			packet.keepalive_timer = self:read_bytes(1):byte();
+			module:log("debug", "ver: %02x", packet.version);
+			if packet.version ~= 0x04 then
+				module:log("warn", "MQTT version mismatch (got %02x, we support %02x", packet.version, 0x04);
+			end
+			local flags = self:read_bytes(1):byte();
+			module:log("debug", "flags: %02x", flags);
+			packet.keepalive_timer = self:read_bytes(2):byte();
+			module:log("debug", "keepalive: %d", packet.keepalive_timer);
+			packet.connect_flags = {};
 			length = length - 11;
+			packet.connect_flags = {
+				clean_session = hasbit(flags, 1);
+				will = hasbit(flags, 2);
+				will_qos = bit.band(bit.rshift(flags, 2), 0x02);
+				will_retain = hasbit(flags, 5);
+				user_name = hasbit(flags, 7);
+				password = hasbit(flags, 6);
+			};
+			module:log("debug", "%s", require "util.serialization".serialize(packet.connect_flags, "debug"));
+			module:log("debug", "Reading client_id...");
+			packet.client_id = self:read_string();
+			if packet.connect_flags.will then
+				module:log("debug", "Reading will...");
+				packet.will = {
+					topic = self:read_string();
+					message = self:read_string();
+					qos = packet.connect_flags.will_qos;
+					retain = packet.connect_flags.will_retain;
+				};
+			end
+			if packet.connect_flags.user_name then
+				module:log("debug", "Reading username...");
+				packet.username = self:read_string();
+			end
+			if packet.connect_flags.password then
+				module:log("debug", "Reading password...");
+				packet.password = self:read_string();
+			end
+			module:log("debug", "Done parsing connect!");
+			length = 0; -- No payload left
 		end
 	elseif packet.type == "publish" then
 		packet.topic = self:read_string();
@@ -87,6 +139,7 @@
 	if length > 0 then
 		packet.data = self:read_bytes(length);
 	end
+	module:log("debug", "MQTT packet complete!");
 	return packet;
 end
 
@@ -102,7 +155,6 @@
 end
 
 function stream_mt:feed(data)
-	module:log("debug", "Feeding %d bytes", #data);
 	local packets = {};
 	local packet = self.parser(data);
 	while packet do
@@ -135,10 +187,10 @@
 		packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
 	elseif packet.type == "suback" then
 		local t = {};
-		for _, topic in ipairs(packet.topics) do
-			table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
+		for i, result_code in ipairs(packet.results) do
+			table.insert(t, string.char(result_code));
 		end
-		packet.data = table.concat(t);
+		packet.data = packet.id..table.concat(t);
 	end
 
 	-- Get length