--- a/mod_smacks/mod_smacks.lua Thu Jul 22 19:14:19 2010 +0100
+++ b/mod_smacks/mod_smacks.lua Sat Jul 24 11:07:38 2010 +0100
@@ -1,6 +1,7 @@
local st = require "util.stanza";
local t_insert, t_remove = table.insert, table.remove;
+local math_min = math.min;
local tonumber, tostring = tonumber, tostring;
local add_filter = require "util.filters".add_filter;
@@ -24,20 +25,19 @@
function (session, stanza)
module:log("debug", "Enabling stream management");
session.smacks = true;
- session.handled_stanza_count = 0;
+
-- Overwrite process_stanza() and send()
- local queue, queue_length = {}, 0;
- session.outgoing_stanza_queue, session.outgoing_stanza_count = queue, queue_length;
+ local queue = {};
+ session.outgoing_stanza_queue = queue;
+ session.last_acknowledged_stanza = 0;
local _send = session.send;
function session.send(stanza)
local attr = stanza.attr;
if attr and not attr.xmlns then -- Stanza in default stream namespace
- queue_length = queue_length + 1;
- session.outgoing_stanza_count = queue_length;
- queue[queue_length] = st.reply(stanza);
+ queue[#queue+1] = st.reply(stanza);
end
local ok, err = _send(stanza);
- if ok and queue_length > max_unacked_stanzas and not session.awaiting_ack then
+ if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
session.awaiting_ack = true;
return _send(st.stanza("r", { xmlns = xmlns_sm }));
end
@@ -74,10 +74,19 @@
if not origin.smacks then return; end
origin.awaiting_ack = nil;
-- Remove handled stanzas from outgoing_stanza_queue
- local handled_stanza_count = tonumber(stanza.attr.h)+1;
- for i=1,handled_stanza_count do
+ local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
+ local queue = origin.outgoing_stanza_queue;
+ if handled_stanza_count > #queue then
+ module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
+ handled_stanza_count, #queue);
+ for i=1,#queue do
+ module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
+ end
+ end
+ for i=1,math_min(handled_stanza_count,#queue) do
t_remove(origin.outgoing_stanza_queue, 1);
end
+ origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
return true;
end);