|
1 local array = require "util.array"; |
|
2 local id = require "util.id"; |
|
3 local it = require "util.iterators"; |
|
4 local set = require "util.set"; |
|
5 local st = require "util.stanza"; |
|
6 |
|
-- Load mod_pubsub_subscription before this library is used; presumably it is
-- the module that acts on the "pubsub-subscription" items registered below
-- (TODO confirm against that module's documentation).
module:depends "pubsub_subscription";
|
8 |
|
--- Subscribe to an RTBL pubsub node and keep a local mirror of its entries.
--
-- Registers a "pubsub-subscription" item for the given service/node so that
-- published, retracted and purged items update the local table, and sends a
-- one-off request for the node's full item list to reconcile the local table
-- with what the service reports.
--
-- @param rtbl_service_jid JID of the pubsub service hosting the list.
-- @param rtbl_node Name of the pubsub node holding the list.
-- @param handlers Table of optional callbacks keyed by event type:
--   `added` and `removed`. Each is called with the item id (hash) concerned.
--   A missing key means that event type is silently ignored.
-- @return nothing
local function new_rtbl_subscription(rtbl_service_jid, rtbl_node, handlers)
	-- Currently known entries, keyed by item id; the value is always `true`.
	local items = {};

	-- Invoke the caller-supplied handler for `event_type`, if there is one.
	local function notify(event_type, hash)
		local handler = handlers[event_type];
		if not handler then return; end
		handler(hash);
	end

	module:add_item("pubsub-subscription", {
		service = rtbl_service_jid;
		node = rtbl_node;

		-- Callbacks:
		on_subscribed = function()
			module:log("info", "RTBL active: %s:%s", rtbl_service_jid, rtbl_node);
		end;

		on_error = function(err)
			module:log(
				"error",
				"Failed to subscribe to RTBL: %s:%s %s::%s: %s",
				rtbl_service_jid,
				rtbl_node,
				err.type,
				err.condition,
				err.text
			);
		end;

		-- A new item was published: its id is the hash to add.
		on_item = function(event)
			local hash = event.item.attr.id;
			if not hash then return; end
			module:log("debug", "Received new hash from %s:%s: %s", rtbl_service_jid, rtbl_node, hash);
			items[hash] = true;
			notify("added", hash);
		end;

		-- An item was retracted: forget its hash.
		on_retract = function (event)
			local hash = event.item.attr.id;
			if not hash then return; end
			module:log("debug", "Retracted hash from %s:%s: %s", rtbl_service_jid, rtbl_node, hash);
			items[hash] = nil;
			notify("removed", hash);
		end;

		-- The whole node was purged: drop every known hash.
		-- This must be named `on_purge` like the other callbacks;
		-- mod_pubsub_subscription looks callbacks up as "on_" .. event name,
		-- so a field called plain `purge` is never invoked.
		on_purge = function()
			module:log("debug", "Purge all hashes from %s:%s", rtbl_service_jid, rtbl_node);
			-- Clearing existing keys while traversing with pairs() is permitted.
			for hash in pairs(items) do
				items[hash] = nil;
				notify("removed", hash);
			end
		end;
	});

	-- Fixed stanza id for the list request; the result hook below is keyed on it.
	local request_id = "rtbl-request-"..id.short();

	-- Send an <iq type="get"> asking the service for all items on the node.
	local function request_list()
		local items_request = st.iq({ to = rtbl_service_jid, from = module.host, type = "get", id = request_id })
			:tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
				:tag("items", { node = rtbl_node }):up()
			:up();
		module:send(items_request);
	end

	-- Handle the reply to request_list(): reconcile `items` with the received
	-- list, notifying "added" for new hashes and "removed" for vanished ones.
	local function update_list(event)
		local from_jid = event.stanza.attr.from;
		-- Only accept a reply that comes from the service we asked.
		if from_jid ~= rtbl_service_jid then
			module:log("debug", "Ignoring RTBL response from unknown sender: %s", from_jid);
			return;
		end
		local items_el = event.stanza:find("{http://jabber.org/protocol/pubsub}pubsub/items");
		if not items_el then
			module:log("warn", "Invalid items response from RTBL service %s:%s", rtbl_service_jid, rtbl_node);
			return;
		end

		-- Start from everything we knew; whatever is still in this set after
		-- walking the response was not in the received list.
		local old_entries = set.new(array.collect(it.keys(items)));

		local n_added, n_removed, n_total = 0, 0, 0;
		for item in items_el:childtags("item") do
			local hash = item.attr.id;
			if hash then
				n_total = n_total + 1;
				if not old_entries:contains(hash) then
					-- New entry
					n_added = n_added + 1;
					items[hash] = true;
					notify("added", hash);
				else
					-- Entry already existed
					old_entries:remove(hash);
				end
			end
		end

		-- Remove old entries that weren't in the received list
		for hash in old_entries do
			n_removed = n_removed + 1;
			items[hash] = nil;
			notify("removed", hash);
		end

		module:log("info", "%d RTBL entries received from %s:%s (%d added, %d removed)", n_total, from_jid, rtbl_node, n_added, n_removed);
		-- Returning true tells Prosody's event system the stanza was handled.
		return true;
	end

	module:hook("iq-result/host/"..request_id, update_list);
	-- Schedule the initial list request with zero delay; request_list returns
	-- nothing, so the timer does not repeat.
	module:add_timer(0, request_list);
end
|
119 |
|
-- Public interface of this library: the subscription constructor only.
return { new_rtbl_subscription = new_rtbl_subscription };