1 -- XEP-0313: Message Archive Management for Prosody |
|
2 -- Copyright (C) 2011-2016 Kim Alvefur |
|
3 -- |
|
4 -- This file is MIT/X11 licensed. |
|
5 |
|
6 local xmlns_mam0 = "urn:xmpp:mam:0"; |
|
7 local xmlns_mam1 = "urn:xmpp:mam:1"; |
|
8 local xmlns_mam2 = "urn:xmpp:mam:2"; |
|
9 local xmlns_delay = "urn:xmpp:delay"; |
|
10 local xmlns_forward = "urn:xmpp:forward:0"; |
|
11 local xmlns_st_id = "urn:xmpp:sid:0"; |
|
12 |
|
13 local um = require "core.usermanager"; |
|
14 local st = require "util.stanza"; |
|
15 local rsm = module:require "rsm"; |
|
16 local get_prefs = module:require"mamprefs".get; |
|
17 local set_prefs = module:require"mamprefs".set; |
|
18 local prefs_to_stanza = module:require"mamprefsxml".tostanza; |
|
19 local prefs_from_stanza = module:require"mamprefsxml".fromstanza; |
|
20 local jid_bare = require "util.jid".bare; |
|
21 local jid_split = require "util.jid".split; |
|
22 local jid_prepped_split = require "util.jid".prepped_split; |
|
23 local dataform = require "util.dataforms".new; |
|
24 local host = module.host; |
|
25 |
|
26 local rm_load_roster = require "core.rostermanager".load_roster; |
|
27 |
|
28 local getmetatable = getmetatable; |
|
29 local function is_stanza(x) |
|
30 return getmetatable(x) == st.stanza_mt; |
|
31 end |
|
32 |
|
33 local tostring = tostring; |
|
34 local time_now = os.time; |
|
35 local m_min = math.min; |
|
36 local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse; |
|
37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50); |
|
38 local global_default_policy = module:get_option("default_archive_policy", true); |
|
39 if global_default_policy ~= "roster" then |
|
40 global_default_policy = module:get_option_boolean("default_archive_policy", global_default_policy); |
|
41 end |
|
42 |
|
43 local archive_store = module:get_option_string("archive_store", "archive2"); |
|
44 local archive = module:open_store(archive_store, "archive"); |
|
45 |
|
46 if archive.name == "null" or not archive.find then |
|
47 -- luacheck: ignore 631 |
|
48 if not archive.find then |
|
49 module:log("debug", "Attempt to open archive storage returned a valid driver but it does not seem to implement the storage API"); |
|
50 module:log("debug", "mod_%s does not support archiving", archive._provided_by or archive.name and "storage_"..archive.name.."(?)" or "<unknown>"); |
|
51 else |
|
52 module:log("debug", "Attempt to open archive storage returned null driver"); |
|
53 end |
|
54 module:log("debug", "See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information"); |
|
55 module:log("info", "Using in-memory fallback archive driver"); |
|
56 archive = module:require "fallback_archive"; |
|
57 end |
|
58 |
|
59 local use_total = true; |
|
60 |
|
61 local cleanup; |
|
62 |
|
63 local function schedule_cleanup(username) |
|
64 if cleanup and not cleanup[username] then |
|
65 table.insert(cleanup, username); |
|
66 cleanup[username] = true; |
|
67 end |
|
68 end |
|
69 |
|
70 -- Handle prefs. |
|
71 local function handle_prefs(event) |
|
72 local origin, stanza = event.origin, event.stanza; |
|
73 local xmlns_mam = stanza.tags[1].attr.xmlns; |
|
74 local user = origin.username; |
|
75 if stanza.attr.type == "get" then |
|
76 local prefs = prefs_to_stanza(get_prefs(user), xmlns_mam); |
|
77 local reply = st.reply(stanza):add_child(prefs); |
|
78 origin.send(reply); |
|
79 else -- type == "set" |
|
80 local new_prefs = stanza:get_child("prefs", xmlns_mam); |
|
81 local prefs = prefs_from_stanza(new_prefs); |
|
82 local ok, err = set_prefs(user, prefs); |
|
83 if not ok then |
|
84 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err))); |
|
85 else |
|
86 origin.send(st.reply(stanza)); |
|
87 end |
|
88 end |
|
89 return true; |
|
90 end |
|
91 |
|
92 module:hook("iq/self/"..xmlns_mam0..":prefs", handle_prefs); |
|
93 module:hook("iq/self/"..xmlns_mam1..":prefs", handle_prefs); |
|
94 module:hook("iq/self/"..xmlns_mam2..":prefs", handle_prefs); |
|
95 |
|
96 local query_form = dataform { |
|
97 { name = "FORM_TYPE"; type = "hidden"; value = xmlns_mam0; }; |
|
98 { name = "with"; type = "jid-single"; }; |
|
99 { name = "start"; type = "text-single" }; |
|
100 { name = "end"; type = "text-single"; }; |
|
101 }; |
|
102 |
|
103 -- Serve form |
|
104 local function handle_get_form(event) |
|
105 local origin, stanza = event.origin, event.stanza; |
|
106 local xmlns_mam = stanza.tags[1].attr.xmlns; |
|
107 query_form[1].value = xmlns_mam; |
|
108 origin.send(st.reply(stanza):query(xmlns_mam):add_child(query_form:form())); |
|
109 return true; |
|
110 end |
|
111 |
|
112 module:hook("iq-get/self/"..xmlns_mam0..":query", handle_get_form); |
|
113 module:hook("iq-get/self/"..xmlns_mam1..":query", handle_get_form); |
|
114 module:hook("iq-get/self/"..xmlns_mam2..":query", handle_get_form); |
|
115 |
|
116 -- Handle archive queries |
|
117 local function handle_mam_query(event) |
|
118 local origin, stanza = event.origin, event.stanza; |
|
119 local xmlns_mam = stanza.tags[1].attr.xmlns; |
|
120 local query = stanza.tags[1]; |
|
121 local qid = query.attr.queryid; |
|
122 |
|
123 origin.mam_requested = true; |
|
124 |
|
125 schedule_cleanup(origin.username); |
|
126 |
|
127 -- Search query parameters |
|
128 local qwith, qstart, qend; |
|
129 local form = query:get_child("x", "jabber:x:data"); |
|
130 if form then |
|
131 local err; |
|
132 query_form[1].value = xmlns_mam; |
|
133 form, err = query_form:data(form); |
|
134 if err then |
|
135 origin.send(st.error_reply(stanza, "modify", "bad-request", select(2, next(err)))); |
|
136 return true; |
|
137 end |
|
138 qwith, qstart, qend = form["with"], form["start"], form["end"]; |
|
139 qwith = qwith and jid_bare(qwith); -- dataforms does jidprep |
|
140 end |
|
141 |
|
142 if qstart or qend then -- Validate timestamps |
|
143 local vstart, vend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend)); |
|
144 if (qstart and not vstart) or (qend and not vend) then |
|
145 origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid timestamp")) |
|
146 return true; |
|
147 end |
|
148 qstart, qend = vstart, vend; |
|
149 end |
|
150 |
|
151 module:log("debug", "Archive query, id %s with %s from %s until %s)", |
|
152 tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now"); |
|
153 |
|
154 -- RSM stuff |
|
155 local qset = rsm.get(query); |
|
156 local qmax = m_min(qset and qset.max or default_max_items, max_max_items); |
|
157 local reverse = qset and qset.before or false; |
|
158 local before, after = qset and qset.before, qset and qset.after; |
|
159 if type(before) ~= "string" then before = nil; end |
|
160 |
|
161 -- Load all the data! |
|
162 local data, err = archive:find(origin.username, { |
|
163 start = qstart; ["end"] = qend; -- Time range |
|
164 with = qwith; |
|
165 limit = qmax + 1; |
|
166 before = before; after = after; |
|
167 reverse = reverse; |
|
168 total = use_total; |
|
169 }); |
|
170 |
|
171 if not data then |
|
172 origin.send(st.error_reply(stanza, "cancel", "internal-server-error", err)); |
|
173 return true; |
|
174 end |
|
175 local total = tonumber(err); |
|
176 |
|
177 if xmlns_mam == xmlns_mam0 then |
|
178 origin.send(st.reply(stanza)); |
|
179 end |
|
180 local msg_reply_attr = { to = stanza.attr.from, from = stanza.attr.to }; |
|
181 |
|
182 local results = {}; |
|
183 |
|
184 -- Wrap it in stuff and deliver |
|
185 local first, last; |
|
186 local count = 0; |
|
187 local complete = "true"; |
|
188 for id, item, when in data do |
|
189 count = count + 1; |
|
190 if count > qmax then |
|
191 complete = nil; |
|
192 break; |
|
193 end |
|
194 local fwd_st = st.message(msg_reply_attr) |
|
195 :tag("result", { xmlns = xmlns_mam, queryid = qid, id = id }) |
|
196 :tag("forwarded", { xmlns = xmlns_forward }) |
|
197 :tag("delay", { xmlns = xmlns_delay, stamp = timestamp(when) }):up(); |
|
198 |
|
199 if not is_stanza(item) then |
|
200 item = st.deserialize(item); |
|
201 end |
|
202 item.attr.xmlns = "jabber:client"; |
|
203 fwd_st:add_child(item); |
|
204 |
|
205 if not first then first = id; end |
|
206 last = id; |
|
207 |
|
208 if reverse then |
|
209 results[count] = fwd_st; |
|
210 else |
|
211 origin.send(fwd_st); |
|
212 end |
|
213 end |
|
214 |
|
215 if reverse then |
|
216 for i = #results, 1, -1 do |
|
217 origin.send(results[i]); |
|
218 end |
|
219 first, last = last, first; |
|
220 end |
|
221 |
|
222 -- That's all folks! |
|
223 module:log("debug", "Archive query %s completed", tostring(qid)); |
|
224 |
|
225 local fin; |
|
226 if xmlns_mam == xmlns_mam0 then |
|
227 fin = st.message(msg_reply_attr); |
|
228 else |
|
229 fin = st.reply(stanza); |
|
230 end |
|
231 do |
|
232 fin:tag("fin", { xmlns = xmlns_mam, queryid = qid, complete = complete }) |
|
233 :add_child(rsm.generate { |
|
234 first = first, last = last, count = total }) |
|
235 end |
|
236 origin.send(fin); |
|
237 return true; |
|
238 end |
|
239 module:hook("iq-set/self/"..xmlns_mam0..":query", handle_mam_query); |
|
240 module:hook("iq-set/self/"..xmlns_mam1..":query", handle_mam_query); |
|
241 module:hook("iq-set/self/"..xmlns_mam2..":query", handle_mam_query); |
|
242 |
|
243 local function has_in_roster(user, who) |
|
244 local roster = rm_load_roster(user, host); |
|
245 module:log("debug", "%s has %s in roster? %s", user, who, roster[who] and "yes" or "no"); |
|
246 return roster[who]; |
|
247 end |
|
248 |
|
249 local function shall_store(user, who) |
|
250 -- TODO Cache this? |
|
251 if not um.user_exists(user, host) then |
|
252 return false; |
|
253 end |
|
254 local prefs = get_prefs(user); |
|
255 local rule = prefs[who]; |
|
256 module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule)); |
|
257 if rule ~= nil then |
|
258 return rule; |
|
259 end |
|
260 -- Below could be done by a metatable |
|
261 local default = prefs[false]; |
|
262 module:log("debug", "%s's default rule is %s", user, tostring(default)); |
|
263 if default == nil then |
|
264 default = global_default_policy; |
|
265 module:log("debug", "Using global default rule, %s", tostring(default)); |
|
266 end |
|
267 if default == "roster" then |
|
268 return has_in_roster(user, who); |
|
269 end |
|
270 return default; |
|
271 end |
|
272 |
|
273 -- Handle messages |
|
274 local function message_handler(event, c2s) |
|
275 local origin, stanza = event.origin, event.stanza; |
|
276 local log = c2s and origin.log or module._log; |
|
277 local orig_type = stanza.attr.type or "normal"; |
|
278 local orig_from = stanza.attr.from; |
|
279 local orig_to = stanza.attr.to or orig_from; |
|
280 -- Stanza without 'to' are treated as if it was to their own bare jid |
|
281 |
|
282 -- Whos storage do we put it in? |
|
283 local store_user = c2s and origin.username or jid_split(orig_to); |
|
284 -- And who are they chatting with? |
|
285 local with = jid_bare(c2s and orig_to or orig_from); |
|
286 |
|
287 -- Filter out <stanza-id> that claim to be from us |
|
288 if stanza:get_child("stanza-id", xmlns_st_id) then |
|
289 stanza = st.clone(stanza); |
|
290 stanza:maptags(function (tag) |
|
291 if tag.name == "stanza-id" and tag.attr.xmlns == xmlns_st_id then |
|
292 local by_user, by_host, res = jid_prepped_split(tag.attr.by); |
|
293 if not res and by_host == module.host and by_user == store_user then |
|
294 return nil; |
|
295 end |
|
296 end |
|
297 return tag; |
|
298 end); |
|
299 event.stanza = stanza; |
|
300 end |
|
301 |
|
302 -- We store chat messages or normal messages that have a body |
|
303 if not(orig_type == "chat" or (orig_type == "normal" and stanza:get_child("body")) ) then |
|
304 log("debug", "Not archiving stanza: %s (type)", stanza:top_tag()); |
|
305 return; |
|
306 end |
|
307 |
|
308 -- or if hints suggest we shouldn't |
|
309 if not stanza:get_child("store", "urn:xmpp:hints") then -- No hint telling us we should store |
|
310 if stanza:get_child("no-permanent-store", "urn:xmpp:hints") |
|
311 or stanza:get_child("no-store", "urn:xmpp:hints") then -- Hint telling us we should NOT store |
|
312 log("debug", "Not archiving stanza: %s (hint)", stanza:top_tag()); |
|
313 return; |
|
314 end |
|
315 end |
|
316 |
|
317 -- Check with the users preferences |
|
318 if shall_store(store_user, with) then |
|
319 log("debug", "Archiving stanza: %s", stanza:top_tag()); |
|
320 |
|
321 -- And stash it |
|
322 local ok = archive:append(store_user, nil, stanza, time_now(), with); |
|
323 if ok then |
|
324 local clone_for_other_handlers = st.clone(stanza); |
|
325 local id = ok; |
|
326 clone_for_other_handlers:tag("stanza-id", { xmlns = xmlns_st_id, by = store_user.."@"..host, id = id }):up(); |
|
327 event.stanza = clone_for_other_handlers; |
|
328 schedule_cleanup(store_user); |
|
329 module:fire_event("archive-message-added", { origin = origin, stanza = stanza, for_user = store_user, id = id }); |
|
330 end |
|
331 else |
|
332 log("debug", "Not archiving stanza: %s (prefs)", stanza:top_tag()); |
|
333 end |
|
334 end |
|
335 |
|
336 local function c2s_message_handler(event) |
|
337 return message_handler(event, true); |
|
338 end |
|
339 |
|
340 local function strip_stanza_id(event) |
|
341 local strip_by = jid_bare(event.origin.full_jid); |
|
342 event.stanza = st.clone(event.stanza); |
|
343 event.stanza:maptags(function(tag) |
|
344 if not ( tag.attr.xmlns == xmlns_st_id and tag.attr.by == strip_by ) then |
|
345 return tag; |
|
346 end |
|
347 end); |
|
348 end |
|
349 |
|
350 module:hook("pre-message/bare", strip_stanza_id, 0.01); |
|
351 module:hook("pre-message/full", strip_stanza_id, 0.01); |
|
352 |
|
353 local cleanup_after = module:get_option_string("archive_expires_after", "1w"); |
|
354 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60); |
|
355 if cleanup_after ~= "never" then |
|
356 local day = 86400; |
|
357 local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day }; |
|
358 local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)"); |
|
359 if not n then |
|
360 module:log("error", "Could not parse archive_expires_after string %q", cleanup_after); |
|
361 return false; |
|
362 end |
|
363 |
|
364 cleanup_after = tonumber(n) * ( multipliers[m] or 1 ); |
|
365 |
|
366 module:log("debug", "archive_expires_after = %d -- in seconds", cleanup_after); |
|
367 |
|
368 if not archive.delete then |
|
369 module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by); |
|
370 return false; |
|
371 end |
|
372 |
|
373 -- Set of known users to do message expiry for |
|
374 -- Populated either below or when new messages are added |
|
375 cleanup = {}; |
|
376 |
|
377 -- Iterating over users is not supported by all authentication modules |
|
378 -- Catch and ignore error if not supported |
|
379 pcall(function () |
|
380 -- If this works, then we schedule cleanup for all known users on startup |
|
381 for user in um.users(module.host) do |
|
382 schedule_cleanup(user); |
|
383 end |
|
384 end); |
|
385 |
|
386 -- At odd intervals, delete old messages for one user |
|
387 module:add_timer(math.random(10, 60), function() |
|
388 local user = table.remove(cleanup, 1); |
|
389 if user then |
|
390 module:log("debug", "Removing old messages for user %q", user); |
|
391 local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; }) |
|
392 if not ok then |
|
393 module:log("warn", "Could not expire archives for user %s: %s", user, err); |
|
394 else |
|
395 -- :affected() is a recent addition for eg SQLite3 in LuaDBI |
|
396 pcall(function(stmt) |
|
397 module:log("debug", "Removed %d messages", stmt:affected()); |
|
398 end, err); |
|
399 end |
|
400 cleanup[user] = nil; |
|
401 end |
|
402 return math.random(cleanup_interval, cleanup_interval * 2); |
|
403 end); |
|
404 else |
|
405 -- Don't ask the backend to count the potentially unbounded number of items, |
|
406 -- it'll get slow. |
|
407 use_total = false; |
|
408 end |
|
409 |
|
410 -- Stanzas sent by local clients |
|
411 local priority = 0.075 |
|
412 assert(priority < 0.1, "priority must be after mod_firewall"); |
|
413 assert(priority > 0.05, "priority must be before mod_carbons"); |
|
414 assert(priority > 0.01, "priority must be before strip_stanza_id"); |
|
415 module:hook("pre-message/bare", c2s_message_handler, priority); |
|
416 module:hook("pre-message/full", c2s_message_handler, priority); |
|
417 -- Stanszas to local clients |
|
418 priority = 0.075 |
|
419 assert(priority > 0, "priority must be before mod_message"); |
|
420 assert(priority < 0.1, "priority must be after mod_firewall"); |
|
421 assert(priority > 0.05, "priority must be before mod_carbons"); |
|
422 module:hook("message/bare", message_handler, priority); |
|
423 module:hook("message/full", message_handler, priority); |
|
424 |
|
425 module:add_feature(xmlns_mam0); -- COMPAT with XEP-0313 v 0.1 |
|
426 |
|
427 module:hook("account-disco-info", function(event) |
|
428 (event.reply or event.stanza):tag("feature", {var=xmlns_mam0}):up(); |
|
429 (event.reply or event.stanza):tag("feature", {var=xmlns_mam1}):up(); |
|
430 (event.reply or event.stanza):tag("feature", {var=xmlns_mam2}):up(); |
|
431 (event.reply or event.stanza):tag("feature", {var=xmlns_st_id}):up(); |
|
432 end); |
|
433 |
|