mod_mam/mod_mam.lua
changeset 4787 6ca1117b81a5
parent 4786 b935276ab1b4
child 4788 9a41cf08de24
equal deleted inserted replaced
4786:b935276ab1b4 4787:6ca1117b81a5
     1 -- XEP-0313: Message Archive Management for Prosody
       
     2 -- Copyright (C) 2011-2016 Kim Alvefur
       
     3 --
       
     4 -- This file is MIT/X11 licensed.
       
     5 
       
     6 local xmlns_mam0    = "urn:xmpp:mam:0";
       
     7 local xmlns_mam1    = "urn:xmpp:mam:1";
       
     8 local xmlns_mam2    = "urn:xmpp:mam:2";
       
     9 local xmlns_delay   = "urn:xmpp:delay";
       
    10 local xmlns_forward = "urn:xmpp:forward:0";
       
    11 local xmlns_st_id   = "urn:xmpp:sid:0";
       
    12 
       
    13 local um = require "core.usermanager";
       
    14 local st = require "util.stanza";
       
    15 local rsm = module:require "rsm";
       
    16 local get_prefs = module:require"mamprefs".get;
       
    17 local set_prefs = module:require"mamprefs".set;
       
    18 local prefs_to_stanza = module:require"mamprefsxml".tostanza;
       
    19 local prefs_from_stanza = module:require"mamprefsxml".fromstanza;
       
    20 local jid_bare = require "util.jid".bare;
       
    21 local jid_split = require "util.jid".split;
       
    22 local jid_prepped_split = require "util.jid".prepped_split;
       
    23 local dataform = require "util.dataforms".new;
       
    24 local host = module.host;
       
    25 
       
    26 local rm_load_roster = require "core.rostermanager".load_roster;
       
    27 
       
    28 local getmetatable = getmetatable;
       
    29 local function is_stanza(x)
       
    30 	return getmetatable(x) == st.stanza_mt;
       
    31 end
       
    32 
       
    33 local tostring = tostring;
       
    34 local time_now = os.time;
       
    35 local m_min = math.min;
       
    36 local timestamp, timestamp_parse = require "util.datetime".datetime, require "util.datetime".parse;
       
    37 local default_max_items, max_max_items = 20, module:get_option_number("max_archive_query_results", 50);
       
    38 local global_default_policy = module:get_option("default_archive_policy", true);
       
    39 if global_default_policy ~= "roster" then
       
    40 	global_default_policy = module:get_option_boolean("default_archive_policy", global_default_policy);
       
    41 end
       
    42 
       
    43 local archive_store = module:get_option_string("archive_store", "archive2");
       
    44 local archive = module:open_store(archive_store, "archive");
       
    45 
       
    46 if archive.name == "null" or not archive.find then
       
    47 	-- luacheck: ignore 631
       
    48 	if not archive.find then
       
    49 		module:log("debug", "Attempt to open archive storage returned a valid driver but it does not seem to implement the storage API");
       
    50 		module:log("debug", "mod_%s does not support archiving", archive._provided_by or archive.name and "storage_"..archive.name.."(?)" or "<unknown>");
       
    51 	else
       
    52 		module:log("debug", "Attempt to open archive storage returned null driver");
       
    53 	end
       
    54 	module:log("debug", "See https://prosody.im/doc/storage and https://prosody.im/doc/archiving for more information");
       
    55 	module:log("info", "Using in-memory fallback archive driver");
       
    56 	archive = module:require "fallback_archive";
       
    57 end
       
    58 
       
    59 local use_total = true;
       
    60 
       
    61 local cleanup;
       
    62 
       
    63 local function schedule_cleanup(username)
       
    64 	if cleanup and not cleanup[username] then
       
    65 		table.insert(cleanup, username);
       
    66 		cleanup[username] = true;
       
    67 	end
       
    68 end
       
    69 
       
    70 -- Handle prefs.
       
    71 local function handle_prefs(event)
       
    72 	local origin, stanza = event.origin, event.stanza;
       
    73 	local xmlns_mam = stanza.tags[1].attr.xmlns;
       
    74 	local user = origin.username;
       
    75 	if stanza.attr.type == "get" then
       
    76 		local prefs = prefs_to_stanza(get_prefs(user), xmlns_mam);
       
    77 		local reply = st.reply(stanza):add_child(prefs);
       
    78 		origin.send(reply);
       
    79 	else -- type == "set"
       
    80 		local new_prefs = stanza:get_child("prefs", xmlns_mam);
       
    81 		local prefs = prefs_from_stanza(new_prefs);
       
    82 		local ok, err = set_prefs(user, prefs);
       
    83 		if not ok then
       
    84 			origin.send(st.error_reply(stanza, "cancel", "internal-server-error", "Error storing preferences: "..tostring(err)));
       
    85 		else
       
    86 			origin.send(st.reply(stanza));
       
    87 		end
       
    88 	end
       
    89 	return true;
       
    90 end
       
    91 
       
    92 module:hook("iq/self/"..xmlns_mam0..":prefs", handle_prefs);
       
    93 module:hook("iq/self/"..xmlns_mam1..":prefs", handle_prefs);
       
    94 module:hook("iq/self/"..xmlns_mam2..":prefs", handle_prefs);
       
    95 
       
    96 local query_form = dataform {
       
    97 	{ name = "FORM_TYPE"; type = "hidden"; value = xmlns_mam0; };
       
    98 	{ name = "with"; type = "jid-single"; };
       
    99 	{ name = "start"; type = "text-single" };
       
   100 	{ name = "end"; type = "text-single"; };
       
   101 };
       
   102 
       
   103 -- Serve form
       
   104 local function handle_get_form(event)
       
   105 	local origin, stanza = event.origin, event.stanza;
       
   106 	local xmlns_mam = stanza.tags[1].attr.xmlns;
       
   107 	query_form[1].value = xmlns_mam;
       
   108 	origin.send(st.reply(stanza):query(xmlns_mam):add_child(query_form:form()));
       
   109 	return true;
       
   110 end
       
   111 
       
   112 module:hook("iq-get/self/"..xmlns_mam0..":query", handle_get_form);
       
   113 module:hook("iq-get/self/"..xmlns_mam1..":query", handle_get_form);
       
   114 module:hook("iq-get/self/"..xmlns_mam2..":query", handle_get_form);
       
   115 
       
   116 -- Handle archive queries
       
   117 local function handle_mam_query(event)
       
   118 	local origin, stanza = event.origin, event.stanza;
       
   119 	local xmlns_mam = stanza.tags[1].attr.xmlns;
       
   120 	local query = stanza.tags[1];
       
   121 	local qid = query.attr.queryid;
       
   122 
       
   123 	origin.mam_requested = true;
       
   124 
       
   125 	schedule_cleanup(origin.username);
       
   126 
       
   127 	-- Search query parameters
       
   128 	local qwith, qstart, qend;
       
   129 	local form = query:get_child("x", "jabber:x:data");
       
   130 	if form then
       
   131 		local err;
       
   132 		query_form[1].value = xmlns_mam;
       
   133 		form, err = query_form:data(form);
       
   134 		if err then
       
   135 			origin.send(st.error_reply(stanza, "modify", "bad-request", select(2, next(err))));
       
   136 			return true;
       
   137 		end
       
   138 		qwith, qstart, qend = form["with"], form["start"], form["end"];
       
   139 		qwith = qwith and jid_bare(qwith); -- dataforms does jidprep
       
   140 	end
       
   141 
       
   142 	if qstart or qend then -- Validate timestamps
       
   143 		local vstart, vend = (qstart and timestamp_parse(qstart)), (qend and timestamp_parse(qend));
       
   144 		if (qstart and not vstart) or (qend and not vend) then
       
   145 			origin.send(st.error_reply(stanza, "modify", "bad-request", "Invalid timestamp"))
       
   146 			return true;
       
   147 		end
       
   148 		qstart, qend = vstart, vend;
       
   149 	end
       
   150 
       
   151 	module:log("debug", "Archive query, id %s with %s from %s until %s)",
       
   152 		tostring(qid), qwith or "anyone", qstart or "the dawn of time", qend or "now");
       
   153 
       
   154 	-- RSM stuff
       
   155 	local qset = rsm.get(query);
       
   156 	local qmax = m_min(qset and qset.max or default_max_items, max_max_items);
       
   157 	local reverse = qset and qset.before or false;
       
   158 	local before, after = qset and qset.before, qset and qset.after;
       
   159 	if type(before) ~= "string" then before = nil; end
       
   160 
       
   161 	-- Load all the data!
       
   162 	local data, err = archive:find(origin.username, {
       
   163 		start = qstart; ["end"] = qend; -- Time range
       
   164 		with = qwith;
       
   165 		limit = qmax + 1;
       
   166 		before = before; after = after;
       
   167 		reverse = reverse;
       
   168 		total = use_total;
       
   169 	});
       
   170 
       
   171 	if not data then
       
   172 		origin.send(st.error_reply(stanza, "cancel", "internal-server-error", err));
       
   173 		return true;
       
   174 	end
       
   175 	local total = tonumber(err);
       
   176 
       
   177 	if xmlns_mam == xmlns_mam0 then
       
   178 		origin.send(st.reply(stanza));
       
   179 	end
       
   180 	local msg_reply_attr = { to = stanza.attr.from, from = stanza.attr.to };
       
   181 
       
   182 	local results = {};
       
   183 
       
   184 	-- Wrap it in stuff and deliver
       
   185 	local first, last;
       
   186 	local count = 0;
       
   187 	local complete = "true";
       
   188 	for id, item, when in data do
       
   189 		count = count + 1;
       
   190 		if count > qmax then
       
   191 			complete = nil;
       
   192 			break;
       
   193 		end
       
   194 		local fwd_st = st.message(msg_reply_attr)
       
   195 			:tag("result", { xmlns = xmlns_mam, queryid = qid, id = id })
       
   196 				:tag("forwarded", { xmlns = xmlns_forward })
       
   197 					:tag("delay", { xmlns = xmlns_delay, stamp = timestamp(when) }):up();
       
   198 
       
   199 		if not is_stanza(item) then
       
   200 			item = st.deserialize(item);
       
   201 		end
       
   202 		item.attr.xmlns = "jabber:client";
       
   203 		fwd_st:add_child(item);
       
   204 
       
   205 		if not first then first = id; end
       
   206 		last = id;
       
   207 
       
   208 		if reverse then
       
   209 			results[count] = fwd_st;
       
   210 		else
       
   211 			origin.send(fwd_st);
       
   212 		end
       
   213 	end
       
   214 
       
   215 	if reverse then
       
   216 		for i = #results, 1, -1 do
       
   217 			origin.send(results[i]);
       
   218 		end
       
   219 		first, last = last, first;
       
   220 	end
       
   221 
       
   222 	-- That's all folks!
       
   223 	module:log("debug", "Archive query %s completed", tostring(qid));
       
   224 
       
   225 	local fin;
       
   226 	if xmlns_mam == xmlns_mam0 then
       
   227 		fin = st.message(msg_reply_attr);
       
   228 	else
       
   229 		fin = st.reply(stanza);
       
   230 	end
       
   231 	do
       
   232 		fin:tag("fin", { xmlns = xmlns_mam, queryid = qid, complete = complete })
       
   233 			:add_child(rsm.generate {
       
   234 				first = first, last = last, count = total })
       
   235 	end
       
   236 	origin.send(fin);
       
   237 	return true;
       
   238 end
       
   239 module:hook("iq-set/self/"..xmlns_mam0..":query", handle_mam_query);
       
   240 module:hook("iq-set/self/"..xmlns_mam1..":query", handle_mam_query);
       
   241 module:hook("iq-set/self/"..xmlns_mam2..":query", handle_mam_query);
       
   242 
       
   243 local function has_in_roster(user, who)
       
   244 	local roster = rm_load_roster(user, host);
       
   245 	module:log("debug", "%s has %s in roster? %s", user, who, roster[who] and "yes" or "no");
       
   246 	return roster[who];
       
   247 end
       
   248 
       
   249 local function shall_store(user, who)
       
   250 	-- TODO Cache this?
       
   251 	if not um.user_exists(user, host) then
       
   252 		return false;
       
   253 	end
       
   254 	local prefs = get_prefs(user);
       
   255 	local rule = prefs[who];
       
   256 	module:log("debug", "%s's rule for %s is %s", user, who, tostring(rule));
       
   257 	if rule ~= nil then
       
   258 		return rule;
       
   259 	end
       
   260 	-- Below could be done by a metatable
       
   261 	local default = prefs[false];
       
   262 	module:log("debug", "%s's default rule is %s", user, tostring(default));
       
   263 	if default == nil then
       
   264 		default = global_default_policy;
       
   265 		module:log("debug", "Using global default rule, %s", tostring(default));
       
   266 	end
       
   267 	if default == "roster" then
       
   268 		return has_in_roster(user, who);
       
   269 	end
       
   270 	return default;
       
   271 end
       
   272 
       
   273 -- Handle messages
       
   274 local function message_handler(event, c2s)
       
   275 	local origin, stanza = event.origin, event.stanza;
       
   276 	local log = c2s and origin.log or module._log;
       
   277 	local orig_type = stanza.attr.type or "normal";
       
   278 	local orig_from = stanza.attr.from;
       
   279 	local orig_to = stanza.attr.to or orig_from;
       
   280 	-- Stanza without 'to' are treated as if it was to their own bare jid
       
   281 
       
   282 	-- Whos storage do we put it in?
       
   283 	local store_user = c2s and origin.username or jid_split(orig_to);
       
   284 	-- And who are they chatting with?
       
   285 	local with = jid_bare(c2s and orig_to or orig_from);
       
   286 
       
   287 	-- Filter out <stanza-id> that claim to be from us
       
   288 	if stanza:get_child("stanza-id", xmlns_st_id) then
       
   289 		stanza = st.clone(stanza);
       
   290 		stanza:maptags(function (tag)
       
   291 			if tag.name == "stanza-id" and tag.attr.xmlns == xmlns_st_id then
       
   292 				local by_user, by_host, res = jid_prepped_split(tag.attr.by);
       
   293 				if not res and by_host == module.host and by_user == store_user then
       
   294 					return nil;
       
   295 				end
       
   296 			end
       
   297 			return tag;
       
   298 		end);
       
   299 		event.stanza = stanza;
       
   300 	end
       
   301 
       
   302 	-- We store chat messages or normal messages that have a body
       
   303 	if not(orig_type == "chat" or (orig_type == "normal" and stanza:get_child("body")) ) then
       
   304 		log("debug", "Not archiving stanza: %s (type)", stanza:top_tag());
       
   305 		return;
       
   306 	end
       
   307 
       
   308 	-- or if hints suggest we shouldn't
       
   309 	if not stanza:get_child("store", "urn:xmpp:hints") then -- No hint telling us we should store
       
   310 		if stanza:get_child("no-permanent-store", "urn:xmpp:hints")
       
   311 			or stanza:get_child("no-store", "urn:xmpp:hints") then -- Hint telling us we should NOT store
       
   312 			log("debug", "Not archiving stanza: %s (hint)", stanza:top_tag());
       
   313 			return;
       
   314 		end
       
   315 	end
       
   316 
       
   317 	-- Check with the users preferences
       
   318 	if shall_store(store_user, with) then
       
   319 		log("debug", "Archiving stanza: %s", stanza:top_tag());
       
   320 
       
   321 		-- And stash it
       
   322 		local ok = archive:append(store_user, nil, stanza, time_now(), with);
       
   323 		if ok then
       
   324 			local clone_for_other_handlers = st.clone(stanza);
       
   325 			local id = ok;
       
   326 			clone_for_other_handlers:tag("stanza-id", { xmlns = xmlns_st_id, by = store_user.."@"..host, id = id }):up();
       
   327 			event.stanza = clone_for_other_handlers;
       
   328 			schedule_cleanup(store_user);
       
   329 			module:fire_event("archive-message-added", { origin = origin, stanza = stanza, for_user = store_user, id = id });
       
   330 		end
       
   331 	else
       
   332 		log("debug", "Not archiving stanza: %s (prefs)", stanza:top_tag());
       
   333 	end
       
   334 end
       
   335 
       
   336 local function c2s_message_handler(event)
       
   337 	return message_handler(event, true);
       
   338 end
       
   339 
       
   340 local function strip_stanza_id(event)
       
   341 	local strip_by = jid_bare(event.origin.full_jid);
       
   342 	event.stanza = st.clone(event.stanza);
       
   343 	event.stanza:maptags(function(tag)
       
   344 		if not ( tag.attr.xmlns == xmlns_st_id and tag.attr.by == strip_by ) then
       
   345 			return tag;
       
   346 		end
       
   347 	end);
       
   348 end
       
   349 
       
   350 module:hook("pre-message/bare", strip_stanza_id, 0.01);
       
   351 module:hook("pre-message/full", strip_stanza_id, 0.01);
       
   352 
       
   353 local cleanup_after = module:get_option_string("archive_expires_after", "1w");
       
   354 local cleanup_interval = module:get_option_number("archive_cleanup_interval", 4 * 60 * 60);
       
   355 if cleanup_after ~= "never" then
       
   356 	local day = 86400;
       
   357 	local multipliers = { d = day, w = day * 7, m = 31 * day, y = 365.2425 * day };
       
   358 	local n, m = cleanup_after:lower():match("(%d+)%s*([dwmy]?)");
       
   359 	if not n then
       
   360 		module:log("error", "Could not parse archive_expires_after string %q", cleanup_after);
       
   361 		return false;
       
   362 	end
       
   363 
       
   364 	cleanup_after = tonumber(n) * ( multipliers[m] or 1 );
       
   365 
       
   366 	module:log("debug", "archive_expires_after = %d -- in seconds", cleanup_after);
       
   367 
       
   368 	if not archive.delete then
       
   369 		module:log("error", "archive_expires_after set but mod_%s does not support deleting", archive._provided_by);
       
   370 		return false;
       
   371 	end
       
   372 
       
   373 	-- Set of known users to do message expiry for
       
   374 	-- Populated either below or when new messages are added
       
   375 	cleanup = {};
       
   376 
       
   377 	-- Iterating over users is not supported by all authentication modules
       
   378 	-- Catch and ignore error if not supported
       
   379 	pcall(function ()
       
   380 		-- If this works, then we schedule cleanup for all known users on startup
       
   381 		for user in um.users(module.host) do
       
   382 			schedule_cleanup(user);
       
   383 		end
       
   384 	end);
       
   385 
       
   386 	-- At odd intervals, delete old messages for one user
       
   387 	module:add_timer(math.random(10, 60), function()
       
   388 		local user = table.remove(cleanup, 1);
       
   389 		if user then
       
   390 			module:log("debug", "Removing old messages for user %q", user);
       
   391 			local ok, err = archive:delete(user, { ["end"] = os.time() - cleanup_after; })
       
   392 			if not ok then
       
   393 				module:log("warn", "Could not expire archives for user %s: %s", user, err);
       
   394 			else
       
   395 				-- :affected() is a recent addition for eg SQLite3 in LuaDBI
       
   396 				pcall(function(stmt)
       
   397 					module:log("debug", "Removed %d messages", stmt:affected());
       
   398 				end, err);
       
   399 			end
       
   400 			cleanup[user] = nil;
       
   401 		end
       
   402 		return math.random(cleanup_interval, cleanup_interval * 2);
       
   403 	end);
       
   404 else
       
   405 	-- Don't ask the backend to count the potentially unbounded number of items,
       
   406 	-- it'll get slow.
       
   407 	use_total = false;
       
   408 end
       
   409 
       
   410 -- Stanzas sent by local clients
       
   411 local priority = 0.075
       
   412 assert(priority < 0.1, "priority must be after mod_firewall");
       
   413 assert(priority > 0.05, "priority must be before mod_carbons");
       
   414 assert(priority > 0.01, "priority must be before strip_stanza_id");
       
   415 module:hook("pre-message/bare", c2s_message_handler, priority);
       
   416 module:hook("pre-message/full", c2s_message_handler, priority);
       
   417 -- Stanszas to local clients
       
   418 priority = 0.075
       
   419 assert(priority > 0, "priority must be before mod_message");
       
   420 assert(priority < 0.1, "priority must be after mod_firewall");
       
   421 assert(priority > 0.05, "priority must be before mod_carbons");
       
   422 module:hook("message/bare", message_handler, priority);
       
   423 module:hook("message/full", message_handler, priority);
       
   424 
       
   425 module:add_feature(xmlns_mam0); -- COMPAT with XEP-0313 v 0.1
       
   426 
       
   427 module:hook("account-disco-info", function(event)
       
   428 	(event.reply or event.stanza):tag("feature", {var=xmlns_mam0}):up();
       
   429 	(event.reply or event.stanza):tag("feature", {var=xmlns_mam1}):up();
       
   430 	(event.reply or event.stanza):tag("feature", {var=xmlns_mam2}):up();
       
   431 	(event.reply or event.stanza):tag("feature", {var=xmlns_st_id}):up();
       
   432 end);
       
   433