plugins/mod_smacks.lua
changeset 11939 4d0d10fabb82
parent 11938 65cdb1b21db3
child 11940 3f49c35607ca
--- a/plugins/mod_smacks.lua	Tue Nov 16 21:15:22 2021 +0100
+++ b/plugins/mod_smacks.lua	Wed Nov 24 21:27:49 2021 +0100
@@ -12,8 +12,7 @@
 --
 
 local st = require "util.stanza";
-local dep = require "util.dependencies";
-local cache = dep.softreq("util.cache");	-- only available in prosody 0.10+
+local cache = require "util.cache";
 local uuid_generate = require "util.uuid".generate;
 local jid = require "util.jid";
 
@@ -44,7 +43,7 @@
 local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10);
 local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10);
 local core_process_stanza = prosody.core_process_stanza;
-local sessionmanager = require"core.sessionmanager";
+local sessionmanager = require "core.sessionmanager";
 
 assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0");
 assert(max_old_sessions > 0, "smacks_max_old_sessions must be greater than 0");
@@ -92,7 +91,7 @@
 end
 local old_session_registry = init_session_cache(max_old_sessions, nil);
 local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session)
-	if session.destroyed then return true; end		-- destroyed session can always be removed from cache
+	if session.destroyed then return true; end -- destroyed session can always be removed from cache
 	session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token);
 	-- store old session's h values on force delete
 	-- save only actual h value and username/host (for security)
@@ -101,23 +100,10 @@
 		username = session.username,
 		host = session.host
 	});
-	return true;	-- allow session to be removed from full cache to make room for new one
+	return true; -- allow session to be removed from full cache to make room for new one
 end);
 
-local function stoppable_timer(delay, callback)
-	local stopped = false;
-	local timer = module:add_timer(delay, function (t)
-		if stopped then return; end
-		return callback(t);
-	end);
-	if timer and timer.stop then return timer; end		-- new prosody api includes stop() function
-	return {
-		stop = function(self) stopped = true end;
-		timer;
-	};
-end
-
-local function delayed_ack_function(session, stanza)
+local function ack_delayed(session, stanza)
 	-- fire event only if configured to do so and our session is not already hibernated or destroyed
 	if delayed_ack_timeout > 0 and session.awaiting_ack
 	and not session.hibernating and not session.destroyed then
@@ -162,20 +148,18 @@
 local function request_ack_if_needed(session, force, reason, stanza)
 	local queue = session.outgoing_stanza_queue;
 	local expected_h = session.last_acknowledged_stanza + #queue;
-	-- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
 	local max_unacked = max_unacked_stanzas;
-	if session.state == "inactive"  then
+	if session.state == "inactive" then
 		max_unacked = max_inactive_unacked_stanzas;
 	end
 	if session.awaiting_ack == nil and not session.hibernating then
 		-- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong
 		-- stanza counts. it is set when an <r> is really sent (e.g. inside timer), preventing any
 		-- further requests until a higher h-value would be expected.
-		-- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h));
 		if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then
 			session.log("debug", "Queuing <r> (in a moment) from %s - #queue=%d", reason, #queue);
 			session.awaiting_ack = false;
-			session.awaiting_ack_timer = stoppable_timer(1e-06, function ()
+			session.awaiting_ack_timer = module:add_timer(1e-06, function ()
 				-- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating));
 				-- only request ack if needed and our session is not already hibernated or destroyed
 				if not session.awaiting_ack and not session.hibernating and not session.destroyed then
@@ -187,8 +171,8 @@
 					session.last_requested_h = session.last_acknowledged_stanza + #queue;
 					session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, #queue);
 					if not session.delayed_ack_timer then
-						session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
-							delayed_ack_function(session, nil);		-- we don't know if this is the only new stanza in the queue
+						session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
+							ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
 						end);
 					end
 				end
@@ -201,8 +185,8 @@
 	-- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event
 	-- would not trigger this event (again).
 	if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then
-		session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)");
-		delayed_ack_function(session, stanza);		-- this is the only new stanza in the queue --> provide it to other modules
+		session.log("debug", "Calling ack_delayed directly (still waiting for ack)");
+		ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules
 	end
 end
 
@@ -309,12 +293,12 @@
 	(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) }));
 	return true;
 end
-module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
-module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
+module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
+module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
 
