mod_smacks/mod_smacks.lua
changeset 1593 3e4d15ae2133
parent 1539 05fa54404012
--- a/mod_smacks/mod_smacks.lua	Tue Jan 20 11:02:14 2015 +0000
+++ b/mod_smacks/mod_smacks.lua	Sun Jan 25 13:04:02 2015 +0100
@@ -24,7 +24,7 @@
 local sessionmanager = require"core.sessionmanager";
 
 local c2s_sessions = module:shared("/*/c2s/sessions");
-local session_registry = {};
+local session_registry = module:shared("sessions");
 
 local function can_do_smacks(session, advertise_only)
 	if session.smacks then return false, "unexpected-request", "Stream management is already enabled"; end
@@ -58,7 +58,7 @@
 		end);
 
 local function outgoing_stanza_filter(stanza, session)
-	local is_stanza = stanza.attr and not stanza.attr.xmlns;
+	local is_stanza = stanza.attr and not stanza.attr.xmlns and not stanza.name:find":";
 	if is_stanza and not stanza._cached then -- Stanza in default stream namespace
 		local queue = session.outgoing_stanza_queue;
 		local cached_stanza = st.clone(stanza);
@@ -151,22 +151,20 @@
 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
 
 module:hook_stanza("http://etherx.jabber.org/streams", "features",
-		function (session, stanza)
-			module:add_timer(0, function ()
-				if can_do_smacks(session) then
-					if stanza:get_child("sm", xmlns_sm3) then
-						session.sends2s(st.stanza("enable", sm3_attr));
-						session.smacks = xmlns_sm3;
-					elseif stanza:get_child("sm", xmlns_sm2) then
-						session.sends2s(st.stanza("enable", sm2_attr));
-						session.smacks = xmlns_sm2;
-					else
-						return;
-					end
-					wrap_session_out(session, false);
-				end
-			end);
-		end);
+function (session, stanza)
+	if can_do_smacks(session) then
+		if stanza:get_child("sm", xmlns_sm3) then
+			session.sends2s(st.stanza("enable", sm3_attr));
+			session.smacks = xmlns_sm3;
+		elseif stanza:get_child("sm", xmlns_sm2) then
+			session.sends2s(st.stanza("enable", sm2_attr));
+			session.smacks = xmlns_sm2;
+		else
+			return;
+		end
+		wrap_session_out(session, false);
+	end
+end);
 
 function handle_enabled(session, stanza, xmlns_sm)
 	module:log("debug", "Enabling stream management");
@@ -233,12 +231,10 @@
 	local error_attr = { type = "cancel" };
 	if #queue > 0 then
 		session.outgoing_stanza_queue = {};
+		local reply;
 		for i=1,#queue do
-			local reply = st.reply(queue[i]);
-			if reply.attr.to ~= session.full_jid then
-				reply.attr.type = "error";
-				reply:tag("error", error_attr)
-					:tag("recipient-unavailable", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"});
+			if queue[i].attr.from ~= session.full_jid then
+				queue[i], reply = nil, st.error_reply(queue[i], "recipient-unavailable");
 				core_process_stanza(session, reply);
 			end
 		end
@@ -266,7 +262,7 @@
 				-- (for example, the client may have bound a new resource and
 				-- started a new smacks session, or not be using smacks)
 				local curr_session = full_sessions[session.full_jid];
-				if false and session.destroyed then
+				if session.destroyed then
 					session.log("debug", "The session has already been destroyed");
 				elseif curr_session and curr_session.resumption_token == resumption_token
 				-- Check the hibernate time still matches what we think it is,
@@ -351,3 +347,26 @@
 end
 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
+
+local function handle_read_timeout(event)
+	local session = event.session;
+	local xmlns_sm = session.smacks;
+	if xmlns_sm then
+		session.awaiting_ack = true;
+		return (session.sends2s or session.send)(st.stanza("r", { xmlns = xmlns_sm }));
+	end
+end
+
+module:hook("s2s-read-timeout", handle_read_timeout, 10);
+module:hook("c2s-read-timeout", handle_read_timeout, 10);
+
+local function handle_s2s_destroyed(event)
+	local session = event.session;
+	local queue = session.outgoing_stanza_queue;
+	if queue and #queue > 0 then
+		session.log("warn", "Destroying session with %d unacked stanzas", #queue);
+		handle_unacked_stanzas(session);
+	end
+end;
+module:hook("s2sout-destroyed", handle_s2s_destroyed);
+module:hook("s2sin-destroyed", handle_s2s_destroyed);