mod_pubsub_mqtt/mod_pubsub_mqtt.lua
changeset 5837 58df53eefa28
parent 5836 5afc8273c5ef
--- a/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Tue Jan 30 14:26:14 2024 +0000
+++ b/mod_pubsub_mqtt/mod_pubsub_mqtt.lua	Wed Feb 07 11:57:30 2024 +0000
@@ -59,6 +59,15 @@
 end
 
 function packet_handlers.connect(session, packet)
+	module:log("info", "MQTT client connected (sending connack)");
+	module:log("debug", "MQTT version: %02x", packet.version);
+	if packet.version ~= 0x04 then -- Version mismatch
+		session.conn:write(mqtt.serialize_packet{
+			type = "connack";
+			data = string.char(0x00, 0x01);
+		});
+		return;
+	end
 	session.conn:write(mqtt.serialize_packet{
 		type = "connack";
 		data = string.char(0x00, 0x00);
@@ -96,27 +105,33 @@
 end
 
 function packet_handlers.subscribe(session, packet)
-	for _, topic in ipairs(packet.topics) do
+	local results = {};
+	for i, topic in ipairs(packet.topics) do
 		module:log("info", "SUBSCRIBE to %s", topic);
 		local host, payload_type, node = topic:match("^([^/]+)/([^/]+)/(.+)$");
 		if not host then
 			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
-			return;
-		end
-		local pubsub = pubsub_subscribers[host];
-		if not pubsub then
-			module:log("warn", "Unable to locate host/node: %s", topic);
-			return;
+			results[i] = 0x80; -- Failure
+		else
+			local pubsub = pubsub_subscribers[host];
+			if not pubsub then
+				module:log("warn", "Unable to locate host/node: %s", topic);
+				results[i] = 0x80; -- Failure
+			else
+				local node_subs = pubsub[node];
+				if not node_subs then
+					node_subs = {};
+					pubsub[node] = node_subs;
+				end
+				session.subscriptions[topic] = payload_type;
+				node_subs[session] = payload_type;
+				module:log("debug", "Successfully subscribed to %s", topic);
+				results[i] = 0x00; -- Success
+			end
 		end
-		local node_subs = pubsub[node];
-		if not node_subs then
-			node_subs = {};
-			pubsub[node] = node_subs;
-		end
-		session.subscriptions[topic] = payload_type;
-		node_subs[session] = payload_type;
 	end
-
+	local ack = mqtt.serialize_packet{ type = "suback", id = packet.id, results = results };
+	session.conn:write(ack);
 end
 
 function packet_handlers.pingreq(session, packet)
@@ -191,7 +206,7 @@
 						topic = module.host.."/"..payload_type.."/"..event.node;
 						data = data_translators[payload_type].from_item(event.item) or "";
 					};
-					rawset(self, packet);
+					rawset(self, payload_type, packet);
 					return packet;
 				end;
 			});