diff -r 65cdb1b21db3 -r 4d0d10fabb82 plugins/mod_smacks.lua --- a/plugins/mod_smacks.lua Tue Nov 16 21:15:22 2021 +0100 +++ b/plugins/mod_smacks.lua Wed Nov 24 21:27:49 2021 +0100 @@ -12,8 +12,7 @@ -- local st = require "util.stanza"; -local dep = require "util.dependencies"; -local cache = dep.softreq("util.cache"); -- only available in prosody 0.10+ +local cache = require "util.cache"; local uuid_generate = require "util.uuid".generate; local jid = require "util.jid"; @@ -44,7 +43,7 @@ local max_hibernated_sessions = module:get_option_number("smacks_max_hibernated_sessions", 10); local max_old_sessions = module:get_option_number("smacks_max_old_sessions", 10); local core_process_stanza = prosody.core_process_stanza; -local sessionmanager = require"core.sessionmanager"; +local sessionmanager = require "core.sessionmanager"; assert(max_hibernated_sessions > 0, "smacks_max_hibernated_sessions must be greater than 0"); assert(max_old_sessions > 0, "smacks_max_old_sessions must be greater than 0"); @@ -92,7 +91,7 @@ end local old_session_registry = init_session_cache(max_old_sessions, nil); local session_registry = init_session_cache(max_hibernated_sessions, function(resumption_token, session) - if session.destroyed then return true; end -- destroyed session can always be removed from cache + if session.destroyed then return true; end -- destroyed session can always be removed from cache session.log("warn", "User has too much hibernated sessions, removing oldest session (token: %s)", resumption_token); -- store old session's h values on force delete -- save only actual h value and username/host (for security) @@ -101,23 +100,10 @@ username = session.username, host = session.host }); - return true; -- allow session to be removed from full cache to make room for new one + return true; -- allow session to be removed from full cache to make room for new one end); -local function stoppable_timer(delay, callback) - local stopped = false; - local timer = module:add_timer(delay, function (t) - if stopped then return; end - return callback(t); - end); - if timer and timer.stop then return timer; end -- new prosody api includes stop() function - return { - stop = function(self) stopped = true end; - timer; - }; -end - -local function delayed_ack_function(session, stanza) +local function ack_delayed(session, stanza) -- fire event only if configured to do so and our session is not already hibernated or destroyed if delayed_ack_timeout > 0 and session.awaiting_ack and not session.hibernating and not session.destroyed then @@ -162,20 +148,18 @@ local function request_ack_if_needed(session, force, reason, stanza) local queue = session.outgoing_stanza_queue; local expected_h = session.last_acknowledged_stanza + #queue; - -- session.log("debug", "*** SMACKS(1) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); local max_unacked = max_unacked_stanzas; - if session.state == "inactive" then + if session.state == "inactive" then max_unacked = max_inactive_unacked_stanzas; end if session.awaiting_ack == nil and not session.hibernating then -- this check of last_requested_h prevents ack-loops if missbehaving clients report wrong -- stanza counts. it is set when an is really sent (e.g. inside timer), preventing any -- further requests until a higher h-value would be expected. - -- session.log("debug", "*** SMACKS(2) ***: #queue=%s, max_unacked_stanzas=%s, expected_h=%s, last_requested_h=%s", tostring(#queue), tostring(max_unacked_stanzas), tostring(expected_h), tostring(session.last_requested_h)); if (#queue > max_unacked and expected_h ~= session.last_requested_h) or force then session.log("debug", "Queuing (in a moment) from %s - #queue=%d", reason, #queue); session.awaiting_ack = false; - session.awaiting_ack_timer = stoppable_timer(1e-06, function () + session.awaiting_ack_timer = module:add_timer(1e-06, function () -- session.log("debug", "*** SMACKS(3) ***: awaiting_ack=%s, hibernating=%s", tostring(session.awaiting_ack), tostring(session.hibernating)); -- only request ack if needed and our session is not already hibernated or destroyed if not session.awaiting_ack and not session.hibernating and not session.destroyed then @@ -187,8 +171,8 @@ session.last_requested_h = session.last_acknowledged_stanza + #queue; session.log("debug", "Sending (inside timer, after send) from %s - #queue=%d", reason, #queue); if not session.delayed_ack_timer then - session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() - delayed_ack_function(session, nil); -- we don't know if this is the only new stanza in the queue + session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() + ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue end); end end @@ -201,8 +185,8 @@ -- If we wouldn't do this, stanzas added to the queue after the first "smacks-ack-delayed"-event -- would not trigger this event (again). if #queue > max_unacked and session.awaiting_ack and session.delayed_ack_timer == nil then - session.log("debug", "Calling delayed_ack_function directly (still waiting for ack)"); - delayed_ack_function(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules + session.log("debug", "Calling ack_delayed directly (still waiting for ack)"); + ack_delayed(session, stanza); -- this is the only new stanza in the queue --> provide it to other modules end end @@ -309,12 +293,12 @@ (session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume, max = tostring(resume_timeout) })); return true; end -module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); -module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); +module:hook_tag(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100); +module:hook_tag(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100); -module:hook_stanza("http://etherx.jabber.org/streams", "features", +module:hook_tag("http://etherx.jabber.org/streams", "features", function (session, stanza) - stoppable_timer(1e-6, function () + module:add_timer(1e-6, function () if can_do_smacks(session) then if stanza:get_child("sm", xmlns_sm3) then session.sends2s(st.stanza("enable", sm3_attr)); @@ -330,7 +314,7 @@ end); end); -function handle_enabled(session, stanza, xmlns_sm) +function handle_enabled(session, stanza, xmlns_sm) -- luacheck: ignore 212/stanza module:log("debug", "Enabling stream management"); session.smacks = xmlns_sm; @@ -340,10 +324,10 @@ return true; end -module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); -module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); +module:hook_tag(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100); +module:hook_tag(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100); -function handle_r(origin, stanza, xmlns_sm) +function handle_r(origin, stanza, xmlns_sm) -- luacheck: ignore 212/stanza if not origin.smacks then module:log("debug", "Received ack request from non-smack-enabled session"); return; @@ -358,8 +342,8 @@ end return true; end -module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); -module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); +module:hook_tag(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end); +module:hook_tag(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end); function handle_a(origin, stanza) if not origin.smacks then return; end @@ -387,11 +371,11 @@ for i=1,#queue do origin.log("debug", "Q item %d: %s", i, tostring(queue[i])); end - origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; - return; + origin:close{ condition = "undefined-condition"; text = "Client acknowledged more stanzas than sent by server"; }; + return; end - for i=1,math_min(handled_stanza_count,#queue) do + for _=1,math_min(handled_stanza_count,#queue) do local handled_stanza = t_remove(origin.outgoing_stanza_queue, 1); module:fire_event("delivery/success", { session = origin, stanza = handled_stanza }); end @@ -401,8 +385,8 @@ request_ack_if_needed(origin, false, "handle_a", nil) return true; end -module:hook_stanza(xmlns_sm2, "a", handle_a); -module:hook_stanza(xmlns_sm3, "a", handle_a); +module:hook_tag(xmlns_sm2, "a", handle_a); +module:hook_tag(xmlns_sm3, "a", handle_a); --TODO: Optimise... incoming stanzas should be handled by a per-session -- function that has a counter as an upvalue (no table indexing for increments, @@ -450,14 +434,14 @@ -- don't store messages in offline store if they are mam results local mam_result = stanza:get_child("result", xmlns_mam2); if mam_result ~= nil then - return true; -- stanza already "handled", don't send an error and don't add it to offline storage + return true; -- stanza already "handled", don't send an error and don't add it to offline storage end -- do nothing here for normal messages and don't send out "message delivery errors", -- because messages are already in MAM at this point (no need to frighten users) local stanza_id = get_stanza_id(stanza, jid.bare(session.full_jid)); if session.mam_requested and stanza_id ~= nil then session.log("debug", "mod_smacks delivery/failure returning true for mam-handled stanza: mam-archive-id=%s", tostring(stanza_id)); - return true; -- stanza handled, don't send an error + return true; -- stanza handled, don't send an error end -- store message in offline store, if this client does not use mam *and* was the last client online local sessions = prosody.hosts[module.host].sessions[session.username] and @@ -465,14 +449,14 @@ if sessions and next(sessions) == session.resource and next(sessions, session.resource) == nil then local ok = module:fire_event("message/offline/handle", { origin = session, username = session.username, stanza = stanza }); session.log("debug", "mod_smacks delivery/failuere returning %s for offline-handled stanza", tostring(ok)); - return ok; -- if stanza was handled, don't send an error + return ok; -- if stanza was handled, don't send an error end end end end); module:hook("pre-resource-unbind", function (event) - local session, err = event.session, event.error; + local session = event.session; if session.smacks then if not session.resumption_token then local queue = session.outgoing_stanza_queue; @@ -492,7 +476,7 @@ -- matches the smacks session this timer is for in case it changed -- (for example, the client may have bound a new resource and -- started a new smacks session, or not be using smacks) - local curr_session = full_sessions[session.full_jid]; + local curr_session = prosody.full_sessions[session.full_jid]; if session.destroyed then session.log("debug", "The session has already been destroyed"); elseif curr_session and curr_session.resumption_token == resumption_token @@ -509,8 +493,9 @@ return resume_timeout; end if session.push_identifier ~= nil and current_time-timeout_start < resume_timeout then - session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", resume_timeout-(current_time-timeout_start)); - return resume_timeout-(current_time-timeout_start); -- time left to wait + session.log("debug", "A push happened since hibernation started, hibernating session for up to %d extra seconds", + resume_timeout - (current_time - timeout_start)); + return resume_timeout-(current_time-timeout_start); -- time left to wait end session.log("debug", "Destroying session for hibernating too long"); session_registry.set(session.username, session.resumption_token, nil); @@ -626,7 +611,7 @@ session.send(queue[i]); end session.log("debug", "all stanzas resent, now disabling send() in this migrated session, #queue = %d", #queue); - function session.send(stanza) + function session.send(stanza) -- luacheck: ignore 432 migrated_session_log("error", "Tried to send stanza on old session migrated by smacks resume (maybe there is a bug?): %s", tostring(stanza)); return false; end @@ -641,8 +626,8 @@ end return true; end -module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); -module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); +module:hook_tag(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end); +module:hook_tag(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end); module:hook("csi-client-active", function (event) if event.origin.smacks then @@ -678,8 +663,8 @@ (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })); session.awaiting_ack = true; if not session.delayed_ack_timer then - session.delayed_ack_timer = stoppable_timer(delayed_ack_timeout, function() - delayed_ack_function(session, nil); + session.delayed_ack_timer = module:add_timer(delayed_ack_timeout, function() + ack_delayed(session, nil); end); end return true;