mod_smacks/mod_smacks.lua
changeset 1298 659da45a2b4b
parent 1293 ddbc1eb8d431
child 1324 853a382c9bd6
equal deleted inserted replaced
1297:3df303543765 1298:659da45a2b4b
     7 local tonumber, tostring = tonumber, tostring;
     7 local tonumber, tostring = tonumber, tostring;
     8 local add_filter = require "util.filters".add_filter;
     8 local add_filter = require "util.filters".add_filter;
     9 local timer = require "util.timer";
     9 local timer = require "util.timer";
    10 local datetime = require "util.datetime";
    10 local datetime = require "util.datetime";
    11 
    11 
    12 local xmlns_sm = "urn:xmpp:sm:2";
    12 local xmlns_sm2 = "urn:xmpp:sm:2";
       
    13 local xmlns_sm3 = "urn:xmpp:sm:3";
    13 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
    14 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas";
    14 local xmlns_delay = "urn:xmpp:delay";
    15 local xmlns_delay = "urn:xmpp:delay";
    15 
    16 
    16 local sm_attr = { xmlns = xmlns_sm };
    17 local sm2_attr = { xmlns = xmlns_sm2 };
       
    18 local sm3_attr = { xmlns = xmlns_sm3 };
    17 
    19 
    18 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300);
    20 local resume_timeout = module:get_option_number("smacks_hibernation_time", 300);
    19 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
    21 local s2s_smacks = module:get_option_boolean("smacks_enabled_s2s", false);
    20 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
    22 local max_unacked_stanzas = module:get_option_number("smacks_max_unacked_stanzas", 0);
    21 local core_process_stanza = prosody.core_process_stanza;
    23 local core_process_stanza = prosody.core_process_stanza;
    40 end
    42 end
    41 
    43 
    42 module:hook("stream-features",
    44 module:hook("stream-features",
    43 		function (event)
    45 		function (event)
    44 			if can_do_smacks(event.origin, true) then
    46 			if can_do_smacks(event.origin, true) then
    45 				event.features:tag("sm", sm_attr):tag("optional"):up():up();
    47 				event.features:tag("sm", sm2_attr):tag("optional"):up():up();
       
    48 				event.features:tag("sm", sm3_attr):tag("optional"):up():up();
    46 			end
    49 			end
    47 		end);
    50 		end);
    48 
    51 
    49 module:hook("s2s-stream-features",
    52 module:hook("s2s-stream-features",
    50 		function (event)
    53 		function (event)
    51 			if can_do_smacks(event.origin, true) then
    54 			if can_do_smacks(event.origin, true) then
    52 				event.features:tag("sm", sm_attr):tag("optional"):up():up();
    55 				event.features:tag("sm", sm2_attr):tag("optional"):up():up();
       
    56 				event.features:tag("sm", sm3_attr):tag("optional"):up():up();
    53 			end
    57 			end
    54 		end);
    58 		end);
    55 
    59 
    56 module:hook_stanza("http://etherx.jabber.org/streams", "features",
    60 module:hook_stanza("http://etherx.jabber.org/streams", "features",
    57 		function (session, stanza)
    61 		function (session, stanza)
    58 			if can_do_smacks(session) and stanza:get_child("sm", xmlns_sm) then
    62 			if can_do_smacks(session) then
    59 				session.sends2s(st.stanza("enable", sm_attr));
    63 				if stanza:get_child("sm", xmlns_sm3) then
    60 			end
    64 					session.sends2s(st.stanza("enable", sm3_attr));
    61 end);
    65 				elseif stanza:get_child("sm", xmlns_sm2) then
    62 
    66 					session.sends2s(st.stanza("enable", sm2_attr));
    63 local function wrap_session(session, resume)
    67 				end
       
    68 			end
       
    69 		end);
       
    70 
       
    71 local function wrap_session(session, resume, xmlns_sm)
       
    72 	local sm_attr = { xmlns = xmlns_sm };
    64 	-- Overwrite process_stanza() and send()
    73 	-- Overwrite process_stanza() and send()
    65 	local queue;
    74 	local queue;
    66 	if not resume then
    75 	if not resume then
    67 		queue = {};
    76 		queue = {};
    68 		session.outgoing_stanza_queue = queue;
    77 		session.outgoing_stanza_queue = queue;
   123 	end
   132 	end
   124 
   133 
   125 	return session;
   134 	return session;
   126 end
   135 end
   127 
   136 
   128 module:hook_stanza(xmlns_sm, "enable", function (session, stanza)
   137 function handle_enable(session, stanza, xmlns_sm)
   129 	local ok, err, err_text = can_do_smacks(session);
   138 	local ok, err, err_text = can_do_smacks(session);
   130 	if not ok then
   139 	if not ok then
   131 		session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it?
   140 		session.log("warn", "Failed to enable smacks: %s", err_text); -- TODO: XEP doesn't say we can send error text, should it?
   132 		session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
   141 		session.send(st.stanza("failed", { xmlns = xmlns_sm }):tag(err, { xmlns = xmlns_errors}));
   133 		return true;
   142 		return true;
   134 	end
   143 	end
   135 
   144 
   136 	module:log("debug", "Enabling stream management");
   145 	module:log("debug", "Enabling stream management");
   137 	session.smacks = true;
   146 	session.smacks = true;
   138 	
   147 	
   139 	wrap_session(session);
   148 	wrap_session(session, false, xmlns_sm);
   140 	
   149 	
   141 	local resume_token;
   150 	local resume_token;
   142 	local resume = stanza.attr.resume;
   151 	local resume = stanza.attr.resume;
   143 	if resume == "true" or resume == "1" then
   152 	if resume == "true" or resume == "1" then
   144 		resume_token = uuid_generate();
   153 		resume_token = uuid_generate();
   145 		session_registry[resume_token] = session;
   154 		session_registry[resume_token] = session;
   146 		session.resumption_token = resume_token;
   155 		session.resumption_token = resume_token;
   147 	end
   156 	end
   148 	(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
   157 	(session.sends2s or session.send)(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume }));
   149 	return true;
   158 	return true;
   150 end, 100);
   159 end
   151 
   160 module:hook_stanza(xmlns_sm2, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm2); end, 100);
   152 module:hook_stanza(xmlns_sm, "enabled", function (session, stanza)
   161 module:hook_stanza(xmlns_sm3, "enable", function (session, stanza) return handle_enable(session, stanza, xmlns_sm3); end, 100);
       
   162 
       
   163 function handle_enabled(session, stanza, xmlns_sm)
   153 	module:log("debug", "Enabling stream management");
   164 	module:log("debug", "Enabling stream management");
   154 	session.smacks = true;
   165 	session.smacks = true;
   155 	
   166 	
   156 	wrap_session(session);
   167 	wrap_session(session, false, xmlns_sm);
   157 
   168 
   158 	-- FIXME Resume?
   169 	-- FIXME Resume?
   159 	
   170 	
   160 	return true;
   171 	return true;
   161 end, 100);
   172 end
   162 
   173 module:hook_stanza(xmlns_sm2, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm2); end, 100);
   163 module:hook_stanza(xmlns_sm, "r", function (origin, stanza)
   174 module:hook_stanza(xmlns_sm3, "enabled", function (session, stanza) return handle_enabled(session, stanza, xmlns_sm3); end, 100);
       
   175 
       
   176 function handle_r(origin, stanza, xmlns_sm)
   164 	if not origin.smacks then
   177 	if not origin.smacks then
   165 		module:log("debug", "Received ack request from non-smack-enabled session");
   178 		module:log("debug", "Received ack request from non-smack-enabled session");
   166 		return;
   179 		return;
   167 	end
   180 	end
   168 	module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
   181 	module:log("debug", "Received ack request, acking for %d", origin.handled_stanza_count);
   169 	-- Reply with <a>
   182 	-- Reply with <a>
   170 	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
   183 	(origin.sends2s or origin.send)(st.stanza("a", { xmlns = xmlns_sm, h = tostring(origin.handled_stanza_count) }));
   171 	return true;
   184 	return true;
   172 end);
   185 end
   173 
   186 module:hook_stanza(xmlns_sm2, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm2); end);
   174 module:hook_stanza(xmlns_sm, "a", function (origin, stanza)
   187 module:hook_stanza(xmlns_sm3, "r", function (origin, stanza) return handle_r(origin, stanza, xmlns_sm3); end);
       
   188 
       
   189 function handle_a(origin, stanza)
   175 	if not origin.smacks then return; end
   190 	if not origin.smacks then return; end
   176 	origin.awaiting_ack = nil;
   191 	origin.awaiting_ack = nil;
   177 	-- Remove handled stanzas from outgoing_stanza_queue
   192 	-- Remove handled stanzas from outgoing_stanza_queue
   178 	--log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
   193 	--log("debug", "ACK: h=%s, last=%s", stanza.attr.h or "", origin.last_acknowledged_stanza or "");
   179 	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
   194 	local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza;
   189 	for i=1,math_min(handled_stanza_count,#queue) do
   204 	for i=1,math_min(handled_stanza_count,#queue) do
   190 		t_remove(origin.outgoing_stanza_queue, 1);
   205 		t_remove(origin.outgoing_stanza_queue, 1);
   191 	end
   206 	end
   192 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
   207 	origin.last_acknowledged_stanza = origin.last_acknowledged_stanza + handled_stanza_count;
   193 	return true;
   208 	return true;
   194 end);
   209 end
       
   210 module:hook_stanza(xmlns_sm2, "a", handle_a);
       
   211 module:hook_stanza(xmlns_sm3, "a", handle_a);
   195 
   212 
   196 --TODO: Optimise... incoming stanzas should be handled by a per-session
   213 --TODO: Optimise... incoming stanzas should be handled by a per-session
   197 -- function that has a counter as an upvalue (no table indexing for increments,
   214 -- function that has a counter as an upvalue (no table indexing for increments,
   198 -- and won't slow non-198 sessions). We can also then remove the .handled flag
   215 -- and won't slow non-198 sessions). We can also then remove the .handled flag
   199 -- on stanzas
   216 -- on stanzas
   254 		end
   271 		end
   255 		
   272 		
   256 	end
   273 	end
   257 end);
   274 end);
   258 
   275 
   259 module:hook_stanza(xmlns_sm, "resume", function (session, stanza)
   276 function handle_resume(session, stanza, xmlns_sm)
   260 	if session.full_jid then
   277 	if session.full_jid then
   261 		session.log("warn", "Tried to resume after resource binding");
   278 		session.log("warn", "Tried to resume after resource binding");
   262 		session.send(st.stanza("failed", sm_attr)
   279 		session.send(st.stanza("failed", { xmlns = xmlns_sm })
   263 			:tag("unexpected-request", { xmlns = xmlns_errors })
   280 			:tag("unexpected-request", { xmlns = xmlns_errors })
   264 		);
   281 		);
   265 		return true;
   282 		return true;
   266 	end
   283 	end
   267 
   284 
   268 	local id = stanza.attr.previd;
   285 	local id = stanza.attr.previd;
   269 	local original_session = session_registry[id];
   286 	local original_session = session_registry[id];
   270 	if not original_session then
   287 	if not original_session then
   271 		session.log("debug", "Tried to resume non-existent session with id %s", id);
   288 		session.log("debug", "Tried to resume non-existent session with id %s", id);
   272 		session.send(st.stanza("failed", sm_attr)
   289 		session.send(st.stanza("failed", { xmlns = xmlns_sm })
   273 			:tag("item-not-found", { xmlns = xmlns_errors })
   290 			:tag("item-not-found", { xmlns = xmlns_errors })
   274 		);
   291 		);
   275 	elseif session.username == original_session.username
   292 	elseif session.username == original_session.username
   276 	and session.host == original_session.host then
   293 	and session.host == original_session.host then
   277 		session.log("debug", "mod_smacks resuming existing session...");
   294 		session.log("debug", "mod_smacks resuming existing session...");
   298 				if ok then return; end
   315 				if ok then return; end
   299 				log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
   316 				log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
   300 				original_session:close("xml-not-well-formed");
   317 				original_session:close("xml-not-well-formed");
   301 			end
   318 			end
   302 		end
   319 		end
   303 		wrap_session(original_session, true);
   320 		wrap_session(original_session, true, xmlns_sm);
   304 		-- Inform xmppstream of the new session (passed to its callbacks)
   321 		-- Inform xmppstream of the new session (passed to its callbacks)
   305 		stream:set_session(original_session);
   322 		stream:set_session(original_session);
   306 		-- Similar for connlisteners
   323 		-- Similar for connlisteners
   307 		c2s_sessions[session.conn] = original_session;
   324 		c2s_sessions[session.conn] = original_session;
   308 
   325 
   321 		end
   338 		end
   322 	else
   339 	else
   323 		module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
   340 		module:log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]",
   324 			session.username or "?", session.host or "?", session.type,
   341 			session.username or "?", session.host or "?", session.type,
   325 			original_session.username or "?", original_session.host or "?", original_session.type);
   342 			original_session.username or "?", original_session.host or "?", original_session.type);
   326 		session.send(st.stanza("failed", sm_attr)
   343 		session.send(st.stanza("failed", { xmlns = xmlns_sm })
   327 			:tag("not-authorized", { xmlns = xmlns_errors }));
   344 			:tag("not-authorized", { xmlns = xmlns_errors }));
   328 	end
   345 	end
   329 	return true;
   346 	return true;
   330 end);
   347 end
       
   348 module:hook_stanza(xmlns_sm2, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm2); end);
       
   349 module:hook_stanza(xmlns_sm3, "resume", function (session, stanza) return handle_resume(session, stanza, xmlns_sm3); end);