mod_muc_rai/mod_muc_rai.lua
changeset 3978 f14c862598a9
child 4001 0e72dd70afff
equal deleted inserted replaced
3977:df6227e288e5 3978:f14c862598a9
       
     1 local cache = require "util.cache";
       
     2 local jid = require "util.jid";
       
     3 local st = require "util.stanza";
       
     4 
       
     -- Maximum number of simultaneously subscribed users
     -- (config option "muc_rai_max_subscribers", defaulting to 1024)
     local max_subscribers = module:get_option_number("muc_rai_max_subscribers", 1024);

     -- Queried per bare user JID to discover the rooms a user is affiliated with
     local muc_affiliation_store = module:open_store("config", "map");
     -- Queried per room for the most recent groupchat message
     local muc_archive = module:open_store("muc_log", "archive");

     -- Namespace of the <rai/> payload carried in notifications and replies
     local xmlns_rai = "xmpp:prosody.im/protocol/rai";

     -- Supplies get_user_read_marker(), used to find what a user last read
     local muc_markers = module:depends("muc_markers");

     -- Index by subscriber: subscriber_jid -> { [room_jid] = interested }
     -- NOTE(review): the `false` argument presumably disables eviction so that
     -- set() fails once the cache is full; subscribe_all_rooms relies on a
     -- falsy set() result to detect the subscriber limit - confirm in util.cache
     local subscribed_users = cache.new(max_subscribers, false);
     -- Index by room: room_jid -> { [user_jid] = interested }
     local interested_users = {};
     -- Most recent archive id seen per room: room_jid -> last_id
     local room_activity_cache = cache.new(1024);
       
    20 
       
    21 -- Send a single notification for a room, updating data structures as needed
       
    -- Notify one user about activity in one room.
    -- The subscription is consumed: the (user, room) pair is removed from both
    -- interested_users and the user's entry in subscribed_users before sending.
    -- Returns whatever module:send() returns.
    local function send_single_notification(user_jid, room_jid)
    	local notification = st.message({ to = user_jid, from = module.host });
    	notification:tag("rai", { xmlns = xmlns_rai })
    		:text_tag("activity", room_jid)
    	:up();

    	-- Forget the pair in the per-room index...
    	local room_watchers = interested_users[room_jid];
    	if room_watchers then
    		room_watchers[user_jid] = nil;
    	end

    	-- ...and in the per-user index
    	local watched_rooms = subscribed_users:get(user_jid);
    	if watched_rooms then
    		watched_rooms[room_jid] = nil;
    	end

    	module:log("debug", "Sending notification from %s to %s", room_jid, user_jid);
    	return module:send(notification);
    end
       
    38 
       
    -- Register a user's interest in future activity in a room.
    -- The user must already have an entry in subscribed_users.
    -- Returns true on success, or nil, "not-subscribed" otherwise.
    local function subscribe_room(user_jid, room_jid)
    	local watched_rooms = subscribed_users:get(user_jid);
    	if not watched_rooms then
    		return nil, "not-subscribed";
    	end

    	module:log("debug", "Subscribed %s to %s", user_jid, room_jid);
    	watched_rooms[room_jid] = true;

    	-- Create the per-room set on first use, then add the user to it
    	local room_watchers = interested_users[room_jid] or {};
    	interested_users[room_jid] = room_watchers;
    	room_watchers[user_jid] = true;

    	return true;
    end
       
    55 
       
    -- Remove a user's interest in a room from both indexes.
    -- Returns true on success (also when the room had no interested users),
    -- or nil, "not-subscribed" if the user has no entry in subscribed_users.
    local function unsubscribe_room(user_jid, room_jid)
    	local interested_rooms = subscribed_users:get(user_jid);
    	if not interested_rooms then
    		return nil, "not-subscribed";
    	end
    	interested_rooms[room_jid] = nil;

    	local interested_room_users = interested_users[room_jid];
    	if interested_room_users then
    		interested_room_users[user_jid] = nil;
    		-- Drop the per-room table once its last user is gone, so that
    		-- interested_users does not accumulate empty tables over time
    		if next(interested_room_users) == nil then
    			interested_users[room_jid] = nil;
    		end
    	end
    	return true;
    end
       
    70 
       
    -- Send an activity notification to every user currently interested in a room.
    -- Returns true if the room had a table of interested users, nothing otherwise.
    local function notify_interested_users(room_jid)
    	-- This runs for every broadcast message, so it is logged at debug level
    	module:log("debug", "NOTIFYING FOR %s", room_jid);
    	local interested_room_users = interested_users[room_jid];
    	if not interested_room_users then
    		module:log("debug", "Nobody interested in %s", room_jid);
    		return;
    	end
    	-- send_single_notification() clears the entry for the user being
    	-- notified; clearing existing keys during a pairs() traversal is allowed
    	for user_jid in pairs(interested_room_users) do
    		send_single_notification(user_jid, room_jid);
    	end
    	-- Release the per-room table if every subscription was consumed
    	if next(interested_room_users) == nil then
    		interested_users[room_jid] = nil;
    	end
    	return true;
    end
       
    83 
       
    -- Remove every room subscription held by a user and release the user's
    -- slot in the subscriber cache.
    -- Returns true on success, or nil, "not-subscribed" if the user has no
    -- entry in subscribed_users.
    local function unsubscribe_user_from_all_rooms(user_jid)
    	local interested_rooms = subscribed_users:get(user_jid);
    	if not interested_rooms then
    		return nil, "not-subscribed";
    	end
    	-- unsubscribe_room() clears the key currently being visited, which is
    	-- permitted during a pairs() traversal
    	for room_jid in pairs(interested_rooms) do
    		unsubscribe_room(user_jid, room_jid);
    	end
    	-- Free the cache slot: the cache does not evict on its own, so users who
    	-- have gone offline would otherwise count against the subscriber limit
    	subscribed_users:set(user_jid, nil);
    	return true;
    end
       
    94 
       
    -- Look up the archive id of the most recent groupchat message in a room.
    -- Served from room_activity_cache when possible; otherwise the newest
    -- matching archive entry is fetched and its id is cached.
    -- Returns the id, or nil if the archive query fails or yields nothing.
    local function get_last_room_message_id(room_jid)
    	local cached_id = room_activity_cache:get(room_jid);
    	if cached_id then
    		return cached_id;
    	end

    	-- Only the single newest groupchat message is requested
    	local results, err = muc_archive:find(jid.node(room_jid), {
    		limit = 1;
    		reverse = true;
    		with = "message<groupchat";
    	});

    	if results then
    		-- First value produced by the result iterator is the archive id
    		local newest_id = results();
    		room_activity_cache:set(room_jid, newest_id);
    		return newest_id;
    	end

    	module:log("error", "Could not fetch history: %s", err);
    	return nil;
    end
       
   118 
       
   -- Record last_id as the most recent message id seen for room_jid,
   -- overwriting any previously cached value. Returns nothing.
   local function update_room_activity(room_jid, last_id)
   	room_activity_cache:set(room_jid, last_id);
   end
       
   122 
       
   -- Fetch the user's read marker for a room, as reported by mod_muc_markers.
   -- The result of muc_markers.get_user_read_marker() is passed through as-is.
   local function get_last_user_read_id(user_jid, room_jid)
   	return muc_markers.get_user_read_marker(user_jid, room_jid);
   end
       
   126 
       
   -- Decide whether a room has activity the user has not read yet.
   -- True whenever the room's latest message id differs from the user's read
   -- marker for that room (two missing values compare as equal, i.e. false).
   local function has_new_activity(room_jid, user_jid)
   	local latest_id = get_last_room_message_id(room_jid);
   	local read_id = get_last_user_read_id(user_jid, room_jid);
   	return not (latest_id == read_id);
   end
       
   132 
       
   -- Look up the rooms a user is considered interested in.
   -- Affiliation is used as the indicator of interest: the result is whatever
   -- the affiliation store holds for the user's bare JID.
   local function get_interested_rooms(user_jid)
   	local bare_jid = jid.bare(user_jid);
   	return muc_affiliation_store:get_all(bare_jid);
   end
       
   139 
       
   -- Subscribes a user to all rooms that the user has an interest in.
   -- Rooms that already have unread activity are not subscribed; they are
   -- returned instead so the caller can report them immediately.
   -- Returns:
   --   a set { [room_jid] = true } of rooms that already have activity, or
   --   nil if no room has activity yet, or
   --   nil, "internal-server-error" if the interested rooms could not be loaded, or
   --   nil, "resource-constraint" if the subscriber limit has been reached
   local function subscribe_all_rooms(user_jid)
   	local interested_rooms, err = get_interested_rooms(user_jid);

   	if not interested_rooms then
   		if err then
   			module:log("error", "Could not load interested rooms for %s: %s", user_jid, err);
   			return nil, "internal-server-error";
   		end
   		interested_rooms = {};
   	end

   	-- Register the subscriber with a fresh table. The table returned by the
   	-- store must not be reused here: it is keyed by room name rather than room
   	-- JID, and subscribe_room() inserts keys into the registered table while
   	-- the loop below is still traversing interested_rooms.
   	if not subscribed_users:set(user_jid, {}) then
   		module:log("warn", "Subscriber limit (%d) reached, rejecting subscription from %s", max_subscribers, user_jid);
   		return nil, "resource-constraint";
   	end

   	local rooms_with_activity;
   	for room_name in pairs(interested_rooms) do
   		local room_jid = room_name.."@"..module.host;
   		if has_new_activity(room_jid, user_jid) then
   			-- There has already been activity, include this room
   			-- in the response
   			if not rooms_with_activity then
   				rooms_with_activity = {};
   			end
   			rooms_with_activity[room_jid] = true;
   		else
   			-- Subscribe to any future activity
   			subscribe_room(user_jid, room_jid);
   		end
   	end
   	return rooms_with_activity;
   end
       
   175 
       
   -- Presence addressed to this host drives subscriptions:
   --   * type="unavailable" drops all of the sender's subscriptions
   --   * anything else (re)subscribes the sender and replies with the rooms
   --     that already have unread activity, if any
   module:hook("presence/host", function (event)
   	local origin, stanza = event.origin, event.stanza;
   	local user_jid = stanza.attr.from;

   	if stanza.attr.type == "unavailable" then -- User going offline
   		unsubscribe_user_from_all_rooms(user_jid);
   		return true;
   	end

   	local rooms_with_activity, err = subscribe_all_rooms(user_jid);

   	if not rooms_with_activity then
   		if not err then
   			module:log("debug", "No activity to notify");
   			return true;
   		end
   		-- Report the condition that actually occurred rather than always
   		-- claiming a resource constraint. "wait" marks a temporary limit
   		-- the client may retry after; other failures use "cancel".
   		module:log("debug", "Could not subscribe %s: %s", user_jid, err);
   		local error_type = "cancel";
   		if err == "resource-constraint" then
   			error_type = "wait";
   		end
   		return origin.send(st.error_reply(stanza, error_type, err));
   	end

   	-- One <activity/> child per room that already has unread messages
   	local reply = st.reply(stanza)
   		:tag("rai", { xmlns = xmlns_rai });
   	for room_jid in pairs(rooms_with_activity) do
   		reply:text_tag("activity", room_jid);
   	end
   	return origin.send(reply);
   end);
       
   203 
       
   -- Track room activity: runs for every message broadcast by a room
   -- (hook priority -1).
   module:hook("muc-broadcast-message", function (event)
   	local room, stanza = event.room, event.stanza;

   	-- The archive id is carried in the "id" attribute of <stanza-id/>
   	-- (XEP-0359), not in its text content. Only an id stamped by this
   	-- room ("by" attribute) is used.
   	local archive_id;
   	for stanza_id in stanza:childtags("stanza-id", "urn:xmpp:sid:0") do
   		if stanza_id.attr.by == room.jid and stanza_id.attr.id then
   			archive_id = stanza_id.attr.id;
   			break;
   		end
   	end

   	if archive_id then
   		-- Remember the id of the last message so we can compare it
   		-- to the per-user marker (managed by mod_muc_markers)
   		update_room_activity(room.jid, archive_id);
   		-- Notify any users that need to be notified
   		notify_interested_users(room.jid);
   	end
   end, -1);
       
   215