--- a/mod_cloud_notify/mod_cloud_notify.lua Thu Jun 13 00:25:12 2019 +0200
+++ b/mod_cloud_notify/mod_cloud_notify.lua Sat Jun 15 01:26:15 2019 +0200
@@ -1,6 +1,6 @@
-- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
-- Copyright (C) 2015-2016 Kim Alvefur
--- Copyright (C) 2017-2018 Thilo Molitor
+-- Copyright (C) 2017-2019 Thilo Molitor
--
-- This file is MIT/X11 licensed.
@@ -93,6 +93,19 @@
return result;
end
+local function stoppable_timer(delay, callback)
+ local stopped = false;
+ local timer = module:add_timer(delay, function (t)
+ if stopped then return; end
+ return callback(t);
+ end);
+ if timer.stop then return timer; end -- new prosody api includes stop() function
+ return {
+ stop = function () stopped = true end;
+ timer;
+ };
+end
+
-- For keeping state across reloads while caching reads
local push_store = (function()
local store = module:open_store();
@@ -195,6 +208,12 @@
if hashes.sha256(push_identifier, true) == stanza.attr.id then
if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then
push_errors[push_identifier] = 0;
+ -- unhook iq handlers for this identifier (if possible)
+ if module.unhook then
+ module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error);
+ module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success);
+ id2node[stanza.attr.id] = nil;
+ end
module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
end
end
@@ -272,9 +291,10 @@
user_push_services[key] = nil;
push_errors[key] = nil;
if module.unhook then
- module:unhook("iq-error/host/"..key, handle_push_error);
- module:unhook("iq-result/host/"..key, handle_push_success);
- id2node[key] = nil;
+ local stanza_id = hashes.sha256(key, true)
+ module:unhook("iq-error/host/"..stanza_id, handle_push_error);
+ module:unhook("iq-result/host/"..stanza_id, handle_push_success);
+ id2node[stanza_id] = nil;
end
end
end
@@ -444,28 +464,7 @@
handle_notify_request(event.stanza, node, user_push_services, true);
end, 1);
--- publish on unacked smacks message
-local function process_smacks_stanza(stanza, session)
- if session.push_identifier then
- session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza");
- local user_push_services = {[session.push_identifier] = session.push_settings};
- local node = get_push_settings(stanza, session);
- if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then
- if session.hibernating and not session.first_hibernated_push then
- -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
- -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
- -- then record the time of first push in the session for the smack module which will extend its hibernation
- -- timeout based on the value of session.first_hibernated_push
- if not dummy_body or (dummy_body and is_important(stanza)) then
- session.first_hibernated_push = os_time();
- end
- end
- end
- end
- return stanza;
-end
-
-local function process_smacks_queue(queue, session)
+local function process_stanza_queue(queue, session, queue_type)
if not session.push_identifier then return; end
local user_push_services = {[session.push_identifier] = session.push_settings};
local notified = { unimportant = false; important = false }
@@ -486,20 +485,38 @@
session.first_hibernated_push = os_time();
end
end
- session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type);
+ session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type);
notified[stanza_type] = true
end
end
end
end
+-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
+local function process_smacks_stanza(stanza, session)
+ if session.push_identifier then
+ if not session.push_queue then session.push_queue = {}; end
+ local queue = session.push_queue;
+ queue[#queue+1] = st.clone(stanza);
+ if #queue == 1 then -- first stanza --> start timer
+ session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
+ session.awaiting_push_timer = stoppable_timer(1e-06, function ()
+ session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
+ process_stanza_queue(session.push_queue, session, "push");
+ session.push_queue = {}; -- clean up queue after push
+ end);
+ end
+ end
+ return stanza;
+end
+
-- smacks hibernation is started
local function hibernate_session(event)
local session = event.origin;
local queue = event.queue;
session.first_hibernated_push = nil;
-- process unacked stanzas
- process_smacks_queue(queue, session);
+ process_stanza_queue(queue, session, "smacks");
-- process future unacked (hibernated) stanzas
filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990);
end
@@ -509,6 +526,7 @@
local session = event.resumed;
if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
+ if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
session.first_hibernated_push = nil;
end
end
@@ -518,7 +536,7 @@
local session = event.origin;
local queue = event.queue;
-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
- process_smacks_queue(queue, session);
+ process_stanza_queue(queue, session, "smacks");
end
-- archive message added
@@ -564,9 +582,11 @@
module:hook("archive-message-added", archive_message_added);
local function send_ping(event)
- local user = event.user;
- local user_push_services = push_store:get(user);
- local push_services = event.push_services or user_push_services;
+ local push_services = event.push_services;
+ if not push_services then
+ local user = event.user;
+ push_services = push_store:get(user);
+ end
handle_notify_request(nil, user, push_services, true);
end
-- can be used by other modules to ping one or more (or all) push endpoints