193 |
206 |
194 for push_identifier, _ in pairs(user_push_services) do |
207 for push_identifier, _ in pairs(user_push_services) do |
195 if hashes.sha256(push_identifier, true) == stanza.attr.id then |
208 if hashes.sha256(push_identifier, true) == stanza.attr.id then |
196 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then |
209 if user_push_services[push_identifier] and user_push_services[push_identifier].jid == from and push_errors[push_identifier] > 0 then |
197 push_errors[push_identifier] = 0; |
210 push_errors[push_identifier] = 0; |
|
211 -- unhook iq handlers for this identifier (if possible) |
|
212 if module.unhook then |
|
213 module:unhook("iq-error/host/"..stanza.attr.id, handle_push_error); |
|
214 module:unhook("iq-result/host/"..stanza.attr.id, handle_push_success); |
|
215 id2node[stanza.attr.id] = nil; |
|
216 end |
198 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); |
217 module:log("debug", "Push succeeded, error count for identifier '%s' is now at %s again", push_identifier, tostring(push_errors[push_identifier])); |
199 end |
218 end |
200 end |
219 end |
201 end |
220 end |
202 return true; |
221 return true; |
270 origin.first_hibernated_push = nil; |
289 origin.first_hibernated_push = nil; |
271 end |
290 end |
272 user_push_services[key] = nil; |
291 user_push_services[key] = nil; |
273 push_errors[key] = nil; |
292 push_errors[key] = nil; |
274 if module.unhook then |
293 if module.unhook then |
275 module:unhook("iq-error/host/"..key, handle_push_error); |
294 local stanza_id = hashes.sha256(key, true) |
276 module:unhook("iq-result/host/"..key, handle_push_success); |
295 module:unhook("iq-error/host/"..stanza_id, handle_push_error); |
277 id2node[key] = nil; |
296 module:unhook("iq-result/host/"..stanza_id, handle_push_success); |
|
297 id2node[stanza_id] = nil; |
278 end |
298 end |
279 end |
299 end |
280 end |
300 end |
281 local ok = push_store:set(origin.username, user_push_services); |
301 local ok = push_store:set(origin.username, user_push_services); |
282 if not ok then |
302 if not ok then |
442 local node, user_push_services = get_push_settings(event.stanza, event.origin); |
462 local node, user_push_services = get_push_settings(event.stanza, event.origin); |
443 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); |
463 module:log("debug", "Invoking cloud handle_notify_request() for offline stanza"); |
444 handle_notify_request(event.stanza, node, user_push_services, true); |
464 handle_notify_request(event.stanza, node, user_push_services, true); |
445 end, 1); |
465 end, 1); |
446 |
466 |
447 -- publish on unacked smacks message |
467 local function process_stanza_queue(queue, session, queue_type) |
448 local function process_smacks_stanza(stanza, session) |
|
449 if session.push_identifier then |
|
450 session.log("debug", "Invoking cloud handle_notify_request() for smacks queued stanza"); |
|
451 local user_push_services = {[session.push_identifier] = session.push_settings}; |
|
452 local node = get_push_settings(stanza, session); |
|
453 if handle_notify_request(stanza, node, user_push_services, true) ~= 0 then |
|
454 if session.hibernating and not session.first_hibernated_push then |
|
455 -- if important stanzas are treated differently (pushed with last-message-body field set to dummy string) |
|
456 -- and the message was important (e.g. had a last-message-body field) OR if we treat all pushes equally, |
|
457 -- then record the time of first push in the session for the smack module which will extend its hibernation |
|
458 -- timeout based on the value of session.first_hibernated_push |
|
459 if not dummy_body or (dummy_body and is_important(stanza)) then |
|
460 session.first_hibernated_push = os_time(); |
|
461 end |
|
462 end |
|
463 end |
|
464 end |
|
465 return stanza; |
|
466 end |
|
467 |
|
468 local function process_smacks_queue(queue, session) |
|
469 if not session.push_identifier then return; end |
468 if not session.push_identifier then return; end |
470 local user_push_services = {[session.push_identifier] = session.push_settings}; |
469 local user_push_services = {[session.push_identifier] = session.push_settings}; |
471 local notified = { unimportant = false; important = false } |
470 local notified = { unimportant = false; important = false } |
472 for i=1, #queue do |
471 for i=1, #queue do |
473 local stanza = queue[i]; |
472 local stanza = queue[i]; |
484 -- timeout based on the value of session.first_hibernated_push |
483 -- timeout based on the value of session.first_hibernated_push |
485 if not dummy_body or (dummy_body and is_important(stanza)) then |
484 if not dummy_body or (dummy_body and is_important(stanza)) then |
486 session.first_hibernated_push = os_time(); |
485 session.first_hibernated_push = os_time(); |
487 end |
486 end |
488 end |
487 end |
489 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other queued stanzas of type %s", stanza_type); |
488 session.log("debug", "Cloud handle_notify_request() > 0, not notifying for other %s queued stanzas of type %s", queue_type, stanza_type); |
490 notified[stanza_type] = true |
489 notified[stanza_type] = true |
491 end |
490 end |
492 end |
491 end |
493 end |
492 end |
|
493 end |
|
494 |
|
495 -- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once) |
|
--- Outgoing-stanza filter used while a session is hibernating.
-- Every stanza passing through is cloned into session.push_queue; the first
-- stanza of a burst arms a (practically immediate) timer so that all stanzas
-- submitted in a row are handed to process_stanza_queue() in a single pass.
-- @param stanza the outgoing stanza seen by the filter
-- @param session the session the filter is attached to
-- @return the unmodified stanza, so the filter chain continues as before
local function process_smacks_stanza(stanza, session)
	-- sessions without a registered push identifier are passed through untouched
	if not session.push_identifier then
		return stanza;
	end

	if not session.push_queue then
		session.push_queue = {};
	end
	local pending = session.push_queue;
	table.insert(pending, st.clone(stanza));

	-- only the first stanza of a burst starts the timer; later ones just join the queue
	if #pending == 1 then
		session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanza (in a moment)");
		local function flush_pending()
			session.log("debug", "Invoking cloud handle_notify_request() for newly smacks queued stanzas (now in timer)");
			process_stanza_queue(session.push_queue, session, "push");
			session.push_queue = {};		-- start the next burst with an empty queue
		end
		session.awaiting_push_timer = stoppable_timer(1e-06, flush_pending);
	end

	return stanza;
