--- a/mod_pubsub_mqtt/README.markdown Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/README.markdown Wed Feb 07 11:57:30 2024 +0000
@@ -11,35 +11,38 @@
to embedded devices. This module provides a way for MQTT clients to
connect to Prosody and publish or subscribe to local pubsub nodes.
+The module currently implements MQTT version 3.1.1.
+
Details
-------
MQTT has the concept of 'topics' (similar to XMPP's pubsub 'nodes').
mod\_pubsub\_mqtt maps pubsub nodes to MQTT topics of the form
-`HOST/NODE`, e.g.`pubsub.example.org/mynode`.
+`<HOST>/<TYPE>/<NODE>`, e.g.`pubsub.example.org/json/mynode`.
+
+The 'TYPE' parameter in the topic allows the client to choose the payload
+format it will send/receive. For the supported values of 'TYPE' see the
+'Payloads' section below.
### Limitations
The current implementation is quite basic, and in particular:
- Authentication is not supported
-- SSL/TLS is not supported
- Only QoS level 0 is supported
### Payloads
XMPP payloads are always XML, but MQTT does not define a payload format.
-Therefore mod\_pubsub\_mqtt will attempt to convert data of certain
-recognised payload types. Currently supported:
+Therefore mod\_pubsub\_mqtt has some built-in data format translators.
+
+Currently supported data types:
-- JSON (see [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for
- the format)
-- Plain UTF-8 text (wrapped inside
+- `json`: See [XEP-0335](http://xmpp.org/extensions/xep-0335.html) for
+ the format.
+- `utf8`: Plain UTF-8 text (wrapped inside
`<data xmlns="https://prosody.im/protocol/mqtt"/>`)
-
-All other XMPP payload types are sent to the client directly as XML.
-Data published by MQTT clients is currently never translated, and always
-treated as UTF-8 text.
+- `atom_title`: Returns the title of an Atom entry as UTF-8 data
Configuration
-------------
@@ -51,16 +54,15 @@
modules_enabled = { "pubsub_mqtt" }
You may also configure which port(s) mod\_pubsub\_mqtt listens on using
-Prosody's standard config directives, such as `mqtt_ports`. Network
-settings **must** be specified in the global section of the config file,
-not under any particular pubsub component. The default port is 1883
-(MQTT's standard port number).
+Prosody's standard config directives, such as `mqtt_ports` and
+`mqtt_tls_ports`. Network settings **must** be specified in the global section
+of the config file, not under any particular pubsub component. The default
+port is 1883 (MQTT's standard port number) and 8883 for TLS connections.
Compatibility
-------------
------- --------------
trunk Works
- 0.9 Works
- 0.8 Doesn't work
+ 0.12 Works
------- --------------
--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Wed Feb 07 11:57:30 2024 +0000
@@ -59,6 +59,15 @@
end
function packet_handlers.connect(session, packet)
+ module:log("info", "MQTT client connected (sending connack)");
+ module:log("debug", "MQTT version: %02x", packet.version);
+ if packet.version ~= 0x04 then -- Version mismatch
+ session.conn:write(mqtt.serialize_packet{
+ type = "connack";
+ data = string.char(0x00, 0x01);
+ });
+ return;
+ end
session.conn:write(mqtt.serialize_packet{
type = "connack";
data = string.char(0x00, 0x00);
@@ -96,27 +105,33 @@
end
function packet_handlers.subscribe(session, packet)
- for _, topic in ipairs(packet.topics) do
+ local results = {};
+ for i, topic in ipairs(packet.topics) do
module:log("info", "SUBSCRIBE to %s", topic);
local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
if not host then
module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
- return;
- end
- local pubsub = pubsub_subscribers[host];
- if not pubsub then
- module:log("warn", "Unable to locate host/node: %s", topic);
- return;
+ results[i] = 0x80; -- Failure
+ else
+ local pubsub = pubsub_subscribers[host];
+ if not pubsub then
+ module:log("warn", "Unable to locate host/node: %s", topic);
+ results[i] = 0x80; -- Failure
+ else
+ local node_subs = pubsub[node];
+ if not node_subs then
+ node_subs = {};
+ pubsub[node] = node_subs;
+ end
+ session.subscriptions[topic] = payload_type;
+ node_subs[session] = payload_type;
+ module:log("debug", "Successfully subscribed to %s", topic);
+ results[i] = 0x00; -- Success
+ end
end
- local node_subs = pubsub[node];
- if not node_subs then
- node_subs = {};
- pubsub[node] = node_subs;
- end
- session.subscriptions[topic] = payload_type;
- node_subs[session] = payload_type;
end
-
+ local ack = mqtt.serialize_packet{ type = "suback", id = packet.id, results = results };
+ session.conn:write(ack);
end
function packet_handlers.pingreq(session, packet)
@@ -191,7 +206,7 @@
topic = module.host.."/"..payload_type.."/"..event.node;
data = data_translators[payload_type].from_item(event.item) or "";
};
- rawset(self, packet);
+ rawset(self, payload_type, packet);
return packet;
end;
});
--- a/mod_pubsub_mqtt/mqtt.lib.lua Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/mqtt.lib.lua Wed Feb 07 11:57:30 2024 +0000
@@ -1,4 +1,4 @@
-local bit = require "bit";
+local bit = require "util.bitcompat";
local stream_mt = {};
stream_mt.__index = stream_mt;
@@ -29,10 +29,25 @@
return self:read_bytes(len), len+2;
end
+function stream_mt:read_word()
+ local len1, len2 = self:read_bytes(2):byte(1,2);
+ local result = bit.lshift(len1, 8) + len2;
+ module:log("debug", "read_word(%02x, %02x) = %04x (%d)", len1, len2, result, result);
+ return result;
+end
+
+local function hasbit(byte, n_bit)
+ return bit.band(byte, 2^n_bit) ~= 0;
+end
+
+local function encode_string(str)
+ return string.char(bit.band(#str, 0xff00), bit.band(#str, 0x00ff))..str;
+end
+
local packet_type_codes = {
"connect", "connack",
"publish", "puback", "pubrec", "pubrel", "pubcomp",
- "subscribe", "subak", "unsubscribe", "unsuback",
+ "subscribe", "suback", "unsubscribe", "unsuback",
"pingreq", "pingresp",
"disconnect"
};
@@ -59,9 +74,46 @@
packet.type = nil; -- Invalid packet
else
packet.version = self:read_bytes(1):byte();
- packet.connect_flags = self:read_bytes(1):byte();
- packet.keepalive_timer = self:read_bytes(1):byte();
+ module:log("debug", "ver: %02x", packet.version);
+ if packet.version ~= 0x04 then
+ module:log("warn", "MQTT version mismatch (got %02x, we support %02x", packet.version, 0x04);
+ end
+ local flags = self:read_bytes(1):byte();
+ module:log("debug", "flags: %02x", flags);
+ packet.keepalive_timer = self:read_bytes(2):byte();
+ module:log("debug", "keepalive: %d", packet.keepalive_timer);
+ packet.connect_flags = {};
length = length - 11;
+ packet.connect_flags = {
+ clean_session = hasbit(flags, 1);
+ will = hasbit(flags, 2);
+ will_qos = bit.band(bit.rshift(flags, 2), 0x02);
+ will_retain = hasbit(flags, 5);
+ user_name = hasbit(flags, 7);
+ password = hasbit(flags, 6);
+ };
+ module:log("debug", "%s", require "util.serialization".serialize(packet.connect_flags, "debug"));
+ module:log("debug", "Reading client_id...");
+ packet.client_id = self:read_string();
+ if packet.connect_flags.will then
+ module:log("debug", "Reading will...");
+ packet.will = {
+ topic = self:read_string();
+ message = self:read_string();
+ qos = packet.connect_flags.will_qos;
+ retain = packet.connect_flags.will_retain;
+ };
+ end
+ if packet.connect_flags.user_name then
+ module:log("debug", "Reading username...");
+ packet.username = self:read_string();
+ end
+ if packet.connect_flags.password then
+ module:log("debug", "Reading password...");
+ packet.password = self:read_string();
+ end
+ module:log("debug", "Done parsing connect!");
+ length = 0; -- No payload left
end
elseif packet.type == "publish" then
packet.topic = self:read_string();
@@ -87,6 +139,7 @@
if length > 0 then
packet.data = self:read_bytes(length);
end
+ module:log("debug", "MQTT packet complete!");
return packet;
end
@@ -102,7 +155,6 @@
end
function stream_mt:feed(data)
- module:log("debug", "Feeding %d bytes", #data);
local packets = {};
local packet = self.parser(data);
while packet do
@@ -135,10 +187,10 @@
packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
elseif packet.type == "suback" then
local t = {};
- for _, topic in ipairs(packet.topics) do
- table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
+ for i, result_code in ipairs(packet.results) do
+ table.insert(t, string.char(result_code));
end
- packet.data = table.concat(t);
+ packet.data = packet.id..table.concat(t);
end
-- Get length