mod_smacks/mod_smacks.lua
changeset 345 445178d15b51
parent 263 41f1cac40560
child 478 db0f065c4e09
equal deleted inserted replaced
344:2b0f2160fc61 345:445178d15b51
     1 local st = require "util.stanza";
     1 local st = require "util.stanza";
       
     2 local uuid_generate = require "util.uuid".generate;
     2 
     3 
     3 local t_insert, t_remove = table.insert, table.remove;
     4 local t_insert, t_remove = table.insert, table.remove;
     4 local math_min = math.min;
     5 local math_min = math.min;
       
     6 local os_time = os.time;
     5 local tonumber, tostring = tonumber, tostring;
     7 local tonumber, tostring = tonumber, tostring;
     6 local add_filter = require "util.filters".add_filter;
     8 local add_filter = require "util.filters".add_filter;
     7 local timer = require "util.timer";
     9 local timer = require "util.timer";
     8 
    10 
     9 local xmlns_sm = "urn:xmpp:sm:2";
    11 local xmlns_sm = "urn:xmpp:sm:2";
       
    12 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
    10 
    13 
    11 local sm_attr = { xmlns = xmlns_sm };
    14 local sm_attr = { xmlns = xmlns_sm };
    12 
    15 
    13 local resume_timeout = 300;
    16 local resume_timeout = module:get_option("smacks_hibernation_time", 300);
    14 local max_unacked_stanzas = 0;
    17 local max_unacked_stanzas = 0;
       
    18 
       
    19 local session_registry = {};
    15 
    20 
    16 module:hook("stream-features",
    21 module:hook("stream-features",
    17 		function (event)
    22 		function (event)
    18 			event.features:tag("sm", sm_attr):tag("optional"):up():up();
    23 			event.features:tag("sm", sm_attr):tag("optional"):up():up();
    19 		end);
    24 		end);
    21 module:hook("s2s-stream-features",
    26 module:hook("s2s-stream-features",
    22 		function (event)
    27 		function (event)
    23 			event.features:tag("sm", sm_attr):tag("optional"):up():up();
    28 			event.features:tag("sm", sm_attr):tag("optional"):up():up();
    24 		end);
    29 		end);
    25 
    30 
    26 module:hook_stanza(xmlns_sm, "enable",
    31 module:hook_stanza("http://etherx.jabber.org/streams", "features",
    27 		function (session, stanza)
    32 		function (session, stanza)
    28 			module:log("debug", "Enabling stream management");
    33 			if not session.smacks and stanza:get_child("sm", xmlns_sm) then
    29 			session.smacks = true;
    34 				session.send(st.stanza("enable", sm_attr));
    30 			
    35 			end
    31 			-- Overwrite process_stanza() and send()
    36 end);
    32 			local queue = {};
    37 
    33 			session.outgoing_stanza_queue = queue;
    38 local function wrap_session(session, resume)
    34 			session.last_acknowledged_stanza = 0;
    39 	-- Overwrite process_stanza() and send()
    35 			local _send = session.sends2s or session.send;
    40 	local queue;
    36 			local function new_send(stanza)
    41 	if not resume then
    37 				local attr = stanza.attr;
    42 		queue = {};
    38 				if attr and not attr.xmlns then -- Stanza in default stream namespace
    43 		session.outgoing_stanza_queue = queue;
    39 					queue[#queue+1] = st.clone(stanza);
    44 		session.last_acknowledged_stanza = 0;
    40 				end
    45 	else
    41 				local ok, err = _send(stanza);
    46 		queue = session.outgoing_stanza_queue;
    42 				if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
    47 	end
    43 					session.awaiting_ack = true;
    48 	
    44 					return _send(st.stanza("r", { xmlns = xmlns_sm }));
    49 	local _send = session.sends2s or session.send;
    45 				end
    50 	local function new_send(stanza)
    46 				return ok, err;
    51 		local attr = stanza.attr;
    47 			end
    52 		if attr and not attr.xmlns then -- Stanza in default stream namespace
    48 			
    53 			queue[#queue+1] = st.clone(stanza);
    49 			if session.sends2s then
    54 		end
    50 				session.sends2s = new_send;
    55 		local ok, err = _send(stanza);
    51 			else
    56 		if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then
    52 				session.send = new_send;
    57 			session.awaiting_ack = true;
    53 			end
    58 			return _send(st.stanza("r", sm_attr));
    54 			
    59 		end
    55 			session.handled_stanza_count = 0;
    60 		return ok, err;
    56 			add_filter(session, "stanzas/in", function (stanza)
    61 	end
    57 				if not stanza.attr.xmlns then
    62 	
    58 					session.handled_stanza_count = session.handled_stanza_count + 1;
    63 	if session.sends2s then
    59 					session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
    64 		session.sends2s = new_send;
    60 				end
    65 	else
    61 				return stanza;
    66 		session.send = new_send;
    62 			end);
    67 	end
    63 
    68 	
    64 			if not stanza.attr.resume then -- FIXME: Resumption should be a different spec :/
    69 	if not resume then
    65 				_send(st.stanza("enabled", sm_attr));
    70 		session.handled_stanza_count = 0;
    66 				return true;
    71 		add_filter(session, "stanzas/in", function (stanza)
    67 			end
    72 			if not stanza.attr.xmlns then
    68 		end, 100);
    73 				session.handled_stanza_count = session.handled_stanza_count + 1;
       
    74 				session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count);
       
    75 			end
       
    76 			return stanza;
       
    77 		end);
       
    78 	end
       
    79 
       
    80 	return session;
       
    81 end
       
    82 
       
    83 module:hook_stanza(xmlns_sm, "enable", function (session, stanza)
       
    84 	module:log("debug", "Enabling stream management");
       
    85 	session.smacks = true;
       
    86 	
       
    87 	wrap_session(session);
       
    88 	
       
    89 	local resume_token;
       
    90 	local resume = stanza.attr.resume;
       
    91 	if resume == "true" or resume == "1" then
       
    92 		resume_token = uuid_generate();
       
    93 		session_registry[resume_token] = session;
       
    94 		session.resumption_token = resume_token;
       
    95 	end
       
    96 	session.send(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
       
    97 	return true;
       
    98 end, 100);
    69 
    99 
    70 module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
   100 module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
    71 	if not origin.smacks then
   101 	if not origin.smacks then
    72 		module:log("debug", "Received ack request from non-smack-enabled session");
   102 		module:log("debug", "Received ack request from non-smack-enabled session");
    73 		return;
   103 		return;
    86 	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
   116 	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
    87 	local queue = origin.outgoing_stanza_queue;
   117 	local queue = origin.outgoing_stanza_queue;
    88 	if handled_stanza_count > #queue then
   118 	if handled_stanza_count > #queue then
    89 		module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
   119 		module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)",
    90 			handled_stanza_count, #queue);
   120 			handled_stanza_count, #queue);
       
   121 		module:log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza);
    91 		for i=1,#queue do
   122 		for i=1,#queue do
    92 			module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
   123 			module:log("debug", "Q item %d: %s", i, tostring(queue[i]));
    93 		end
   124 		end
    94 	end
   125 	end
    95 	for i=1,math_min(handled_stanza_count,#queue) do
   126 	for i=1,math_min(handled_stanza_count,#queue) do
   132 					module:log("warn", "::%s", tostring(queue[i]));
   163 					module:log("warn", "::%s", tostring(queue[i]));
   133 				end
   164 				end
   134 				handle_unacked_stanzas(session);
   165 				handle_unacked_stanzas(session);
   135 			end
   166 			end
   136 		else
   167 		else
   137 			session.hibernating = true;
   168 			local hibernate_time = os_time(); -- Track the time we went into hibernation
       
   169 			session.hibernating = hibernate_time;
   138 			timer.add_task(resume_timeout, function ()
   170 			timer.add_task(resume_timeout, function ()
   139 				if session.hibernating then
   171 				-- Check the hibernate time still matches what we think it is,
       
   172 				-- otherwise the session resumed and re-hibernated.
       
   173 				if session.hibernating == hibernate_time then
       
   174 					session_registry[session.resumption_token] = nil;
   140 					session.resumption_token = nil;
   175 					session.resumption_token = nil;
   141 					sessionmanager.destroy_session(session); -- Re-destroy
   176 					-- This recursion back into our destroy handler is to
       
   177 					-- make sure we still handle any queued stanzas
       
   178 					sessionmanager.destroy_session(session);
   142 				end
   179 				end
   143 			end);
   180 			end);
   144 			return; -- Postpone destruction for now
   181 			return; -- Postpone destruction for now
   145 		end
   182 		end
   146 		
   183 		
   147 	end
   184 	end
   148 	return _destroy_session(session, err);
   185 	return _destroy_session(session, err);
   149 end
   186 end
       
   187 
       
   188 module:hook_stanza(xmlns_sm, "resume", function (session, stanza)
       
   189 	local id = stanza.attr.previd;
       
   190 	local original_session = session_registry[id];
       
   191 	if not original_session then
       
   192 		session.send(st.stanza("failed", sm_attr)
       
   193 			:tag("item-not-found", { xmlns = xmlns_errors })
       
   194 		);
       
   195 	elseif session.username == original_session.username
       
   196 	and session.host == original_session.host then
       
   197 		-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
       
   198 		original_session.ip = session.ip;
       
   199 		original_session.conn = session.conn;
       
   200 		original_session.send = session.send;
       
   201 		original_session.stream = session.stream;
       
   202 		original_session.secure = session.secure;
       
   203 		original_session.hibernating = nil;
       
   204 		local filter = original_session.filter;
       
   205 		local stream = session.stream;
       
   206 		local log = session.log;
       
   207 		function original_session.data(data)
       
   208 			data = filter("bytes/in", data);
       
   209 			if data then
       
   210 				local ok, err = stream:feed(data);
       
   211 				if ok then return; end
       
   212 				log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
       
   213 				original_session:close("xml-not-well-formed");
       
   214 			end
       
   215 		end
       
   216 		wrap_session(original_session, true);
       
   217 		-- Inform xmppstream of the new session (passed to its callbacks)
       
   218 		stream:set_session(original_session);
       
   219 		-- Similar for connlisteners
       
   220 		require "net.connlisteners".get("xmppclient").associate_session(session.conn, original_session);
       
   221 
       
   222 		session.send(st.stanza("resumed", { xmlns = xmlns_sm,
       
   223 			h = original_session.handled_stanza_count, previd = id }));
       
   224 		
       
   225 		-- Fake an <a> with the h of the <resume/> from the client
       
   226 		original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm,
       
   227 			h = stanza.attr.h }));
       
   228 		
       
   229 		-- Ok, we need to re-send any stanzas that the client didn't see
       
   230 		-- ...they are what is now left in the outgoing stanza queue
       
   231 		local queue = original_session.outgoing_stanza_queue;
       
   232 		for i=1,#queue do
       
   233 			session.send(queue[i]);
       
   234 		end
       
   235 	else
       
   236 		log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
       
   237 			session.username or "?", session.host or "?", session.type,
       
   238 			original_session.username or "?", original_session.host or "?", original_session.type);
       
   239 		session.send(st.stanza("failed", sm_attr)
       
   240 			:tag("not-authorized", { xmlns = xmlns_errors }));
       
   241 	end
       
   242 	return true;
       
   243 end);