mod_cloud_notify/mod_cloud_notify.lua
changeset 3623 74aa35aeb08a
parent 3112 cfcb020bcd1d
child 3626 21f870e1ba55
equal deleted inserted replaced
3622:f781a90018f4 3623:74aa35aeb08a
     1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
     1 -- XEP-0357: Push (aka: My mobile OS vendor won't let me have persistent TCP connections)
     2 -- Copyright (C) 2015-2016 Kim Alvefur
     2 -- Copyright (C) 2015-2016 Kim Alvefur
     3 -- Copyright (C) 2017-2018 Thilo Molitor
     3 -- Copyright (C) 2017-2019 Thilo Molitor
     4 --
     4 --
     5 -- This file is MIT/X11 licensed.
     5 -- This file is MIT/X11 licensed.
     6 
     6 
     7 local t_insert = table.insert;
     7 local t_insert = table.insert;
     8 local s_match = string.match;
     8 local s_match = string.match;
    89 		count = count + 1;
    89 		count = count + 1;
    90 		if count > maximum then break end
    90 		if count > maximum then break end
    91 		result[key] = value;
    91 		result[key] = value;
    92 	end
    92 	end
    93 	return result;
    93 	return result;
       
    94 end
       
    95 
       
    96 local function stoppable_timer(delay, callback)
       
    97 	local stopped = false;
       
    98 	local timer = module:add_timer(delay, function (t)
       
    99 		if stopped then return; end
       
   100 		return callback(t);
       
   101 	end);
       
   102 	if timer.stop then return timer; end		-- new prosody api includes stop() function
       
   103 	return {
       
   104 		stop = function () stopped = true end;
       
   105 		timer;
       
   106 	};
    94 end
   107 end
    95 
   108 
    96 -- For keeping state across reloads while caching reads
   109 -- For keeping state across reloads while caching reads
    97 local push_store = (function()
   110 local push_store = (function()
    98 	local store = module:open_store();
   111 	local store = module:open_store();
   193 	
   206 	
   194 	for push_identifier, _ in pairs(user_push_services) do
   207 	for push_identifier, _ in pairs(user_push_services) do
   195 		if hashes.sha256(push_identifier, true) == stanza.attr.id then
   208 		if hashes.sha256(push_identifier, true) == stanza.attr.id then
   196 			if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then
   209 			if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then
   197 				push_errors[push_identifier] = 0;
   210 				push_errors[push_identifier] = 0;
       
   211 				-- unhook iq handlers for this identifier (if possible)
       
   212 				if module.unhook then
       
   213 					module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error);
       
   214 					module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success);
       
   215 					id2node[stanza.attr.id] = nil;
       
   216 				end
   198 				module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
   217 				module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier]));
   199 			end
   218 			end
   200 		end
   219 		end
   201 	end
   220 	end
   202 	return true;
   221 	return true;
   270 				origin.first_hibernated_push = nil;
   289 				origin.first_hibernated_push = nil;
   271 			end
   290 			end
   272 			user_push_services[key] = nil;
   291 			user_push_services[key] = nil;
   273 			push_errors[key] = nil;
   292 			push_errors[key] = nil;
   274 			if module.unhook then
   293 			if module.unhook then
   275 				module:unhook("iq-error/host/"..key, handle_push_error);
   294 				local stanza_id = hashes.sha256(key, true)
   276 				module:unhook("iq-result/host/"..key, handle_push_success);
   295 				module:unhook("iq-error/host/"..stanza_id, handle_push_error);
   277 				id2node[key] = nil;
   296 				module:unhook("iq-result/host/"..stanza_id, handle_push_success);
       
   297 				id2node[stanza_id] = nil;
   278 			end
   298 			end
   279 		end
   299 		end
   280 	end
   300 	end
   281 	local ok = push_store:set(origin.username, user_push_services);
   301 	local ok = push_store:set(origin.username, user_push_services);
   282 	if not ok then
   302 	if not ok then
   442 	local node, user_push_services = get_push_settings(event.stanza, event.origin);
   462 	local node, user_push_services = get_push_settings(event.stanza, event.origin);
   443 	module:log("debug", "Invoking cloud handle_notify_request() for offline stanza");
   463 	module:log("debug", "Invoking cloud handle_notify_request() for offline stanza");
   444 	handle_notify_request(event.stanza, node, user_push_services, true);
   464 	handle_notify_request(event.stanza, node, user_push_services, true);
   445 end, 1);
   465 end, 1);
   446 
   466 
   447 -- publish on unacked smacks message
   467 local function process_stanza_queue(queue, session, queue_type)
   448 local function process_smacks_stanza(stanza, session)
       
   449 	if session.push_identifier then
       
   450 		session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza");
       
   451 		local user_push_services = {[session.push_identifier] = session.push_settings};
       
   452 		local node = get_push_settings(stanza, session);
       
   453 		if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then
       
   454 			if session.hibernating and not session.first_hibernated_push then
       
   455 				-- if important stanzas are treated differently (pushed with last-message-body field set to dummy string)
       
   456 				-- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally,
       
   457 				-- then record the time of first push in the session for the smack module which will extend its hibernation
       
   458 				-- timeout based on the value of session.first_hibernated_push
       
   459 				if not dummy_body or (dummy_body and is_important(stanza)) then
       
   460 					session.first_hibernated_push = os_time();
       
   461 				end
       
   462 			end
       
   463 		end
       
   464 	end
       
   465 	return stanza;
       
   466 end
       
   467 
       
   468 local function process_smacks_queue(queue, session)
       
   469 	if not session.push_identifier then return; end
   468 	if not session.push_identifier then return; end
   470 	local user_push_services = {[session.push_identifier] = session.push_settings};
   469 	local user_push_services = {[session.push_identifier] = session.push_settings};
   471 	local notified = { unimportant = false; important = false }
   470 	local notified = { unimportant = false; important = false }
   472 	for i=1, #queue do
   471 	for i=1, #queue do
   473 		local stanza = queue[i];
   472 		local stanza = queue[i];
   484 					-- timeout based on the value of session.first_hibernated_push
   483 					-- timeout based on the value of session.first_hibernated_push
   485 					if not dummy_body or (dummy_body and is_important(stanza)) then
   484 					if not dummy_body or (dummy_body and is_important(stanza)) then
   486 						session.first_hibernated_push = os_time();
   485 						session.first_hibernated_push = os_time();
   487 					end
   486 					end
   488 				end
   487 				end
   489 				session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type);
   488 				session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type);
   490 				notified[stanza_type] = true
   489 				notified[stanza_type] = true
   491 			end
   490 			end
   492 		end
   491 		end
   493 	end
   492 	end
       
   493 end
       
   494 
       
   495 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
       
   496 local function process_smacks_stanza(stanza, session)
       
   497 	if session.push_identifier then
       
   498 		if not session.push_queue then session.push_queue = {}; end
       
   499 		local queue = session.push_queue;
       
   500 		queue[#queue+1] = st.clone(stanza);
       
   501 		if #queue == 1 then		-- first stanza --> start timer
       
   502 			session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
       
   503 			session.awaiting_push_timer = stoppable_timer(1e-06, function ()
       
   504 				session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
       
   505 				process_stanza_queue(session.push_queue, session, "push");
       
   506 				session.push_queue = {};		-- clean up queue after push
       
   507 			end);
       
   508 		end
       
   509 	end
       
   510 	return stanza;
   494 end
   511 end
   495 
   512 
   496 -- smacks hibernation is started
   513 -- smacks hibernation is started
   497 local function hibernate_session(event)
   514 local function hibernate_session(event)
   498 	local session = event.origin;
   515 	local session = event.origin;
   499 	local queue = event.queue;
   516 	local queue = event.queue;
   500 	session.first_hibernated_push = nil;
   517 	session.first_hibernated_push = nil;
   501 	-- process unacked stanzas
   518 	-- process unacked stanzas
   502 	process_smacks_queue(queue, session);
   519 	process_stanza_queue(queue, session, "smacks");
   503 	-- process future unacked (hibernated) stanzas
   520 	-- process future unacked (hibernated) stanzas
   504 	filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990);
   521 	filters.add_filter(session, "stanzas/out", process_smacks_stanza, -990);
   505 end
   522 end
   506 
   523 
   507 -- smacks hibernation is ended
   524 -- smacks hibernation is ended
   508 local function restore_session(event)
   525 local function restore_session(event)
   509 	local session = event.resumed;
   526 	local session = event.resumed;
   510 	if session then		-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
   527 	if session then		-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
   511 		filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
   528 		filters.remove_filter(session, "stanzas/out", process_smacks_stanza);
       
   529 		if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
   512 		session.first_hibernated_push = nil;
   530 		session.first_hibernated_push = nil;
   513 	end
   531 	end
   514 end
   532 end
   515 
   533 
   516 -- smacks ack is delayed
   534 -- smacks ack is delayed
   517 local function ack_delayed(event)
   535 local function ack_delayed(event)
   518 	local session = event.origin;
   536 	local session = event.origin;
   519 	local queue = event.queue;
   537 	local queue = event.queue;
   520 	-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
   538 	-- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
   521 	process_smacks_queue(queue, session);
   539 	process_stanza_queue(queue, session, "smacks");
   522 end
   540 end
   523 
   541 
   524 -- archive message added
   542 -- archive message added
   525 local function archive_message_added(event)
   543 local function archive_message_added(event)
   526 	-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
   544 	-- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
   562 module:hook("smacks-hibernation-end", restore_session);
   580 module:hook("smacks-hibernation-end", restore_session);
   563 module:hook("smacks-ack-delayed", ack_delayed);
   581 module:hook("smacks-ack-delayed", ack_delayed);
   564 module:hook("archive-message-added", archive_message_added);
   582 module:hook("archive-message-added", archive_message_added);
   565 
   583 
   566 local function send_ping(event)
   584 local function send_ping(event)
   567 	local user = event.user;
   585 	local push_services = event.push_services;
   568 	local user_push_services = push_store:get(user);
   586 	if not push_services then
   569 	local push_services = event.push_services or user_push_services;
   587 		local user = event.user;
       
   588 		push_services = push_store:get(user);
       
   589 	end
   570 	handle_notify_request(nil, user, push_services, true);
   590 	handle_notify_request(nil, user, push_services, true);
   571 end
   591 end
   572 -- can be used by other modules to ping one or more (or all) push endpoints
   592 -- can be used by other modules to ping one or more (or all) push endpoints
   573 module:hook("cloud-notify-ping", send_ping);
   593 module:hook("cloud-notify-ping", send_ping);
   574 
   594