mod_pubsub_mqtt/mod_pubsub_mqtt.lua
changeset 5837 58df53eefa28
parent 5836 5afc8273c5ef
equal deleted inserted replaced
5836:5afc8273c5ef 5837:58df53eefa28
    57 	end
    57 	end
    58 	handler(session, packet);
    58 	handler(session, packet);
    59 end
    59 end
    60 
    60 
    61 function packet_handlers.connect(session, packet)
    61 function packet_handlers.connect(session, packet)
       
    62 	module:log("info", "MQTT client connected (sending connack)");
       
    63 	module:log("debug", "MQTT version: %02x", packet.version);
       
    64 	if packet.version ~= 0x04 then -- Version mismatch
       
    65 		session.conn:write(mqtt.serialize_packet{
       
    66 			type = "connack";
       
    67 			data = string.char(0x00, 0x01);
       
    68 		});
       
    69 		return;
       
    70 	end
    62 	session.conn:write(mqtt.serialize_packet{
    71 	session.conn:write(mqtt.serialize_packet{
    63 		type = "connack";
    72 		type = "connack";
    64 		data = string.char(0x00, 0x00);
    73 		data = string.char(0x00, 0x00);
    65 	});
    74 	});
    66 end
    75 end
    94 		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
   103 		module:log("warn", "Error publishing MQTT data: %s", tostring(err));
    95 	end
   104 	end
    96 end
   105 end
    97 
   106 
    98 function packet_handlers.subscribe(session, packet)
   107 function packet_handlers.subscribe(session, packet)
    99 	for _, topic in ipairs(packet.topics) do
   108 	local results = {};
       
   109 	for i, topic in ipairs(packet.topics) do
   100 		module:log("info", "SUBSCRIBE to %s", topic);
   110 		module:log("info", "SUBSCRIBE to %s", topic);
   101 		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
   111 		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
   102 		if not host then
   112 		if not host then
   103 			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
   113 			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
   104 			return;
   114 			results[i] = 0x80; -- Failure
   105 		end
   115 		else
   106 		local pubsub = pubsub_subscribers[host];
   116 			local pubsub = pubsub_subscribers[host];
   107 		if not pubsub then
   117 			if not pubsub then
   108 			module:log("warn", "Unable to locate host/node: %s", topic);
   118 				module:log("warn", "Unable to locate host/node: %s", topic);
   109 			return;
   119 				results[i] = 0x80; -- Failure
   110 		end
   120 			else
   111 		local node_subs = pubsub[node];
   121 				local node_subs = pubsub[node];
   112 		if not node_subs then
   122 				if not node_subs then
   113 			node_subs = {};
   123 					node_subs = {};
   114 			pubsub[node] = node_subs;
   124 					pubsub[node] = node_subs;
   115 		end
   125 				end
   116 		session.subscriptions[topic] = payload_type;
   126 				session.subscriptions[topic] = payload_type;
   117 		node_subs[session] = payload_type;
   127 				node_subs[session] = payload_type;
   118 	end
   128 				module:log("debug", "Successfully subscribed to %s", topic);
   119 
   129 				results[i] = 0x00; -- Success
       
   130 			end
       
   131 		end
       
   132 	end
       
   133 	local ack = mqtt.serialize_packet{ type = "suback", id = packet.id, results = results };
       
   134 	session.conn:write(ack);
   120 end
   135 end
   121 
   136 
   122 function packet_handlers.pingreq(session, packet)
   137 function packet_handlers.pingreq(session, packet)
   123 	session.conn:write(mqtt.serialize_packet{type = "pingresp"});
   138 	session.conn:write(mqtt.serialize_packet{type = "pingresp"});
   124 end
   139 end
   189 						type = "publish";
   204 						type = "publish";
   190 						id = "\000\000";
   205 						id = "\000\000";
   191 						topic = module.host.."/"..payload_type.."/"..event.node;
   206 						topic = module.host.."/"..payload_type.."/"..event.node;
   192 						data = data_translators[payload_type].from_item(event.item) or "";
   207 						data = data_translators[payload_type].from_item(event.item) or "";
   193 					};
   208 					};
   194 					rawset(self, packet);
   209 					rawset(self, payload_type, packet);
   195 					return packet;
   210 					return packet;
   196 				end;
   211 				end;
   197 			});
   212 			});
   198 			-- Broadcast to subscribers
   213 			-- Broadcast to subscribers
   199 			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);
   214 			module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node);