mod_rest: Add special handling to catch MAM results from remote hosts
Makes MAM queries to remote hosts works.
As the comment says, MAM results from users' local archives or local
MUCs are returned via origin.send() which is provided in the event and
thus already worked. Results from remote hosts go via normal stanza
routing and events, which need this extra handling to catch.
This pattern of iq-set, message+, iq-result is generally limited to MAM.
Closest similar thing might be MUC join, but to really handle that you
would need the webhook callback mechanism.
local st = require "util.stanza";
local uuid = require "util.uuid";
local mt = require "util.multitable";
local cache = require "util.cache";
local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";
-- TODO persist
-- TODO query known pubsub nodes to sync current subscriptions
-- TODO subscription ids per 'item' would be handy
local pending_subscription = cache.new(256); -- uuid → node
local pending_unsubscription = cache.new(256); -- uuid → node
local active_subscriptions = mt.new() -- service | node | uuid | { item }
function module.save()
return { active_subscriptions = active_subscriptions.data }
end
function module.restore(data)
if data and data.active_subscriptions then
active_subscriptions.data = data.active_subscriptions
end
end
local valid_events = {"subscribed"; "unsubscribed"; "error"; "item"; "retract"; "purge"; "delete"}
local function subscription_added(item_event)
local item = item_event.item;
assert(item.service, "pubsub subscription item MUST have a 'service' field.");
assert(item.node, "pubsub subscription item MUST have a 'node' field.");
local already_subscibed = false;
for _ in active_subscriptions:iter(item.service, item.node, nil) do -- luacheck: ignore 512
already_subscibed = true;
break
end
item._id = uuid.generate();
local iq_id = uuid.generate();
pending_subscription:set(iq_id, item._id);
active_subscriptions:set(item.service, item.node, item._id, item);
if not already_subscibed then
module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
:tag("pubsub", { xmlns = xmlns_pubsub })
:tag("subscribe", { jid = module.host, node = item.node }));
end
end
for _, event_name in ipairs(valid_events) do
module:hook("pubsub-event/host/"..event_name, function (event)
for _, _, _, _, cb in active_subscriptions:iter(event.service, event.node, nil, "on_"..event_name) do
pcall(cb, event);
end
end);
end
module:hook("iq/host", function (event)
local stanza = event.stanza;
local service = stanza.attr.from;
if not stanza.attr.id then return end -- shouldn't be possible
local subscribed_node = pending_subscription:get(stanza.attr.id);
pending_subscription:set(stanza.attr.id, nil);
local unsubscribed_node = pending_unsubscription:get(stanza.attr.id);
pending_unsubscription:set(stanza.attr.id, nil);
if stanza.attr.type == "result" then
local pubsub_wrapper = stanza:get_child("pubsub", xmlns_pubsub);
local subscription = pubsub_wrapper and pubsub_wrapper:get_child("subscription");
if not subscription then return end
local node = subscription.attr.node;
local what;
if subscription.attr.subscription == "subscribed" then
what = "on_subscribed";
elseif subscription.attr.subscription == "none" then
what = "on_unsubscribed";
end
if not what then return end -- there are other states but we don't handle them
for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, what) do
cb(event);
end
return true;
elseif stanza.attr.type == "error" then
local node = subscribed_node or unsubscribed_node;
local error_type, error_condition, reason, pubsub_error = stanza:get_error();
local err = { type = error_type, condition = error_condition, text = reason, extra = pubsub_error };
if active_subscriptions:get(service) then
for _, _, _, _, cb in active_subscriptions:iter(service, node, nil, "on_error") do
cb(err);
end
return true;
end
end
end, 1);
local function subscription_removed(item_event)
local item = item_event.item;
active_subscriptions:set(item.service, item.node, item._id, nil);
local node_subs = active_subscriptions:get(item.service, item.node);
if node_subs and next(node_subs) then return end
local iq_id = uuid.generate();
pending_unsubscription:set(iq_id, item._id);
module:send(st.iq({ type = "set", id = iq_id, from = module.host, to = item.service })
:tag("pubsub", { xmlns = xmlns_pubsub })
:tag("unsubscribe", { jid = module.host, node = item.node }))
end
module:handle_items("pubsub-subscription", subscription_added, subscription_removed, true);
module:hook("message/host", function(event)
local origin, stanza = event.origin, event.stanza;
local ret = nil;
local service = stanza.attr.from;
module:log("debug", "Got message/host: %s", stanza:top_tag());
for event_container in stanza:childtags("event", xmlns_pubsub_event) do
for pubsub_event in event_container:childtags() do
module:log("debug", "Got pubsub event %s", pubsub_event:top_tag());
local node = pubsub_event.attr.node;
module:fire_event("pubsub-event/host/"..pubsub_event.name, {
stanza = stanza;
origin = origin;
event = pubsub_event;
service = service;
node = node;
});
ret = true;
end
end
return ret;
end);
module:hook("pubsub-event/host/items", function (event)
for item in event.event:childtags() do
module:log("debug", "Got pubsub item event %s", item:top_tag());
event.item = item;
event.payload = item.tags[1];
module:fire_event("pubsub-event/host/"..item.name, event);
end
end);