--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_push2/mod_push2.lua Tue Sep 19 21:21:17 2023 -0500
@@ -0,0 +1,587 @@
+local os_time = os.time;
+local st = require"util.stanza";
+local jid = require"util.jid";
+local hashes = require"util.hashes";
+local random = require"util.random";
+local watchdog = require "util.watchdog";
+local uuid = require "util.uuid";
+local base64 = require "util.encodings".base64;
+local ciphers = require "openssl.cipher";
+local pkey = require "openssl.pkey";
+local kdf = require "openssl.kdf";
+local jwt = require "util.jwt";
+
+local xmlns_push = "urn:xmpp:push2:0";
+
+-- configuration
+local contact_uri = module:get_option_string("contact_uri", "xmpp:" .. module.host)
+local extended_hibernation_timeout = module:get_option_number("push_max_hibernation_timeout", 72*3600) -- use same timeout like ejabberd
+
+local host_sessions = prosody.hosts[module.host].sessions
+local push2_registrations = module:open_store("push2_registrations", "keyval")
+
+if _VERSION:match("5%.1") or _VERSION:match("5%.2") then
+ module:log("warn", "This module may behave incorrectly on Lua before 5.3. It is recommended to upgrade to a newer Lua version.")
+end
+
+local function account_dico_info(event)
+ (event.reply or event.stanza):tag("feature", {var=xmlns_push}):up()
+end
+module:hook("account-disco-info", account_dico_info);
+
+local function parse_match(matchel)
+ local match = { match = matchel.attr.profile }
+ local send = matchel:get_child("send", "urn:xmpp:push2:send:notify-only:0")
+ if send then
+ match.send = send.attr.xmlns
+ return match
+ end
+
+ send = matchel:get_child("send", "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0")
+ if send then
+ match.send = send.attr.xmlns
+ match.ua_public = send:get_child_text("ua-public")
+ match.auth_secret = send:get_child_text("auth-secret")
+ match.jwt_alg = send:get_child_text("jwt-alg")
+ match.jwt_key = send:get_child_text("jwt-key")
+ match.jwt_claims = {}
+ for claim in send:childtags("jwt-claim") do
+ match.jwt_claims[claim.attr.name] = claim:get_text()
+ end
+ return match
+ end
+
+ return nil
+end
+
+local function push_enable(event)
+ local origin, stanza = event.origin, event.stanza;
+ local enable = stanza.tags[1];
+ origin.log("debug", "Attempting to enable push notifications")
+ -- MUST contain a jid of the push service being enabled
+ local service_jid = enable:get_child_text("service")
+ -- MUST contain a string to identify the client fo the push service
+ local client = enable:get_child_text("client")
+ if not service_jid then
+ origin.log("debug", "Push notification enable request missing service")
+ origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing service"))
+ return true
+ end
+ if not client then
+ origin.log("debug", "Push notification enable request missing client")
+ origin.send(st.error_reply(stanza, "modify", "bad-request", "Missing client"))
+ return true
+ end
+ if service_jid == stanza.attr.from then
+ origin.log("debug", "Push notification enable request service JID identical to our own")
+ origin.send(st.error_reply(stanza, "modify", "bad-request", "JID must be different from ours"))
+ return true
+ end
+ local matches = {}
+ for matchel in enable:childtags("match") do
+ local match = parse_match(matchel)
+ if match then
+ matches[#matches + 1] = match
+ end
+ end
+ -- Tie registration to client, via client_id with sasl2 or else fallback to resource
+ local registration_id = origin.client_id or origin.resource
+ local push_registration = {
+ service = service_jid;
+ client = client;
+ timestamp = os_time();
+ matches = matches;
+ };
+ -- TODO: can we move to keyval+ on trunk?
+ local registrations = push2_registrations:get(origin.username) or {}
+ registrations[registration_id] = push_registration
+ if not push2_registrations:set(origin.username, registrations) then
+ origin.send(st.error_reply(stanza, "wait", "internal-server-error"));
+ else
+ origin.push_registration_id = registration_id
+ origin.push_registration = push_registration
+ origin.first_hibernated_push = nil
+ origin.log("info", "Push notifications enabled for %s (%s)", tostring(stanza.attr.from), tostring(service_jid))
+ origin.send(st.reply(stanza))
+ end
+ return true
+end
+module:hook("iq-set/self/"..xmlns_push..":enable", push_enable)
+
+-- urgent stanzas should be delivered without delay
+local function is_voip(stanza)
+ if stanza.name == "message" then
+ if stanza:get_child("propose", "urn:xmpp:jingle-message:0") then
+ return true, "jingle call"
+ end
+ end
+end
+
+local function has_body(stanza)
+ -- We can't check for body contents in encrypted messages, so let's treat them as important
+ -- Some clients don't even set a body or an empty body for encrypted messages
+
+ -- check omemo https://xmpp.org/extensions/inbox/omemo.html
+ if stanza:get_child("encrypted", "eu.siacs.conversations.axolotl") or stanza:get_child("encrypted", "urn:xmpp:omemo:0") then return true; end
+
+ -- check xep27 pgp https://xmpp.org/extensions/xep-0027.html
+ if stanza:get_child("x", "jabber:x:encrypted") then return true; end
+
+ -- check xep373 pgp (OX) https://xmpp.org/extensions/xep-0373.html
+ if stanza:get_child("openpgp", "urn:xmpp:openpgp:0") then return true; end
+
+ local body = stanza:get_child_text("body");
+
+ return body ~= nil and body ~= ""
+end
+
+-- is this push a high priority one
+local function is_important(stanza)
+ local is_voip_stanza, urgent_reason = is_voip(stanza)
+ if is_voip_stanza then return true; end
+
+ local st_name = stanza and stanza.name or nil
+ if not st_name then return false; end -- nonzas are never important here
+ if st_name == "presence" then
+ return false; -- same for presences
+ elseif st_name == "message" then
+ -- unpack carbon copied message stanzas
+ local carbon = stanza:find("{urn:xmpp:carbons:2}/{urn:xmpp:forward:0}/{jabber:client}message")
+ local stanza_direction = carbon and stanza:child_with_name("sent") and "out" or "in"
+ if carbon then stanza = carbon; end
+ local st_type = stanza.attr.type
+
+ -- headline message are always not important
+ if st_type == "headline" then return false; end
+
+ -- carbon copied outgoing messages are not important
+ if carbon and stanza_direction == "out" then return false; end
+
+ -- groupchat subjects are not important here
+ if st_type == "groupchat" and stanza:get_child_text("subject") then
+ return false
+ end
+
+ -- empty bodies are not important
+ return has_body(stanza)
+ end
+ return false; -- this stanza wasn't one of the above cases --> it is not important, too
+end
+
+local function add_sce_rfc8291(match, stanza, push_notification_payload)
+ local max_data_size = 2847 -- https://github.com/web-push-libs/web-push-php/issues/108
+ local stanza_clone = st.clone(stanza)
+ stanza_clone.attr.xmlns = "jabber:client"
+ local envelope = st.stanza("envelope", { xmlns = "urn:xmpp:sce:1" })
+ :tag("content")
+ :tag("forwarded", { xmlns = "urn:xmpp:forward:0" })
+ :add_child(stanza_clone)
+ :up():up():up()
+ local envelope_bytes = tostring(envelope)
+ if string.len(envelope_bytes) > max_data_size then
+ -- If stanza is too big, remove extra elements
+ stanza_clone:maptags(function(el)
+ if el.attr.xmlns == nil or
+ el.attr.xmlns == "jabber:client" or
+ el.attr.xmlns == "jabber:x:oob" or
+ (el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
+ el.attr.xmlns == "eu.siacs.conversations.axolotl" or
+ el.attr.xmlns == "urn:xmpp:omemo:0" or
+ el.attr.xmlns == "jabber:x:encrypted" or
+ el.attr.xmlns == "urn:xmpp:openpgp:0" or
+ el.attr.xmlns == "urn:xmpp:sce:1" or
+ el.attr.xmlns == "urn:xmpp:jingle-message:0" or
+ el.attr.xmlns == "jabber:x:conference"
+ then
+ return el
+ else
+ return nil
+ end
+ end)
+ envelope_bytes = tostring(envelope)
+ end
+ if string.len(envelope_bytes) > max_data_size then
+ -- If still too big, get aggressive
+ stanza_clone:maptags(function(el)
+ if el.name == "body" or
+ (el.attr.xmlns == "urn:xmpp:sid:0" and el.name == "stanza-id") or
+ el.attr.xmlns == "urn:xmpp:jingle-message:0" or
+ el.attr.xmlns == "jabber:x:conference"
+ then
+ return el
+ else
+ return nil
+ end
+ end)
+ envelope_bytes = tostring(envelope)
+ end
+ if string.len(envelope_bytes) < max_data_size/2 then
+ envelope:text_tag("rpad", base64.encode(random.bytes(math.min(150, max_data_size/3 - string.len(envelope_bytes)))))
+ envelope_bytes = tostring(envelope)
+ end
+
+ local p256dh_raw = base64.decode(match.ua_public .. "==")
+ local p256dh = pkey.new(p256dh_raw, "*", "public", "prime256v1")
+ local one_time_key = pkey.new({ type = "EC", curve = "prime256v1" })
+ local one_time_key_public = one_time_key:getParameters().pub_key:toBinary()
+ local info = "WebPush: info\0" .. p256dh_raw .. one_time_key_public
+ local auth_secret = base64.decode(match.auth_secret .. "==")
+ local salt = random.bytes(16)
+ local shared_secret = one_time_key:derive(p256dh)
+ local ikm = kdf.derive({
+ type = "HKDF",
+ outlen = 32,
+ salt = auth_secret,
+ key = shared_secret,
+ info = info,
+ md = "sha256"
+ })
+ local key = kdf.derive({
+ type = "HKDF",
+ outlen = 16,
+ salt = salt,
+ key = ikm,
+ info = "Content-Encoding: aes128gcm\0",
+ md = "sha256"
+ })
+ local nonce = kdf.derive({
+ type = "HKDF",
+ outlen = 12,
+ salt = salt,
+ key = ikm,
+ info = "Content-Encoding: nonce\0",
+ md = "sha256"
+ })
+ local header = salt .. "\0\0\16\0" .. string.char(string.len(one_time_key_public)) .. one_time_key_public
+ local encryptor = ciphers.new("AES-128-GCM"):encrypt(key, nonce)
+
+ push_notification_payload
+ :tag("encrypted", { xmlns = "urn:xmpp:sce:rfc8291:0" })
+ :text_tag("payload", base64.encode(header .. encryptor:final(envelope_bytes .. "\2") .. encryptor:getTag(16)))
+ :up()
+end
+
+local function add_rfc8292(match, stanza, push_notification_payload)
+ if not match.jwt_alg then return; end
+ local key = match.jwt_key
+ if match.jwt_alg ~= "HS256" then
+ -- keypairs are in PKCS#8 PEM format without header/footer
+ key = "-----BEGIN PRIVATE KEY-----\n"..key.."\n-----END PRIVATE KEY-----"
+ end
+
+ local signer = jwt.new_signer(match.jwt_alg, key)
+ local payload = {}
+ for k, v in pairs(match.jwt_claims or {}) do
+ payload[k] = v
+ end
+ payload.sub = contact_uri
+ push_notification_payload:text_tag("jwt", signer(payload))
+end
+
+local function handle_notify_request(stanza, node, user_push_services, log_push_decline)
+ local pushes = 0;
+ if not #user_push_services then return pushes end
+
+ local notify_push_services = {};
+ if is_important(stanza) then
+ notify_push_services = user_push_services
+ else
+ for identifier, push_info in pairs(user_push_services) do
+ for _, match in ipairs(push_info.matches) do
+ if match.match == "urn:xmpp:push2:match:important" then
+ identifier_found.log("debug", "Not pushing because not important")
+ else
+ notify_push_services[identifier] = push_info;
+ end
+ end
+ end
+ end
+
+ for push_registration_id, push_info in pairs(notify_push_services) do
+ local send_push = true; -- only send push to this node when not already done for this stanza or if no stanza is given at all
+ if stanza then
+ if not stanza._push_notify2 then stanza._push_notify2 = {}; end
+ if stanza._push_notify2[push_registration_id] then
+ if log_push_decline then
+ module:log("debug", "Already sent push notification for %s@%s to %s (%s)", node, module.host, push_info.jid, tostring(push_info.node));
+ end
+ send_push = false;
+ end
+ stanza._push_notify2[push_registration_id] = true;
+ end
+
+ if send_push then
+ local push_notification_payload = st.stanza("notification", { xmlns = xmlns_push })
+ push_notification_payload:text_tag("client", push_info.client)
+ push_notification_payload:text_tag("priority", is_voip(stanza) and "high" or (is_important(stanza) and "normal" or "low"))
+ if is_voip(stanza) then
+ push_notification_payload:tag("voip"):up()
+ end
+
+ local sends_added = {};
+ for _, match in ipairs(push_info.matches) do
+ local does_match = false;
+ if match.match == "urn:xmpp:push2:match:all" then
+ does_match = true
+ elseif match.match == "urn:xmpp:push2:match:important" then
+ does_match = is_important(stanza)
+ elseif match.match == "urn:xmpp:push2:match:archived" then
+ does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0")
+ elseif match.match == "urn:xmpp:push2:match:archived-with-body" then
+ does_match = stanza:get_child("stana-id", "urn:xmpp:sid:0") and has_body(stanza)
+ end
+
+ if does_match and not sends_added[match.send] then
+ sends_added[match.send] = true
+ if match.send == "urn:xmpp:push2:send:notify-only" then
+ -- Nothing more to add
+ elseif match.send == "urn:xmpp:push2:send:sce+rfc8291+rfc8292:0" then
+ add_sce_rfc8291(match, stanza, push_notification_payload)
+ add_rfc8292(match, stanza, push_notification_payload)
+ else
+ module:log("debug", "Unkonwn send profile: " .. push_info.send)
+ end
+ end
+ end
+
+ local push_publish = st.message({ to = push_info.service, from = module.host, id = uuid.generate() })
+ :add_child(push_notification_payload):up()
+
+ -- TODO: watch for message error replies and count or something
+ module:send(push_publish)
+ pushes = pushes + 1
+ end
+ end
+
+ return pushes
+end
+
+-- small helper function to extract relevant push settings
+local function get_push_settings(stanza, session)
+ local to = stanza.attr.to
+ local node = to and jid.split(to) or session.username
+ local user_push_services = push2_registrations:get(node)
+ return node, (user_push_services or {})
+end
+
+-- publish on bare groupchat
+-- this picks up MUC messages when there are no devices connected
+module:hook("message/bare/groupchat", function(event)
+ local node, user_push_services = get_push_settings(event.stanza, event.origin);
+ local notify_push_services = {};
+ for identifier, push_info in pairs(user_push_services) do
+ for _, match in ipairs(push_info.matches) do
+ if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
+ identifier_found.log("debug", "Not pushing because we are not archiving this stanza")
+ else
+ notify_push_services[identifier] = push_info;
+ end
+ end
+ end
+
+ handle_notify_request(event.stanza, node, notify_push_services, true);
+end, 1);
+
+local function process_stanza_queue(queue, session, queue_type)
+ if not session.push_registration_id then return; end
+ for _, match in ipairs(session.push_settings.matches) do
+ if match.match == "urn:xmpp:push2:match:archived-with-body" or match.match == "urn:xmpp:push2:match:archived" then
+ module:log("debug", "Not pushing because we are not archiving this stanza: %s", session.push_registration_id)
+ return
+ end
+ end
+ local user_push_services = {[session.push_registration_id] = session.push_settings};
+ local notified = { unimportant = false; important = false }
+ for i=1, #queue do
+ local stanza = queue[i];
+ -- fast ignore of already pushed stanzas
+ if stanza and not (stanza._push_notify2 and stanza._push_notify2[session.push_registration_id]) then
+ local node = get_push_settings(stanza, session);
+ local stanza_type = "unimportant";
+ if is_important(stanza) then stanza_type = "important"; end
+ if not notified[stanza_type] then -- only notify if we didn't try to push for this stanza type already
+ if handle_notify_request(stanza, node, user_push_services, false) ~= 0 then
+ if session.hibernating and not session.first_hibernated_push then
+ -- if the message was important
+ -- then record the time of first push in the session for the smack module which will extend its hibernation
+ -- timeout based on the value of session.first_hibernated_push
+ if is_important(stanza) then
+ session.first_hibernated_push = os_time();
+ -- check for prosody 0.12 mod_smacks
+ if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
+ -- restore old smacks watchdog (--> the start of our original timeout will be delayed until first push)
+ session.hibernating_watchdog:cancel();
+ session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
+ end
+ end
+ end
+ notified[stanza_type] = true
+ end
+ end
+ end
+ if notified.unimportant and notified.important then break; end -- stop processing the queue if all push types are exhausted
+ end
+end
+
+-- publish on unacked smacks message (use timer to send out push for all stanzas submitted in a row only once)
+local function process_stanza(session, stanza)
+ if session.push_registration_id then
+ session.log("debug", "adding new stanza to push_queue");
+ if not session.push_queue then session.push_queue = {}; end
+ local queue = session.push_queue;
+ queue[#queue+1] = st.clone(stanza);
+ if not session.awaiting_push_timer then -- timer not already running --> start new timer
+ session.awaiting_push_timer = module:add_timer(1.0, function ()
+ process_stanza_queue(session.push_queue, session, "push");
+ session.push_queue = {}; -- clean up queue after push
+ session.awaiting_push_timer = nil;
+ end);
+ end
+ end
+ return stanza;
+end
+
+local function process_smacks_stanza(event)
+ local session = event.origin;
+ local stanza = event.stanza;
+ if not session.push_registration_id then
+ session.log("debug", "NOT invoking handle_notify_request() for newly smacks queued stanza (session.push_registration_id is not set: %s)",
+ session.push_registration_id
+ );
+ else
+ process_stanza(session, stanza)
+ end
+end
+
+-- smacks hibernation is started
+local function hibernate_session(event)
+ local session = event.origin;
+ local queue = event.queue;
+ session.first_hibernated_push = nil;
+ if session.push_registration_id and session.hibernating_watchdog then -- check for prosody 0.12 mod_smacks
+ -- save old watchdog callback and timeout
+ session.original_smacks_callback = session.hibernating_watchdog.callback;
+ session.original_smacks_timeout = session.hibernating_watchdog.timeout;
+ -- cancel old watchdog and create a new watchdog with extended timeout
+ session.hibernating_watchdog:cancel();
+ session.hibernating_watchdog = watchdog.new(extended_hibernation_timeout, function()
+ session.log("debug", "Push-extended smacks watchdog triggered");
+ if session.original_smacks_callback then
+ session.log("debug", "Calling original smacks watchdog handler");
+ session.original_smacks_callback();
+ end
+ end);
+ end
+ -- process unacked stanzas
+ process_stanza_queue(queue, session, "smacks");
+end
+
+-- smacks hibernation is ended
+local function restore_session(event)
+ local session = event.resumed;
+ if session then -- older smacks module versions send only the "intermediate" session in event.session and no session.resumed one
+ if session.awaiting_push_timer then
+ session.awaiting_push_timer:stop();
+ session.awaiting_push_timer = nil;
+ end
+ session.first_hibernated_push = nil;
+ -- the extended smacks watchdog will be canceled by the smacks module, no need to anything here
+ end
+end
+
+-- smacks ack is delayed
+local function ack_delayed(event)
+ local session = event.origin;
+ local queue = event.queue;
+ local stanza = event.stanza;
+ if not session.push_registration_id then return; end
+ if stanza then process_stanza(session, stanza); return; end -- don't iterate through smacks queue if we know which stanza triggered this
+ for i=1, #queue do
+ local queued_stanza = queue[i];
+ -- process unacked stanzas (handle_notify_request() will only send push requests for new stanzas)
+ process_stanza(session, queued_stanza);
+ end
+end
+
+-- archive message added
+local function archive_message_added(event)
+ -- event is: { origin = origin, stanza = stanza, for_user = store_user, id = id }
+ if not event.for_user then return; end
+ -- Note that the stanza in the event is a clone not the same as other hooks, so dedupe doesn't work
+ -- This is a problem if you wan to to also hook offline message storage for example
+ local stanza = st.clone(event.stanza)
+ stanza:tag("stanza-id", { xmlns = "urn:xmpp:sid:0", by = event.for_user.."@"..module.host, id = event.id }):up()
+ local user_session = host_sessions[event.for_user] and host_sessions[event.for_user].sessions or {}
+ local to = stanza.attr.to
+ to = to and jid.split(to) or event.origin.username
+
+ -- only notify if the stanza destination is the mam user we store it for
+ if event.for_user == to then
+ local user_push_services = push2_registrations:get(to)
+
+ -- Urgent stanzas are time-sensitive (e.g. calls) and should
+ -- be pushed immediately to avoid getting stuck in the smacks
+ -- queue in case of dead connections, for example
+ local is_voip_stanza, urgent_reason = is_voip(stanza);
+
+ local notify_push_services;
+ if is_voip_stanza then
+ module:log("debug", "Urgent push for %s (%s)", to, urgent_reason);
+ notify_push_services = user_push_services;
+ else
+ -- only notify nodes with no active sessions (smacks is counted as active and handled separate)
+ notify_push_services = {};
+ for identifier, push_info in pairs(user_push_services) do
+ local identifier_found = nil;
+ for _, session in pairs(user_session) do
+ if session.push_registration_id == identifier then
+ identifier_found = session;
+ break;
+ end
+ end
+ if identifier_found then
+ identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (session still alive)", identifier)
+ elseif not has_body(stanza) then
+ for _, match in ipairs(push_info.matches) do
+ if match.match == "urn:xmpp:push2:match:archived-with-body" then
+ identifier_found.log("debug", "Not pushing '%s' of new MAM stanza (no body)", identifier)
+ else
+ notify_push_services[identifier] = push_info
+ end
+ end
+ else
+ notify_push_services[identifier] = push_info
+ end
+ end
+ end
+
+ handle_notify_request(stanza, to, notify_push_services, true);
+ end
+end
+
+module:hook("smacks-hibernation-start", hibernate_session);
+module:hook("smacks-hibernation-end", restore_session);
+module:hook("smacks-ack-delayed", ack_delayed);
+module:hook("smacks-hibernation-stanza-queued", process_smacks_stanza);
+module:hook("archive-message-added", archive_message_added);
+
+module:log("info", "Module loaded");
+function module.unload()
+ module:log("info", "Unloading module");
+ -- cleanup some settings, reloading this module can cause process_smacks_stanza() to stop working otherwise
+ for user, _ in pairs(host_sessions) do
+ for _, session in pairs(host_sessions[user].sessions) do
+ if session.awaiting_push_timer then session.awaiting_push_timer:stop(); end
+ session.awaiting_push_timer = nil;
+ session.push_queue = nil;
+ session.first_hibernated_push = nil;
+ -- check for prosody 0.12 mod_smacks
+ if session.hibernating_watchdog and session.original_smacks_callback and session.original_smacks_timeout then
+ -- restore old smacks watchdog
+ session.hibernating_watchdog:cancel();
+ session.hibernating_watchdog = watchdog.new(session.original_smacks_timeout, session.original_smacks_callback);
+ end
+ end
+ end
+ module:log("info", "Module unloaded");
+end