57 end |
57 end |
58 handler(session, packet); |
58 handler(session, packet); |
59 end |
59 end |
60 |
60 |
--- Handle an MQTT CONNECT packet.
-- Only protocol level 0x04 is accepted. Any other level is answered with a
-- CONNACK carrying return code 0x01 and the connection is then closed, as
-- MQTT 3.1.1 requires after a non-zero CONNACK. Accepted clients receive a
-- CONNACK with return code 0x00.
-- @param session table holding the client's `conn` connection object
-- @param packet  decoded CONNECT packet; only `packet.version` is read
function packet_handlers.connect(session, packet)
	module:log("debug", "MQTT version: %02x", packet.version);
	if packet.version ~= 0x04 then -- Version mismatch
		module:log("warn", "Rejecting MQTT client: unsupported protocol version");
		session.conn:write(mqtt.serialize_packet{
			type = "connack";
			data = string.char(0x00, 0x01);
		});
		-- A refused CONNECT must not leave the network connection open.
		session.conn:close();
		return;
	end
	-- Logged only once the client has actually been accepted.
	module:log("info", "MQTT client connected (sending connack)");
	session.conn:write(mqtt.serialize_packet{
		type = "connack";
		data = string.char(0x00, 0x00);
	});
end
94 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); |
103 module:log("warn", "Error publishing MQTT data: %s", tostring(err)); |
95 end |
104 end |
96 end |
105 end |
97 |
106 |
--- Handle an MQTT SUBSCRIBE packet.
-- Each entry of `packet.topics` must look like HOST/TYPE/NODE. For a topic
-- whose HOST has an entry in `pubsub_subscribers`, the session is recorded
-- as a subscriber of NODE with TYPE as its payload type. One result code
-- per topic (0x00 success, 0x80 failure) is collected, and a single
-- "suback" packet with those results is written back to the client.
-- @param session table with `conn` and `subscriptions` fields
-- @param packet  decoded SUBSCRIBE packet; `packet.topics` and `packet.id` are read
function packet_handlers.subscribe(session, packet)
	local SUCCESS, FAILURE = 0x00, 0x80;
	local return_codes = {};

	for index, topic_filter in ipairs(packet.topics) do
		module:log("info", "SUBSCRIBE to %s", topic_filter);

		-- Assume failure; only the fully successful path overrides it.
		local code = FAILURE;
		local host, payload_type, node = topic_filter:match("^([^/]+)/([^/]+)/(.+)$");
		local host_subs = host and pubsub_subscribers[host];

		if not host then
			module:log("warn", "Invalid topic format - expected: HOST/TYPE/NODE");
		elseif not host_subs then
			module:log("warn", "Unable to locate host/node: %s", topic_filter);
		else
			-- Create the per-node subscriber table the first time it is needed.
			local subscribers = host_subs[node];
			if not subscribers then
				subscribers = {};
				host_subs[node] = subscribers;
			end
			session.subscriptions[topic_filter] = payload_type;
			subscribers[session] = payload_type;
			module:log("debug", "Successfully subscribed to %s", topic_filter);
			code = SUCCESS;
		end

		return_codes[index] = code;
	end

	session.conn:write(mqtt.serialize_packet{
		type = "suback";
		id = packet.id;
		results = return_codes;
	});
end
121 |
136 |
--- Handle an MQTT PINGREQ packet by writing a serialized "pingresp"
-- packet to the session's connection.
-- @param session table whose `conn` field receives the response
-- @param packet  the PINGREQ packet (not inspected)
function packet_handlers.pingreq(session, packet)
	local response = mqtt.serialize_packet({ type = "pingresp" });
	session.conn:write(response);
end
189 type = "publish"; |
204 type = "publish"; |
190 id = "\000\000"; |
205 id = "\000\000"; |
191 topic = module.host.."/"..payload_type.."/"..event.node; |
206 topic = module.host.."/"..payload_type.."/"..event.node; |
192 data = data_translators[payload_type].from_item(event.item) or ""; |
207 data = data_translators[payload_type].from_item(event.item) or ""; |
193 }; |
208 }; |
194 rawset(self, packet); |
209 rawset(self, payload_type, packet); |
195 return packet; |
210 return packet; |
196 end; |
211 end; |
197 }); |
212 }); |
198 -- Broadcast to subscribers |
213 -- Broadcast to subscribers |
199 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node); |
214 module:log("debug", "Broadcasting PUBLISH to subscribers of %s/*/%s", module.host, event.node); |