--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Fri Dec 16 12:12:01 2022 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Fri Dec 16 22:16:45 2022 +0000
@@ -1,8 +1,46 @@
module:set_global();
local mqtt = module:require "mqtt";
+local id = require "util.id";
local st = require "util.stanza";
+local function tostring_content(item)
+ return tostring(item[1]);
+end
+
+local data_translators = setmetatable({
+ utf8 = {
+ from_item = function (item)
+ return item:find("{https://prosody.im/protocol/data}data#");
+ end;
+ to_item = function (payload)
+ return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+ :text_tag("data", payload, { xmlns = "https://prosody.im/protocol/data" })
+ end;
+ };
+ json = {
+ from_item = function (item)
+ return item:find("{urn:xmpp:json:0}json#");
+ end;
+ to_item = function (payload)
+ return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+ :text_tag("json", payload, { xmlns = "urn:xmpp:json:0" });
+ end;
+ };
+ atom_title = {
+ from_item = function (item)
+ return item:find("{http://www.w3.org/2005/Atom}entry/title#");
+ end;
+ to_item = function (payload)
+ return st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id.medium() })
+ :tag("entry", { xmlns = "http://www.w3.org/2005/Atom" })
+ :text_tag("title", payload, { type = "text" });
+ end;
+ };
+}, {
+ __index = function () return { from_item = tostring }; end;
+});
+
local pubsub_services = {};
local pubsub_subscribers = {};
local packet_handlers = {};
@@ -33,17 +71,25 @@
function packet_handlers.publish(session, packet)
module:log("info", "PUBLISH to %s", packet.topic);
- local host, node = packet.topic:match("^([^/]+)/(.+)$");
+ local host, payload_type, node = packet.topic:match("^([^/]+)/([^/]+)/(.+)$");
+ if not host then
+ module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
+ return;
+ end
local pubsub = pubsub_services[host];
if not pubsub then
module:log("warn", "Unable to locate host/node: %s", packet.topic);
return;
end
- local id = "mqtt";
- local ok, err = pubsub:publish(node, true, id,
- st.stanza("item", { xmlns = "http://jabber.org/protocol/pubsub", id = id })
- :text_tag("data", packet.data, { xmlns = "https://prosody.im/protocol/data" })
- );
+
+ local payload_translator = data_translators[payload_type];
+ if not payload_translator or not payload_translator.to_item then
+ module:log("warn", "Unsupported payload type '%s' on topic '%s'", payload_type, packet.topic);
+ return;
+ end
+
+ local payload_item = payload_translator.to_item(packet.data);
+ local ok, err = pubsub:publish(node, true, payload_item.attr.id, payload_item);
if not ok then
module:log("warn", "Error publishing MQTT data: %s", tostring(err));
end
@@ -52,7 +98,11 @@
function packet_handlers.subscribe(session, packet)
for _, topic in ipairs(packet.topics) do
module:log("info", "SUBSCRIBE to %s", topic);
- local host, node = topic:match("^([^/]+)/(.+)$");
+ local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
+ if not host then
+ module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
+ return;
+ end
local pubsub = pubsub_subscribers[host];
if not pubsub then
module:log("warn", "Unable to locate host/node: %s", topic);
@@ -63,8 +113,8 @@
node_subs = {};
pubsub[node] = node_subs;
end
- session.subscriptions[topic] = true;
- node_subs[session] = true;
+ session.subscriptions[topic] = payload_type;
+ node_subs[session] = payload_type;
end
end
@@ -116,17 +166,6 @@
listener = mqtt_listener;
});
-local function tostring_content(item)
- return tostring(item[1]);
-end
-
-local data_translators = setmetatable({
- ["data https://prosody.im/protocol/data"] = tostring_content;
- ["json urn:xmpp:json:0"] = tostring_content;
-}, {
- __index = function () return tostring; end;
-});
-
function module.add_host(module)
local pubsub_module = hosts[module.host].modules.pubsub
if pubsub_module then
@@ -137,16 +176,22 @@
pubsub_subscribers[module.host] = subscribers;
local function handle_publish(event)
-- Build MQTT packet
- local packet = mqtt.serialize_packet{
- type = "publish";
- id = "\000\000";
- topic = module.host.."/"..event.node;
- data = data_translators[tostring(event.item.name).." "..tostring(event.item.attr.xmlns)](event.item);
- };
+ local packet_types = setmetatable({}, {
+ __index = function (self, payload_type)
+ local packet = mqtt.serialize_packet{
+ type = "publish";
+ id = "\000\000";
+ topic = module.host.."/"..payload_type.."/"..event.node;
+ data = data_translators[payload_type].from_item(event.item) or "";
+ };
+ rawset(self, packet);
+ return packet;
+ end;
+ });
-- Broadcast to subscribers
- module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
- for session in pairs(subscribers[event.node] or {}) do
- session.conn:write(packet);
+ module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
+ for session, payload_type in pairs(subscribers[event.node] or {}) do
+ session.conn:write(packet_types[payload_type]);
module:log("debug", "Sent to %s", tostring(session));
end
end