mod_pep_plus/mod_pep_plus.lua
changeset 2805 cb2342cf3f3c
parent 2672 f8fc79b3051a
equal deleted inserted replaced
2804:8d9aed6d1f87 2805:cb2342cf3f3c
     1 local pubsub = require "util.pubsub";
     1 local pubsub = module:require "util_pubsub";
     2 local jid_bare = require "util.jid".bare;
     2 local jid_bare = require "util.jid".bare;
     3 local jid_split = require "util.jid".split;
     3 local jid_split = require "util.jid".split;
       
     4 local jid_join = require "util.jid".join;
     4 local set_new = require "util.set".new;
     5 local set_new = require "util.set".new;
     5 local st = require "util.stanza";
     6 local st = require "util.stanza";
     6 local calculate_hash = require "util.caps".calculate_hash;
     7 local calculate_hash = require "util.caps".calculate_hash;
     7 local is_contact_subscribed = require "core.rostermanager".is_contact_subscribed;
     8 local is_contact_subscribed = require "core.rostermanager".is_contact_subscribed;
       
     9 local cache = require "util.cache";
       
    10 local set = require "util.set";
     8 
    11 
     9 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
    12 local xmlns_pubsub = "http://jabber.org/protocol/pubsub";
    10 local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";
    13 local xmlns_pubsub_event = "http://jabber.org/protocol/pubsub#event";
    11 local xmlns_pubsub_owner = "http://jabber.org/protocol/pubsub#owner";
    14 local xmlns_pubsub_owner = "http://jabber.org/protocol/pubsub#owner";
    12 
    15 
    13 local lib_pubsub = module:require "pubsub";
    16 local lib_pubsub = module:require "pubsub";
    14 local handlers = lib_pubsub.handlers;
       
    15 local pubsub_error_reply = lib_pubsub.pubsub_error_reply;
       
    16 
    17 
    17 local empty_set = set_new();
    18 local empty_set = set_new();
    18 
    19 
    19 local services = {};
    20 local services = {};
    20 local recipients = {};
    21 local recipients = {};
    21 local hash_map = {};
    22 local hash_map = {};
    22 
    23 
       
    24 local host = module.host;
       
    25 
       
    26 local known_nodes_map = module:open_store("pep", "map");
       
    27 local known_nodes = module:open_store("pep");
       
    28 
    23 function module.save()
    29 function module.save()
    24 	return { services = services };
    30 	return { services = services };
    25 end
    31 end
    26 
    32 
    27 function module.restore(data)
    33 function module.restore(data)
    28 	services = data.services;
    34 	services = data.services;
    29 end
    35 end
    30 
    36 
    31 local function subscription_presence(user_bare, recipient)
    37 local function subscription_presence(username, recipient)
       
    38 	local user_bare = jid_join(username, host);
    32 	local recipient_bare = jid_bare(recipient);
    39 	local recipient_bare = jid_bare(recipient);
    33 	if (recipient_bare == user_bare) then return true; end
    40 	if (recipient_bare == user_bare) then return true; end
    34 	local username, host = jid_split(user_bare);
       
    35 	return is_contact_subscribed(username, host, recipient_bare);
    41 	return is_contact_subscribed(username, host, recipient_bare);
    36 end
    42 end
    37 
    43 
    38 local function get_broadcaster(name)
    44 local function simple_itemstore(username)
       
    45 	return function (config, node)
       
    46 		if config["persist_items"] then
       
    47 			module:log("debug", "Creating new persistent item store for user %s, node %q", username, node);
       
    48 			known_nodes_map:set(username, node, true);
       
    49 			local archive = module:open_store("pep_"..node, "archive");
       
    50 			return lib_pubsub.archive_itemstore(archive, config, username, node, false);
       
    51 		else
       
    52 			module:log("debug", "Creating new ephemeral item store for user %s, node %q", username, node);
       
    53 			known_nodes_map:set(username, node, nil);
       
    54 			return cache.new(tonumber(config["max_items"]));
       
    55 		end
       
    56 	end
       
    57 end
       
    58 
       
    59 local function get_broadcaster(username)
       
    60 	local user_bare = jid_join(username, host);
    39 	local function simple_broadcast(kind, node, jids, item)
    61 	local function simple_broadcast(kind, node, jids, item)
    40 		if item then
    62 		if item then
    41 			item = st.clone(item);
    63 			item = st.clone(item);
    42 			item.attr.xmlns = nil; -- Clear the pubsub namespace
    64 			item.attr.xmlns = nil; -- Clear the pubsub namespace
    43 		end
    65 		end
    44 		local message = st.message({ from = name, type = "headline" })
    66 		local message = st.message({ from = user_bare, type = "headline" })
    45 			:tag("event", { xmlns = xmlns_pubsub_event })
    67 			:tag("event", { xmlns = xmlns_pubsub_event })
    46 				:tag(kind, { node = node })
    68 				:tag(kind, { node = node })
    47 					:add_child(item);
    69 					:add_child(item);
    48 		for jid in pairs(jids) do
    70 		for jid in pairs(jids) do
    49 			module:log("debug", "Sending notification to %s from %s: %s", jid, name, tostring(item));
    71 			module:log("debug", "Sending notification to %s from %s: %s", jid, user_bare, tostring(item));
    50 			message.attr.to = jid;
    72 			message.attr.to = jid;
    51 			module:send(message);
    73 			module:send(message);
    52 		end
    74 		end
    53 	end
    75 	end
    54 	return simple_broadcast;
    76 	return simple_broadcast;
    55 end
    77 end
    56 
    78 
    57 function get_pep_service(name)
    79 function get_pep_service(username)
    58 	local service = services[name];
    80 	module:log("debug", "get_pep_service(%q)", username);
       
    81 	local user_bare = jid_join(username, host);
       
    82 	local service = services[username];
    59 	if service then
    83 	if service then
    60 		return service;
    84 		return service;
    61 	end
    85 	end
    62 	service = pubsub.new({
    86 	service = pubsub.new({
    63 		capabilities = {
    87 		capabilities = {
   153 				set_affiliation = true;
   177 				set_affiliation = true;
   154 			};
   178 			};
   155 		};
   179 		};
   156 
   180 
   157 		node_defaults = {
   181 		node_defaults = {
   158 			["pubsub#max_items"] = "1";
   182 			["max_items"] = 1;
       
   183 			["persist_items"] = true;
   159 		};
   184 		};
   160 
   185 
   161 		autocreate_on_publish = true;
   186 		autocreate_on_publish = true;
   162 		autocreate_on_subscribe = true;
   187 		autocreate_on_subscribe = true;
   163 
   188 
   164 		broadcaster = get_broadcaster(name);
   189 		itemstore = simple_itemstore(username);
       
   190 		broadcaster = get_broadcaster(username);
   165 		get_affiliation = function (jid)
   191 		get_affiliation = function (jid)
   166 			if jid_bare(jid) == name then
   192 			if jid_bare(jid) == user_bare then
   167 				return "owner";
   193 				return "owner";
   168 			elseif subscription_presence(name, jid) then
   194 			elseif subscription_presence(username, jid) then
   169 				return "subscriber";
   195 				return "subscriber";
   170 			end
   196 			end
   171 		end;
   197 		end;
   172 
   198 
   173 		normalize_jid = jid_bare;
   199 		normalize_jid = jid_bare;
   174 	});
   200 	});
   175 	services[name] = service;
   201 	local nodes, err = known_nodes:get(username);
   176 	module:add_item("pep-service", { service = service, jid = name });
   202 	if nodes then
       
   203 		module:log("debug", "Restoring nodes for user %s", username);
       
   204 		for node in pairs(nodes) do
       
   205 			module:log("debug", "Restoring node %q", node);
       
   206 			service:create(node, true);
       
   207 		end
       
   208 	elseif err then
       
   209 		module:log("error", "Could not restore nodes for %s: %s", username, err);
       
   210 	else
       
   211 		module:log("debug", "No known nodes");
       
   212 	end
       
   213 	services[username] = service;
       
   214 	module:add_item("pep-service", { service = service, jid = user_bare });
   177 	return service;
   215 	return service;
   178 end
   216 end
   179 
   217 
   180 function handle_pubsub_iq(event)
   218 function handle_pubsub_iq(event)
   181 	local origin, stanza = event.origin, event.stanza;
   219 	local origin, stanza = event.origin, event.stanza;
   182 	local pubsub = stanza.tags[1];
   220 	local service_name = origin.username;
   183 	local action = pubsub.tags[1];
   221 	if stanza.attr.to ~= nil then
   184 	if not action then
   222 		service_name = jid_split(stanza.attr.to);
   185 		return origin.send(st.error_reply(stanza, "cancel", "bad-request"));
   223 	end
   186 	end
       
   187 	local service_name = stanza.attr.to or origin.username.."@"..origin.host
       
   188 	local service = get_pep_service(service_name);
   224 	local service = get_pep_service(service_name);
   189 	local handler = handlers[stanza.attr.type.."_"..action.name];
   225 
   190 	if handler then
   226 	return lib_pubsub.handle_pubsub_iq(event, service)
   191 		handler(origin, stanza, action, service);
       
   192 		return true;
       
   193 	end
       
   194 end
   227 end
   195 
   228 
   196 module:hook("iq/bare/"..xmlns_pubsub..":pubsub", handle_pubsub_iq);
   229 module:hook("iq/bare/"..xmlns_pubsub..":pubsub", handle_pubsub_iq);
   197 module:hook("iq/bare/"..xmlns_pubsub_owner..":pubsub", handle_pubsub_iq);
   230 module:hook("iq/bare/"..xmlns_pubsub_owner..":pubsub", handle_pubsub_iq);
   198 
   231 
   223 end
   256 end
   224 
   257 
   225 local function resend_last_item(jid, node, service)
   258 local function resend_last_item(jid, node, service)
   226 	local ok, items = service:get_items(node, jid);
   259 	local ok, items = service:get_items(node, jid);
   227 	if not ok then return; end
   260 	if not ok then return; end
   228 	for i, id in ipairs(items) do
   261 	for _, id in ipairs(items) do
   229 		service.config.broadcaster("items", node, { [jid] = true }, items[id]);
   262 		service.config.broadcaster("items", node, { [jid] = true }, items[id]);
   230 	end
   263 	end
   231 end
   264 end
   232 
   265 
   233 local function update_subscriptions(recipient, service_name, nodes)
   266 local function update_subscriptions(recipient, service_name, nodes)
   266 end
   299 end
   267 
   300 
   268 module:hook("presence/bare", function(event)
   301 module:hook("presence/bare", function(event)
    269 	-- inbound presence to bare JID received
    302 	-- inbound presence to bare JID received
   270 	local origin, stanza = event.origin, event.stanza;
   303 	local origin, stanza = event.origin, event.stanza;
   271 	local user = stanza.attr.to or (origin.username..'@'..origin.host);
       
   272 	local t = stanza.attr.type;
   304 	local t = stanza.attr.type;
   273 	local self = not stanza.attr.to;
   305 	local is_self = not stanza.attr.to;
   274 	local service = get_pep_service(user);
   306 	local username = jid_split(stanza.attr.to);
       
   307 	local user_bare = jid_bare(stanza.attr.to);
       
   308 	if is_self then
       
   309 		username = origin.username;
       
   310 		user_bare = jid_join(username, host);
       
   311 	end
   275 
   312 
   276 	if not t then -- available presence
   313 	if not t then -- available presence
   277 		if self or subscription_presence(user, stanza.attr.from) then
   314 		if is_self or subscription_presence(username, stanza.attr.from) then
   278 			local recipient = stanza.attr.from;
   315 			local recipient = stanza.attr.from;
   279 			local current = recipients[user] and recipients[user][recipient];
   316 			local current = recipients[username] and recipients[username][recipient];
   280 			local hash, query_node = get_caps_hash_from_presence(stanza, current);
   317 			local hash, query_node = get_caps_hash_from_presence(stanza, current);
   281 			if current == hash or (current and current == hash_map[hash]) then return; end
   318 			if current == hash or (current and current == hash_map[hash]) then return; end
   282 			if not hash then
   319 			if not hash then
   283 				update_subscriptions(recipient, user);
   320 				update_subscriptions(recipient, username);
   284 			else
   321 			else
   285 				recipients[user] = recipients[user] or {};
   322 				recipients[username] = recipients[username] or {};
   286 				if hash_map[hash] then
   323 				if hash_map[hash] then
   287 					update_subscriptions(recipient, user, hash_map[hash]);
   324 					update_subscriptions(recipient, username, hash_map[hash]);
   288 				else
   325 				else
   289 					recipients[user][recipient] = hash;
   326 					recipients[username][recipient] = hash;
   290 					local from_bare = origin.type == "c2s" and origin.username.."@"..origin.host;
   327 					local from_bare = origin.type == "c2s" and origin.username.."@"..origin.host;
   291 					if self or origin.type ~= "c2s" or (recipients[from_bare] and recipients[from_bare][origin.full_jid]) ~= hash then
   328 					if is_self or origin.type ~= "c2s" or (recipients[from_bare] and recipients[from_bare][origin.full_jid]) ~= hash then
   292 						-- COMPAT from ~= stanza.attr.to because OneTeam can't deal with missing from attribute
   329 						-- COMPAT from ~= stanza.attr.to because OneTeam can't deal with missing from attribute
   293 						origin.send(
   330 						origin.send(
   294 							st.stanza("iq", {from=user, to=stanza.attr.from, id="disco", type="get"})
   331 							st.stanza("iq", {from=user_bare, to=stanza.attr.from, id="disco", type="get"})
   295 								:tag("query", {xmlns = "http://jabber.org/protocol/disco#info", node = query_node})
   332 								:tag("query", {xmlns = "http://jabber.org/protocol/disco#info", node = query_node})
   296 						);
   333 						);
   297 					end
   334 					end
   298 				end
   335 				end
   299 			end
   336 			end
   300 		end
   337 		end
   301 	elseif t == "unavailable" then
   338 	elseif t == "unavailable" then
   302 		update_subscriptions(stanza.attr.from, user);
   339 		update_subscriptions(stanza.attr.from, username);
   303 	elseif not self and t == "unsubscribe" then
   340 	elseif not is_self and t == "unsubscribe" then
   304 		local from = jid_bare(stanza.attr.from);
   341 		local from = jid_bare(stanza.attr.from);
   305 		local subscriptions = recipients[user];
   342 		local subscriptions = recipients[username];
   306 		if subscriptions then
   343 		if subscriptions then
   307 			for subscriber in pairs(subscriptions) do
   344 			for subscriber in pairs(subscriptions) do
   308 				if jid_bare(subscriber) == from then
   345 				if jid_bare(subscriber) == from then
   309 					update_subscriptions(subscriber, user);
   346 					update_subscriptions(subscriber, username);
   310 				end
   347 				end
   311 			end
   348 			end
   312 		end
   349 		end
   313 	end
   350 	end
   314 end, 10);
   351 end, 10);
   319 	if not disco then
   356 	if not disco then
   320 		return;
   357 		return;
   321 	end
   358 	end
   322 
   359 
   323 	-- Process disco response
   360 	-- Process disco response
   324 	local self = not stanza.attr.to;
   361 	local is_self = stanza.attr.to == nil;
   325 	local user = stanza.attr.to or (origin.username..'@'..origin.host);
   362 	local user_bare = jid_bare(stanza.attr.to);
       
   363 	local username = jid_split(stanza.attr.to);
       
   364 	if is_self then
       
   365 		username = origin.username;
       
   366 		user_bare = jid_join(username, host);
       
   367 	end
   326 	local contact = stanza.attr.from;
   368 	local contact = stanza.attr.from;
   327 	local current = recipients[user] and recipients[user][contact];
   369 	local current = recipients[username] and recipients[username][contact];
   328 	if type(current) ~= "string" then return; end -- check if waiting for recipient's response
   370 	if type(current) ~= "string" then return; end -- check if waiting for recipient's response
   329 	local ver = current;
   371 	local ver = current;
   330 	if not string.find(current, "#") then
   372 	if not string.find(current, "#") then
   331 		ver = calculate_hash(disco.tags); -- calculate hash
   373 		ver = calculate_hash(disco.tags); -- calculate hash
   332 	end
   374 	end
   336 			local nfeature = feature.attr.var:match("^(.*)%+notify$");
   378 			local nfeature = feature.attr.var:match("^(.*)%+notify$");
   337 			if nfeature then notify:add(nfeature); end
   379 			if nfeature then notify:add(nfeature); end
   338 		end
   380 		end
   339 	end
   381 	end
   340 	hash_map[ver] = notify; -- update hash map
   382 	hash_map[ver] = notify; -- update hash map
   341 	if self then
   383 	if is_self then
       
   384 		-- Optimization: Fiddle with other local users
   342 		for jid, item in pairs(origin.roster) do -- for all interested contacts
   385 		for jid, item in pairs(origin.roster) do -- for all interested contacts
   343 			if item.subscription == "both" or item.subscription == "from" then
   386 			if jid then
   344 				if not recipients[jid] then recipients[jid] = {}; end
   387 				local contact_node, contact_host = jid_split(jid);
   345 				update_subscriptions(contact, jid, notify);
   388 				if contact_host == host and item.subscription == "both" or item.subscription == "from" then
       
   389 					update_subscriptions(user_bare, contact_node, notify);
       
   390 				end
   346 			end
   391 			end
   347 		end
   392 		end
   348 	end
   393 	end
   349 	update_subscriptions(contact, user, notify);
   394 	update_subscriptions(contact, username, notify);
   350 end);
   395 end);
   351 
   396 
   352 module:hook("account-disco-info-node", function(event)
   397 module:hook("account-disco-info-node", function(event)
   353 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   398 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   354 	local service_name = stanza.attr.to or origin.username.."@"..origin.host
   399 	local service_name = origin.username;
       
   400 	if stanza.attr.to ~= nil then
       
   401 		service_name = jid_split(stanza.attr.to);
       
   402 	end
   355 	local service = get_pep_service(service_name);
   403 	local service = get_pep_service(service_name);
   356 	local node = event.node;
   404 	local node = event.node;
   357 	local ok = service:get_items(node, jid_bare(stanza.attr.from) or true);
   405 	local ok = service:get_items(node, jid_bare(stanza.attr.from) or true);
   358 	if not ok then return; end
   406 	if not ok then return; end
   359 	event.exists = true;
   407 	event.exists = true;
   360 	reply:tag('identity', {category='pubsub', type='leaf'}):up();
   408 	reply:tag('identity', {category='pubsub', type='leaf'}):up();
   361 end);
   409 end);
   362 
   410 
   363 module:hook("account-disco-info", function(event)
   411 module:hook("account-disco-info", function(event)
   364 	local reply = event.reply;
   412 	local origin, reply = event.origin, event.reply;
       
   413 
   365 	reply:tag('identity', {category='pubsub', type='pep'}):up();
   414 	reply:tag('identity', {category='pubsub', type='pep'}):up();
   366 	reply:tag('feature', {var='http://jabber.org/protocol/pubsub#publish'}):up();
   415 
       
   416 	local username = jid_split(reply.attr.from) or origin.username;
       
   417 	local service = get_pep_service(username);
       
   418 
       
   419 	local suppored_features = lib_pubsub.get_feature_set(service) + set.new{
       
   420 		-- Features not covered by the above
       
   421 		"access-presence",
       
   422 		"auto-subscribe",
       
   423 		"filtered-notifications",
       
   424 		"last-published",
       
   425 		"persistent-items",
       
   426 		"presence-notifications",
       
   427 		"presence-subscribe",
       
   428 	};
       
   429 
       
   430 	for feature in suppored_features do
       
   431 		reply:tag('feature', {var=xmlns_pubsub.."#"..feature}):up();
       
   432 	end
   367 end);
   433 end);
   368 
   434 
   369 module:hook("account-disco-items-node", function(event)
   435 module:hook("account-disco-items-node", function(event)
   370 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   436 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   371 	local node = event.node;
   437 	local node = event.node;
   372 	local service_name = stanza.attr.to or origin.username.."@"..origin.host
   438 	local is_self = stanza.attr.to == nil;
   373 	local service = get_pep_service(service_name);
   439 	local user_bare = jid_bare(stanza.attr.to);
       
   440 	local username = jid_split(stanza.attr.to);
       
   441 	if is_self then
       
   442 		username = origin.username;
       
   443 		user_bare = jid_join(username, host);
       
   444 	end
       
   445 	local service = get_pep_service(username);
   374 	local ok, ret = service:get_items(node, jid_bare(stanza.attr.from) or true);
   446 	local ok, ret = service:get_items(node, jid_bare(stanza.attr.from) or true);
   375 	if not ok then return; end
   447 	if not ok then return; end
   376 	event.exists = true;
   448 	event.exists = true;
   377 	for _, id in ipairs(ret) do
   449 	for _, id in ipairs(ret) do
   378 		reply:tag("item", { jid = service_name, name = id }):up();
   450 		reply:tag("item", { jid = user_bare, name = id }):up();
   379 	end
   451 	end
   380 end);
   452 end);
   381 
   453 
   382 module:hook("account-disco-items", function(event)
   454 module:hook("account-disco-items", function(event)
   383 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   455 	local reply, stanza, origin = event.reply, event.stanza, event.origin;
   384 
   456 
   385 	local service_name = reply.attr.from or origin.username.."@"..origin.host
   457 	local is_self = stanza.attr.to == nil;
   386 	local service = get_pep_service(service_name);
   458 	local user_bare = jid_bare(stanza.attr.to);
       
   459 	local username = jid_split(stanza.attr.to);
       
   460 	if is_self then
       
   461 		username = origin.username;
       
   462 		user_bare = jid_join(username, host);
       
   463 	end
       
   464 	local service = get_pep_service(username);
       
   465 
   387 	local ok, ret = service:get_nodes(jid_bare(stanza.attr.from));
   466 	local ok, ret = service:get_nodes(jid_bare(stanza.attr.from));
   388 	if not ok then return; end
   467 	if not ok then return; end
   389 
   468 
   390 	for node, node_obj in pairs(ret) do
   469 	for node, node_obj in pairs(ret) do
   391 		reply:tag("item", { jid = service_name, node = node, name = node_obj.config.name }):up();
   470 		reply:tag("item", { jid = user_bare, node = node, name = node_obj.config.name }):up();
   392 	end
   471 	end
   393 end);
   472 end);