plugins/mod_smacks.lua
changeset 11941 364c3f018e3a
parent 11940 3f49c35607ca
child 11942 6da703cb4c04
--- a/plugins/mod_smacks.lua	Wed Nov 24 21:27:45 2021 +0100
+++ b/plugins/mod_smacks.lua	Wed Nov 24 21:27:49 2021 +0100
@@ -145,45 +145,63 @@
 			end
 		end);
 
-local function request_ack_if_needed(session, force, reason, stanza)
+local function should_ack(session, force)
+	if not session then return end -- shouldn't be possible
+	if session.destroyed then return end -- gone
+	if not session.smacks then return end -- not using
+	if session.hibernating then return end -- can't ack when asleep
+	if session.awaiting_ack then return end -- already waiting
+	if force then return force end
 	local queue = session.outgoing_stanza_queue;
 	local expected_h = session.last_acknowledged_stanza + #queue;
 	local max_unacked = max_unacked_stanzas;
 	if session.state == "inactive" then
 		max_unacked = max_inactive_unacked_stanzas;
 	end
-	if session.awaiting_ack == nil and not session.hibernating then
-		-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
-		-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
-		-- further requests until a higher h-value would be expected.
-		if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
-			session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
-			session.awaiting_ack = false;
-			session.awaiting_ack_timer = timer.add_task(1e-06, function ()
-				-- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
-				-- only request ack if needed and our session is not already hibernated or destroyed
-				if not session.awaiting_ack and not session.hibernating and not session.destroyed then
-					session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
-					(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
-					if session.destroyed then return end -- sending something can trigger destruction
-					session.awaiting_ack = true;
-					-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
-					session.last_requested_h = session.last_acknowledged_stanza + #queue;
-					session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
-					if not session.delayed_ack_timer then
-						session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
-							ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
-						end);
-					end
-				end
-			end);
-		end
+	-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
+	-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
+	-- further requests until a higher h-value would be expected.
+	return #queue > max_unacked and expected_h ~= session.last_requested_h;
+end
+
+local function request_ack(session, reason)
+	local queue = session.outgoing_stanza_queue;
+	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, #queue);
+	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
+	if session.destroyed then return end -- sending something can trigger destruction
+	session.awaiting_ack = true;
+	-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
+	session.last_requested_h = session.last_acknowledged_stanza + #queue;
+	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
+	if not session.delayed_ack_timer then
+		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
+			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
+		end);
+	end
+end
+
+local function request_ack_now_if_needed(session, force, reason)
+	if should_ack(session, force) then
+		request_ack(session, reason);
+	end
+end
+
+local function request_ack_if_needed(session, force, reason, stanza)
+	if should_ack(session, force) then
+		timer.add_task(0, function ()
+			request_ack_now_if_needed(session, force, reason, stanza);
+		end);
 	end
 
 	-- Trigger "smacks-ack-delayed"-event if we added new (ackable) stanzas to the outgoing queue
 	-- and there isn't already a timer for this event running.
 	-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
 	-- would not trigger this event (again).
+	local queue = session.outgoing_stanza_queue;
+	local max_unacked = max_unacked_stanzas;
+	if session.state == "inactive" then
+		max_unacked = max_inactive_unacked_stanzas;
+	end
 	if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
 		session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
 		ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
@@ -338,10 +356,7 @@
 	-- Reply with <a>
 	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = string.format("%d", origin.handled_stanza_count) }));
 	-- piggyback our own ack request if needed (see request_ack_if_needed() for explanation of last_requested_h)
-	local expected_h = origin.last_acknowledged_stanza + #origin.outgoing_stanza_queue;
-	if #origin.outgoing_stanza_queue > 0 and expected_h ~= origin.last_requested_h then
-		request_ack_if_needed(origin, true, "piggybacked by handle_r", nil);
-	end
+	request_ack_now_if_needed(origin, false, "piggybacked by handle_r", nil);
 	return true;
 end
 module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
@@ -385,7 +400,7 @@
 
 	origin.log("debug", "#queue = %d", #queue);
 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
-	request_ack_if_needed(origin, false, "handle_a", nil)
+	request_ack_now_if_needed(origin, false, "handle_a", nil)
 	return true;
 end
 module:hook_tag(xmlns_sm2, "a", handle_a);
@@ -632,22 +647,20 @@
 module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
 module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
 
-module:hook("csi-client-active", function (event)
-	if event.origin.smacks then
-		request_ack_if_needed(event.origin, true, "csi-active", nil);
-	end
-end);
+-- Events when it's sensible to request an ack
+-- Could experiment with forcing (ignoring max_unacked) <r>, but when and why?
+local request_ack_events = {
+	["csi-client-active"] = true;
+	["csi-flushing"] = false;
+};
 
-module:hook("csi-flushing", function(event)
-	local session = event.session;
-	if session.smacks then
-		if not session.awaiting_ack and not session.hibernating and not session.destroyed then
-			session.log("debug", "Sending <r> (csi-flushing)");
-			session.awaiting_ack = true; -- The send() call may invoke this event again, so set this first
-			(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
-		end
-	end
-end);
+for event_name, force in pairs(request_ack_events) do
+	module:log("info", "module:hook(%q, function)");
+	module:hook(event_name, function(event)
+		local session = event.session or event.origin;
+		request_ack_now_if_needed(session, force, event_name);
+	end);
+end
 
 local function handle_read_timeout(event)
 	local session = event.session;
@@ -663,14 +676,7 @@
 			end
 			return false; -- Kick the session
 		end
-		session.log("debug", "Sending <r> (read timeout)");
-		(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
-		session.awaiting_ack = true;
-		if not session.delayed_ack_timer then
-			session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
-				ack_delayed(session, nil);
-			end);
-		end
+		request_ack_now_if_needed(session, true, "read timeout");
 		return true;
 	end
 end