mod_pubsub_feeds: Track latest timestamp seen in feeds instead of last poll
This should ensure that an entry that has a publish timestmap after the
previously oldest post, but before the time of the last poll check, is
published to the node.
Previously if an entry would be skipped if it was published at 13:00
with a timestamp of 12:30, where the last poll was at 12:45.
For feeds that lack a timestamp, it now looks for the first post that is
not published, assuming that the feed is in reverse chronological order,
then iterates back up from there.
local bit = require "bit";
local stream_mt = {};
stream_mt.__index = stream_mt;
function stream_mt:read_bytes(n_bytes)
module:log("debug", "Reading %d bytes... (buffer: %d)", n_bytes, #self.buffer);
local data = self.buffer;
if not data then
module:log("debug", "No data, pausing.");
data = coroutine.yield();
module:log("debug", "Have %d bytes of data now (want %d)", #data, n_bytes);
end
if #data >= n_bytes then
data, self.buffer = data:sub(1, n_bytes), data:sub(n_bytes+1);
elseif #data < n_bytes then
module:log("debug", "Not enough data (only %d bytes out of %d), pausing.", #data, n_bytes);
self.buffer = data..coroutine.yield();
module:log("debug", "Now we have %d bytes, reading...", #data);
return self:read_bytes(n_bytes);
end
module:log("debug", "Returning %d bytes (buffer: %d)", #data, #self.buffer);
return data;
end
function stream_mt:read_string()
local len1, len2 = self:read_bytes(2):byte(1,2);
local len = bit.lshift(len1, 8) + len2;
return self:read_bytes(len), len+2;
end
local packet_type_codes = {
"connect", "connack",
"publish", "puback", "pubrec", "pubrel", "pubcomp",
"subscribe", "subak", "unsubscribe", "unsuback",
"pingreq", "pingresp",
"disconnect"
};
function stream_mt:read_packet()
local packet = {};
local header = self:read_bytes(1):byte();
packet.type = packet_type_codes[bit.rshift(bit.band(header, 0xf0), 4)];
packet.dup = bit.band(header, 0x08) == 0x08;
packet.qos = bit.rshift(bit.band(header, 0x06), 1);
packet.retain = bit.band(header, 0x01) == 0x01;
-- Get length
local length, multiplier = 0, 1;
repeat
local digit = self:read_bytes(1):byte();
length = length + bit.band(digit, 0x7f)*multiplier;
multiplier = multiplier*128;
until bit.band(digit, 0x80) == 0;
packet.length = length;
if packet.type == "connect" then
if self:read_string() ~= "MQTT" then
module:log("warn", "Unexpected packet signature!");
packet.type = nil; -- Invalid packet
else
packet.version = self:read_bytes(1):byte();
packet.connect_flags = self:read_bytes(1):byte();
packet.keepalive_timer = self:read_bytes(1):byte();
length = length - 11;
end
elseif packet.type == "publish" then
packet.topic = self:read_string();
length = length - (#packet.topic+2);
if packet.qos == 1 or packet.qos == 2 then
packet.id = self:read_bytes(2);
length = length - 2;
end
elseif packet.type == "subscribe" then
if packet.qos == 1 or packet.qos == 2 then
packet.id = self:read_bytes(2);
length = length - 2;
end
local topics = {};
while length > 0 do
local topic, len = self:read_string();
table.insert(topics, topic);
self:read_bytes(1); -- QoS not used
length = length - (len+1);
end
packet.topics = topics;
end
if length > 0 then
packet.data = self:read_bytes(length);
end
return packet;
end
local function new_parser(self)
return coroutine.wrap(function (data)
self.buffer = data;
while true do
data = coroutine.yield(self:read_packet());
module:log("debug", "Parser: %d new bytes", #data);
self.buffer = (self.buffer or "")..data;
end
end);
end
function stream_mt:feed(data)
module:log("debug", "Feeding %d bytes", #data);
local packets = {};
local packet = self.parser(data);
while packet do
module:log("debug", "Received packet");
table.insert(packets, packet);
packet = self.parser("");
end
module:log("debug", "Returning %d packets", #packets);
return packets;
end
local function new_stream()
local stream = setmetatable({}, stream_mt);
stream.parser = new_parser(stream);
return stream;
end
local function serialize_packet(packet)
local type_num = 0;
for i, v in ipairs(packet_type_codes) do -- FIXME: I'm so tired right now.
if v == packet.type then
type_num = i;
break;
end
end
local header = string.char(bit.lshift(type_num, 4));
if packet.type == "publish" then
local topic = packet.topic or "";
packet.data = string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic..packet.data;
elseif packet.type == "suback" then
local t = {};
for _, topic in ipairs(packet.topics) do
table.insert(t, string.char(bit.band(#topic, 0xff00), bit.band(#topic, 0x00ff))..topic.."\000");
end
packet.data = table.concat(t);
end
-- Get length
local length = #(packet.data or "");
repeat
local digit = length%128;
length = math.floor(length/128);
if length > 0 then
digit = bit.bor(digit, 0x80);
end
header = header..string.char(digit); -- FIXME: ...
until length <= 0;
return header..(packet.data or "");
end
return {
new_stream = new_stream;
serialize_packet = serialize_packet;
};