mod_pubsub_mqtt: MQTT (a lightweight binary pubsub protocol) interface for mod_pubsub
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua Sun Dec 01 19:12:08 2013 +0000
@@ -0,0 +1,161 @@
+module:set_global();
+
+local mqtt = module:require "mqtt";
+local st = require "util.stanza";
+
+local pubsub_services = {};
+local pubsub_subscribers = {};
+local packet_handlers = {};
+
+function handle_packet(session, packet)
+ module:log("warn", "MQTT packet received! Length: %d", packet.length);
+ for k,v in pairs(packet) do
+ module:log("debug", "MQTT %s: %s", tostring(k), tostring(v));
+ end
+ local handler = packet_handlers[packet.type];
+ if not handler then
+ module:log("warn", "Unhandled command: %s", tostring(packet.type));
+ return;
+ end
+ handler(session, packet);
+end
+
+function packet_handlers.connect(session, packet)
+ session.conn:write(mqtt.serialize_packet{
+ type = "connack";
+ data = string.char(0x00, 0x00);
+ });
+end
+
+function packet_handlers.disconnect(session, packet)
+ session.conn:close();
+end
+
+function packet_handlers.publish(session, packet)
+ module:log("warn", "PUBLISH to %s", packet.topic);
+ local host, node = packet.topic:match("^([^/]+)/(.+)$");
+ local pubsub = pubsub_services[host];
+ if not pubsub then
+ module:log("warn", "Unable to locate host/node: %s", packet.topic);
+ return;
+ end
+ local id = "mqtt";
+ local ok, err = pubsub:publish(node, true, id,
+ st.stanza("data", { xmlns = "https://prosody.im/protocol/mqtt" })
+ :text(packet.data)
+ );
+ if not ok then
+ module:log("warn", "Error publishing MQTT data: %s", tostring(err));
+ end
+end
+
+function packet_handlers.subscribe(session, packet)
+ for _, topic in ipairs(packet.topics) do
+ module:log("warn", "SUBSCRIBE to %s", topic);
+ local host, node = topic:match("^([^/]+)/(.+)$");
+ local pubsub = pubsub_subscribers[host];
+ if not pubsub then
+ module:log("warn", "Unable to locate host/node: %s", topic);
+ return;
+ end
+ local node_subs = pubsub[node];
+ if not node_subs then
+ node_subs = {};
+ pubsub[node] = node_subs;
+ end
+ session.subscriptions[topic] = true;
+ node_subs[session] = true;
+ end
+
+end
+
+function packet_handlers.pingreq(session, packet)
+ session.conn:write(mqtt.serialize_packet{type = "pingresp"});
+end
+
+local sessions = {};
+
+local mqtt_listener = {};
+
+function mqtt_listener.onconnect(conn)
+ sessions[conn] = {
+ conn = conn;
+ stream = mqtt.new_stream();
+ subscriptions = {};
+ };
+end
+
+function mqtt_listener.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ local packets = session.stream:feed(data);
+ for i = 1, #packets do
+ handle_packet(session, packets[i]);
+ end
+ end
+end
+
+function mqtt_listener.ondisconnect(conn)
+ local session = sessions[conn];
+ for topic in pairs(session.subscriptions) do
+ local host, node = topic:match("^([^/]+)/(.+)$");
+ local subs = pubsub_subscribers[host];
+ if subs then
+ local node_subs = subs[node];
+ if node_subs then
+ node_subs[session] = nil;
+ end
+ end
+ end
+ sessions[conn] = nil;
+ module:log("debug", "MQTT client disconnected");
+end
+
+module:provides("net", {
+ default_port = 1883;
+ listener = mqtt_listener;
+});
+
+local function tostring_content(item)
+ return tostring(item[1]);
+end
+
+local data_translators = setmetatable({
+ ["data https://prosody.im/protocol/mqtt"] = tostring_content;
+ ["json urn:xmpp:json:0"] = tostring_content;
+}, {
+ __index = function () return tostring; end;
+});
+
+function module.add_host(module)
+ local pubsub_module = hosts[module.host].modules.pubsub
+ if pubsub_module then
+ module:log("debug", "MQTT enabled for %s", module.host);
+ module:depends("pubsub");
+ pubsub_services[module.host] = assert(pubsub_module.service);
+ local subscribers = {};
+ pubsub_subscribers[module.host] = subscribers;
+ local function handle_publish(event)
+ -- Build MQTT packet
+ local packet = mqtt.serialize_packet{
+ type = "publish";
+ id = "\000\000";
+ topic = module.host.."/"..event.node;
+ data = data_translators[event.item.name.." "..event.item.attr.xmlns](event.item);
+ };
+ -- Broadcast to subscribers
+ module:log("debug", "Broadcasting PUBLISH to subscribers of %s/%s", module.host, event.node);
+ for session in pairs(subscribers[event.node] or {}) do
+ session.conn:write(packet);
+ module:log("debug", "Sent to %s", tostring(session));
+ end
+ end
+ pubsub_services[module.host].events.add_handler("item-published", handle_publish);
+ function module.unload()
+ module:log("debug", "MQTT disabled for %s", module.host);
+ pubsub_module.service.remove_handler("item-published", handle_publish);
+ pubsub_services[module.host] = nil;
+ pubsub_subscribers[module.host] = nil;
+ end
+ end
+end
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_pubsub_mqtt/mqtt.lib.lua Sun Dec 01 19:12:08 2013 +0000
@@ -0,0 +1,161 @@
+local bit = require "bit";
+
+local stream_mt = {};
+stream_mt.__index = stream_mt;
+
+function stream_mt:read_bytes(n_bytes)
+ module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #self.buffer);
+ local data = self.buffer;
+ if not data then
+ module:log("debug", "No data, pausing.");
+ data = coroutine.yield();
+ module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
+ end
+ if #data >= n_bytes then
+ data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1);
+ elseif #data < n_bytes then
+ module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
+ self.buffer = data..coroutine.yield();
+ module:log("debug", "Now we have %d bytes, reading...", #data);
+ return self:read_bytes(n_bytes);
+ end
+ module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer);
+ return data;
+end
+
+function stream_mt:read_string()
+ local len1, len2 = self:read_bytes(2):byte(1,2);
+ local len = bit.lshift(len1, 8) + len2;
+ return self:read_bytes(len), len+2;
+end
+
+local packet_type_codes = {
+ "connect", "connack",
+ "publish", "puback", "pubrec", "pubrel", "pubcomp",
+ "subscribe", "subak", "unsubscribe", "unsuback",
+ "pingreq", "pingresp",
+ "disconnect"
+};
+
+function stream_mt:read_packet()
+ local packet = {};
+ local header = self:read_bytes(1):byte();
+ packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
+ packet.dup = bit.band(header, 0x08) == 0x08;
+ packet.qos = bit.rshift(bit.band(header, 0x06), 1);
+ packet.retain = bit.band(header, 0x01) == 0x01;
+
+ -- Get length
+ local length, multiplier = 0, 1;
+ repeat
+ local digit = self:read_bytes(1):byte();
+ length = length + bit.band(digit, 0x7f)*multiplier;
+ multiplier = multiplier*128;
+ until bit.band(digit, 0x80) == 0;
+ packet.length = length;
+ if packet.type == "connect" then
+ if self:read_string() ~= "MQIsdp" then
+ module:log("warn", "Unexpected packet signature!");
+ packet.type = nil; -- Invalid packet
+ else
+ packet.version = self:read_bytes(1):byte();
+ packet.connect_flags = self:read_bytes(1):byte();
+ packet.keepalive_timer = self:read_bytes(1):byte();
+ length = length - 11;
+ end
+ elseif packet.type == "publish" then
+ packet.topic = self:read_string();
+ length = length - (#packet.topic+2);
+ if packet.qos == 1 or packet.qos == 2 then
+ packet.id = self:read_bytes(2);
+ length = length - 2;
+ end
+ elseif packet.type == "subscribe" then
+ if packet.qos == 1 or packet.qos == 2 then
+ packet.id = self:read_bytes(2);
+ length = length - 2;
+ end
+ local topics = {};
+ while length > 0 do
+ local topic, len = self:read_string();
+ table.insert(topics, topic);
+ self:read_bytes(1); -- QoS not used
+ length = length - (len+1);
+ end
+ packet.topics = topics;
+ end
+ if length > 0 then
+ packet.data = self:read_bytes(length);
+ end
+ return packet;
+end
+
+local function new_parser(self)
+ return coroutine.wrap(function (data)
+ self.buffer = data;
+ while true do
+ data = coroutine.yield(self:read_packet());
+ module:log("debug", "Parser: %d new bytes", #data);
+ self.buffer = (self.buffer or "")..data;
+ end
+ end);
+end
+
+function stream_mt:feed(data)
+ module:log("debug", "Feeding %d bytes", #data);
+ local packets = {};
+ local packet = self.parser(data);
+ while packet do
+ module:log("debug", "Received packet");
+ table.insert(packets, packet);
+ packet = self.parser("");
+ end
+ module:log("debug", "Returning %d packets", #packets);
+ return packets;
+end
+
+local function new_stream()
+ local stream = setmetatable({}, stream_mt);
+ stream.parser = new_parser(stream);
+ return stream;
+end
+
+local function serialize_packet(packet)
+ local type_num = 0;
+ for i, v in ipairs(packet_type_codes) do -- FIXME: I'm so tired right now.
+ if v == packet.type then
+ type_num = i;
+ break;
+ end
+ end
+ local header = string.char(bit.lshift(type_num, 4));
+
+ if packet.type == "publish" then
+ local topic = packet.topic or "";
+ packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
+ elseif packet.type == "suback" then
+ local t = {};
+ for _, topic in ipairs(packet.topics) do
+ table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
+ end
+ packet.data = table.concat(t);
+ end
+
+ -- Get length
+ local length = #(packet.data or "");
+ repeat
+ local digit = length%128;
+ length = math.floor(length/128);
+ if length > 0 then
+ digit = bit.bor(digit, 0x80);
+ end
+ header = header..string.char(digit); -- FIXME: ...
+ until length <= 0;
+
+ return header..(packet.data or "");
+end
+
+return {
+ new_stream = new_stream;
+ serialize_packet = serialize_packet;
+};