--- a/mod_smacks/mod_smacks.lua Tue Jan 20 11:02:14 2015 +0000
+++ b/mod_smacks/mod_smacks.lua Sun Jan 25 13:04:02 2015 +0100
@@ -24,7 +24,7 @@
local sessionmanager = require"core.sessionmanager";
local c2s_sessions = module:shared("/*/c2s/sessions");
-local session_registry = {};
+local session_registry = module:shared("sessions");
local function can_do_smacks(session, advertise_only)
if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end
@@ -58,7 +58,7 @@
end);
local function outgoing_stanza_filter(stanza, session)
- local is_stanza = stanza.attr and not stanza.attr.xmlns;
+ local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":";
if is_stanza and not stanza._cached then -- Stanza in default stream namespace
local queue = session.outgoing_stanza_queue;
local cached_stanza = st.clone(stanza);
@@ -151,22 +151,20 @@
module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
module:hook_stanza("http://etherx.jabber.org/streams", "features",
- function (session, stanza)
- module:add_timer(0, function ()
- if can_do_smacks(session) then
- if stanza:get_child("sm", xmlns_sm3) then
- session.sends2s(st.stanza("enable", sm3_attr));
- session.smacks = xmlns_sm3;
- elseif stanza:get_child("sm", xmlns_sm2) then
- session.sends2s(st.stanza("enable", sm2_attr));
- session.smacks = xmlns_sm2;
- else
- return;
- end
- wrap_session_out(session, false);
- end
- end);
- end);
+function (session, stanza)
+ if can_do_smacks(session) then
+ if stanza:get_child("sm", xmlns_sm3) then
+ session.sends2s(st.stanza("enable", sm3_attr));
+ session.smacks = xmlns_sm3;
+ elseif stanza:get_child("sm", xmlns_sm2) then
+ session.sends2s(st.stanza("enable", sm2_attr));
+ session.smacks = xmlns_sm2;
+ else
+ return;
+ end
+ wrap_session_out(session, false);
+ end
+end);
function handle_enabled(session, stanza, xmlns_sm)
module:log("debug", "Enabling stream management");
@@ -233,12 +231,10 @@
local error_attr = { type = "cancel" };
if #queue > 0 then
session.outgoing_stanza_queue = {};
+ local reply;
for i=1,#queue do
- local reply = st.reply(queue[i]);
- if reply.attr.to ~= session.full_jid then
- reply.attr.type = "error";
- reply:tag("error", error_attr)
- :tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
+ if queue[i].attr.from ~= session.full_jid then
+ queue[i], reply = nil, st.error_reply(queue[i], "recipient-unavailable");
core_process_stanza(session, reply);
end
end
@@ -266,7 +262,7 @@
-- (for example, the client may have bound a new resource and
-- started a new smacks session, or not be using smacks)
local curr_session = full_sessions[session.full_jid];
- if false and session.destroyed then
+ if session.destroyed then
session.log("debug", "The session has already been destroyed");
elseif curr_session and curr_session.resumption_token == resumption_token
-- Check the hibernate time still matches what we think it is,
@@ -351,3 +347,26 @@
end
module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
+
+local function handle_read_timeout(event)
+ local session = event.session;
+ local xmlns_sm = session.smacks;
+ if xmlns_sm then
+ session.awaiting_ack = true;
+ return (session.sends2s or session.send)(st.stanza("r", { xmlns = xmlns_sm }));
+ end
+end
+
+module:hook("s2s-read-timeout", handle_read_timeout, 10);
+module:hook("c2s-read-timeout", handle_read_timeout, 10);
+
+local function handle_s2s_destroyed(event)
+ local session = event.session;
+ local queue = session.outgoing_stanza_queue;
+ if queue and #queue > 0 then
+ session.log("warn", "Destroying session with %d unacked stanzas", #queue);
+ handle_unacked_stanzas(session);
+ end
+end;
+module:hook("s2sout-destroyed", handle_s2s_destroyed);
+module:hook("s2sin-destroyed", handle_s2s_destroyed);