end
495 |
512 |
496 -- smacks hibernation is started |
513 -- smacks hibernation is started |
--- Called when smacks hibernation of a session starts.
-- Resets the first-push marker, runs the already unacked stanzas through
-- process_stanza_queue() and installs process_smacks_stanza as an outgoing
-- filter for stanzas that get queued while the session stays hibernated.
-- @param event table carrying the session in `origin` and its stanza queue in `queue`
local function hibernate_session(event)
	local hibernating = event.origin;
	hibernating.first_hibernated_push = nil;
	-- stanzas that are already sitting unacked in the smacks queue
	process_stanza_queue(event.queue, hibernating, "smacks");
	-- stanzas that will be queued from now on (future unacked, hibernated stanzas)
	filters.add_filter(hibernating, "stanzas/out", process_smacks_stanza, -990);
end
506 |
523 |
507 -- smacks hibernation is ended |
524 -- smacks hibernation is ended |
--- Called when smacks hibernation of a session ends (hooked to "smacks-hibernation-end").
-- Removes the outgoing filter installed for hibernation, cancels a push timer
-- that has not fired yet and resets all per-hibernation push state.
-- @param event table; only `event.resumed` (the resumed session) is used
local function restore_session(event)
	local session = event.resumed;
	-- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
	if not session then return; end

	filters.remove_filter(session, "stanzas/out", process_smacks_stanza);

	if session.awaiting_push_timer then
		session.awaiting_push_timer:stop();
		session.awaiting_push_timer = nil;		-- do not keep a handle to a cancelled timer
	end
	-- Stanzas still waiting for the cancelled timer must be dropped here:
	-- process_smacks_stanza only starts a timer when the queue length becomes
	-- exactly 1, so leftover entries would keep any timer from being started
	-- during the next hibernation of this session.
	session.push_queue = nil;

	session.first_hibernated_push = nil;
end
515 |
533 |
516 -- smacks ack is delayed |
534 -- smacks ack is delayed |
--- Called when a smacks ack is delayed.
-- Runs the session's unacked stanzas through process_stanza_queue().
-- @param event table carrying the session in `origin` and its stanza queue in `queue`
local function ack_delayed(event)
	local delayed_session, unacked = event.origin, event.queue;
	-- handle_notify_request() will only send push requests for new stanzas
	process_stanza_queue(unacked, delayed_session, "smacks");
end
523 |
541 |
524 -- archive message added |
542 -- archive message added |
525 local function archive_message_added(event) |
543 local function archive_message_added(event) |
526 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |
544 -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id } |
562 module:hook("smacks-hibernation-end", restore_session); |
580 module:hook("smacks-hibernation-end", restore_session); |
563 module:hook("smacks-ack-delayed", ack_delayed); |
581 module:hook("smacks-ack-delayed", ack_delayed); |
564 module:hook("archive-message-added", archive_message_added); |
582 module:hook("archive-message-added", archive_message_added); |
565 |
583 |
--- Handler for the "cloud-notify-ping" event.
-- Sends a notify request without a stanza for `event.user`, either to the
-- push services supplied in `event.push_services` or, when none are supplied,
-- to the services read from push_store for that user.
-- @param event table with `user` and an optional `push_services` table
local function send_ping(event)
	-- `user` must live at function scope: it is needed for the store lookup
	-- and as the node argument of handle_notify_request() below
	local user = event.user;
	local push_services = event.push_services;
	if not push_services then
		-- only hit the store when the caller did not provide the services
		push_services = push_store:get(user);
	end
	handle_notify_request(nil, user, push_services, true);
end
572 -- can be used by other modules to ping one or more (or all) push endpoints |
592 -- can be used by other modules to ping one or more (or all) push endpoints |
573 module:hook("cloud-notify-ping", send_ping); |
593 module:hook("cloud-notify-ping", send_ping); |
574 |
594 |