diff -r 5afc8273c5ef -r 58df53eefa28 mod_pubsub_mqtt/mqtt.lib.lua --- a/mod_pubsub_mqtt/mqtt.lib.lua Tue Jan 30 14:26:14 2024 +0000 +++ b/mod_pubsub_mqtt/mqtt.lib.lua Wed Feb 07 11:57:30 2024 +0000 @@ -1,4 +1,4 @@ -local bit = require "bit"; +local bit = require "util.bitcompat"; local stream_mt = {}; stream_mt.__index = stream_mt; @@ -29,10 +29,25 @@ return self:read_bytes(len), len+2; end +function stream_mt:read_word() + local len1, len2 = self:read_bytes(2):byte(1,2); + local result = bit.lshift(len1, 8) + len2; + module:log("debug", "read_word(%02x, %02x) = %04x (%d)", len1, len2, result, result); + return result; +end + +local function hasbit(byte, n_bit) + return bit.band(byte, 2^n_bit) ~= 0; +end + +local function encode_string(str) + return string.char(bit.band(#str, 0xff00), bit.band(#str, 0x00ff))..str; +end + local packet_type_codes = { "connect", "connack", "publish", "puback", "pubrec", "pubrel", "pubcomp", - "subscribe", "subak", "unsubscribe", "unsuback", + "subscribe", "suback", "unsubscribe", "unsuback", "pingreq", "pingresp", "disconnect" }; @@ -59,9 +74,46 @@ packet.type = nil; -- Invalid packet else packet.version = self:read_bytes(1):byte(); - packet.connect_flags = self:read_bytes(1):byte(); - packet.keepalive_timer = self:read_bytes(1):byte(); + module:log("debug", "ver: %02x", packet.version); + if packet.version ~= 0x04 then + module:log("warn", "MQTT version mismatch (got %02x, we support %02x", packet.version, 0x04); + end + local flags = self:read_bytes(1):byte(); + module:log("debug", "flags: %02x", flags); + packet.keepalive_timer = self:read_bytes(2):byte(); + module:log("debug", "keepalive: %d", packet.keepalive_timer); + packet.connect_flags = {}; length = length - 11; + packet.connect_flags = { + clean_session = hasbit(flags, 1); + will = hasbit(flags, 2); + will_qos = bit.band(bit.rshift(flags, 2), 0x02); + will_retain = hasbit(flags, 5); + user_name = hasbit(flags, 7); + password = hasbit(flags, 6); + }; + module:log("debug", "%s", require "util.serialization".serialize(packet.connect_flags, "debug")); + module:log("debug", "Reading client_id..."); + packet.client_id = self:read_string(); + if packet.connect_flags.will then + module:log("debug", "Reading will..."); + packet.will = { + topic = self:read_string(); + message = self:read_string(); + qos = packet.connect_flags.will_qos; + retain = packet.connect_flags.will_retain; + }; + end + if packet.connect_flags.user_name then + module:log("debug", "Reading username..."); + packet.username = self:read_string(); + end + if packet.connect_flags.password then + module:log("debug", "Reading password..."); + packet.password = self:read_string(); + end + module:log("debug", "Done parsing connect!"); + length = 0; -- No payload left end elseif packet.type == "publish" then packet.topic = self:read_string(); @@ -87,6 +139,7 @@ if length > 0 then packet.data = self:read_bytes(length); end + module:log("debug", "MQTT packet complete!"); return packet; end @@ -102,7 +155,6 @@ end function stream_mt:feed(data) - module:log("debug", "Feeding %d bytes", #data); local packets = {}; local packet = self.parser(data); while packet do @@ -135,10 +187,10 @@ packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data; elseif packet.type == "suback" then local t = {}; - for _, topic in ipairs(packet.topics) do - table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000"); + for i, result_code in ipairs(packet.results) do + table.insert(t, string.char(result_code)); end - packet.data = table.concat(t); + packet.data = packet.id..table.concat(t); end -- Get length