-module:hook_stanza("http://etherx.jabber.org/streams", "features",
+module:hook_tag("http://etherx.jabber.org/streams", "features",
 		function (session, stanza)
-			stoppable_timer(1e-6, function ()
+			module:add_timer(1e-6, function ()
 				if can_do_smacks(session) then
 					if stanza:get_child("sm", xmlns_sm3) then
 						session.sends2s(st.stanza("enable", sm3_attr));
@@ -330,7 +314,7 @@
 			end);
 		end);
 
-function handle_enabled(session, stanza, xmlns_sm)
+function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
 	module:log("debug", "Enabling stream management");
 	session.smacks = xmlns_sm;
 
@@ -340,10 +324,10 @@
 
 	return true;
 end
-module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
-module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
+module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
+module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
 
-function handle_r(origin, stanza, xmlns_sm)
+function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza
 	if not origin.smacks then
 		module:log("debug", "Received ack request from non-smack-enabled session");
 		return;
@@ -358,8 +342,8 @@
 	end
 	return true;
 end
-module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
-module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
+module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
+module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
 
 function handle_a(origin, stanza)
 	if not origin.smacks then return; end
@@ -387,11 +371,11 @@
 		for i=1,#queue do
 			origin.log("debug", "Q item %d: %s", i, tostring(queue[i]));
 		end
-        origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; };
-        return;
+		origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; };
+		return;
 	end
 
-	for i=1,math_min(handled_stanza_count,#queue) do
+	for _=1,math_min(handled_stanza_count,#queue) do
 		local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1);
 		module:fire_event("delivery/success", { session = origin, stanza = handled_stanza });
 	end
@@ -401,8 +385,8 @@
 	request_ack_if_needed(origin, false, "handle_a", nil)
 	return true;
 end
-module:hook_stanza(xmlns_sm2, "a", handle_a);
-module:hook_stanza(xmlns_sm3, "a", handle_a);
+module:hook_tag(xmlns_sm2, "a", handle_a);
+module:hook_tag(xmlns_sm3, "a", handle_a);
 
 --TODO: Optimise... incoming stanzas should be handled by a per-session
 -- function that has a counter as an upvalue (no table indexing for increments,
@@ -450,14 +434,14 @@
 			-- don't store messages in offline store if they are mam results
 			local mam_result = stanza:get_child("result", xmlns_mam2);
 			if mam_result ~= nil then
-				return true;		-- stanza already "handled", don't send an error and don't add it to offline storage
+				return true; -- stanza already "handled", don't send an error and don't add it to offline storage
 			end
 			-- do nothing here for normal messages and don't send out "message delivery errors",
 			-- because messages are already in MAM at this point (no need to frighten users)
 			local stanza_id = get_stanza_id(stanza, jid.bare(session.full_jid));
 			if session.mam_requested and stanza_id ~= nil then
 				session.log("debug", "mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s", tostring(stanza_id));
-				return true;		-- stanza handled, don't send an error
+				return true; -- stanza handled, don't send an error
 			end
 			-- store message in offline store, if this client does not use mam *and* was the last client online
 			local sessions = prosody.hosts[module.host].sessions[session.username] and
@@ -465,14 +449,14 @@
 			if sessions and next(sessions) == session.resource and next(sessions, session.resource) == nil then
 				local ok = module:fire_event("message/offline/handle", { origin = session, username = session.username, stanza = stanza });
 				session.log("debug", "mod_smacks delivery/failuere returning %s for offline-handled stanza", tostring(ok));
-				return ok;			-- if stanza was handled, don't send an error
+				return ok; -- if stanza was handled, don't send an error
 			end
 		end
 	end
 end);
 
 module:hook("pre-resource-unbind", function (event)
-	local session, err = event.session, event.error;
+	local session = event.session;
 	if session.smacks then
 		if not session.resumption_token then
 			local queue = session.outgoing_stanza_queue;
@@ -492,7 +476,7 @@
 				-- matches the smacks session this timer is for in case it changed
 				-- (for example, the client may have bound a new resource and
 				-- started a new smacks session, or not be using smacks)
-				local curr_session = full_sessions[session.full_jid];
+				local curr_session = prosody.full_sessions[session.full_jid];
 				if session.destroyed then
 					session.log("debug", "The session has already been destroyed");
 				elseif curr_session and curr_session.resumption_token == resumption_token
@@ -509,8 +493,9 @@
 						return resume_timeout;
 					end
 					if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then
-						session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start));
-						return resume_timeout-(current_time-timeout_start);		-- time left to wait
+						session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds",
+							resume_timeout - (current_time - timeout_start));
+						return resume_timeout-(current_time-timeout_start); -- time left to wait
 					end
 					session.log("debug", "Destroying session for hibernating too long");
 					session_registry.set(session.username, session.resumption_token, nil);
@@ -626,7 +611,7 @@
 			session.send(queue[i]);
 		end
 		session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue);
-		function session.send(stanza)
+		function session.send(stanza) -- luacheck: ignore 432
 			migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza));
 			return false;
 		end
@@ -641,8 +626,8 @@
 	end
 	return true;
 end
-module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
-module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
+module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
+module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);
 
 module:hook("csi-client-active", function (event)
 	if event.origin.smacks then
@@ -678,8 +663,8 @@
 		(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }));
 		session.awaiting_ack = true;
 		if not session.delayed_ack_timer then
-			session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function()
-				delayed_ack_function(session, nil);
+			session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function()
+				ack_delayed(session, nil);
 			end);
 		end
 		return true;