plugins/mod_c2s.lua
changeset 5773 c9a712673d8a
parent 5764 969e0a054795
child 5776 bd0ff8ae98a8
equal deleted inserted replaced
5772:9cef4b5c2fe3 5773:c9a712673d8a
    16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
    16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
    17 local uuid_generate = require "util.uuid".generate;
    17 local uuid_generate = require "util.uuid".generate;
    18 
    18 
    19 local xpcall, tostring, type = xpcall, tostring, type;
    19 local xpcall, tostring, type = xpcall, tostring, type;
    20 local traceback = debug.traceback;
    20 local traceback = debug.traceback;
       
    21 local t_insert, t_remove = table.insert, table.remove;
       
    22 local co_running, co_resume = coroutine.running, coroutine.resume;
    21 
    23 
    22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
    24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
    23 
    25 
    24 local log = module._log;
    26 local log = module._log;
    25 
    27 
    29 
    31 
    30 local sessions = module:shared("sessions");
    32 local sessions = module:shared("sessions");
    31 local core_process_stanza = prosody.core_process_stanza;
    33 local core_process_stanza = prosody.core_process_stanza;
    32 local hosts = prosody.hosts;
    34 local hosts = prosody.hosts;
    33 
    35 
    34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
    36 local stream_callbacks = { default_ns = "jabber:client" };
    35 local listener = {};
    37 local listener = {};
    36 
    38 
    37 --- Stream events handlers
    39 --- Stream events handlers
    38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
    40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
    39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
    41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
   118 end
   120 end
   119 
   121 
   120 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
   122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end
   121 function stream_callbacks.handlestanza(session, stanza)
   123 function stream_callbacks.handlestanza(session, stanza)
   122 	stanza = session.filter("stanzas/in", stanza);
   124 	stanza = session.filter("stanzas/in", stanza);
   123 	if stanza then
   125 	t_insert(session.pending_stanzas, stanza);
   124 		return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
       
   125 	end
       
   126 end
   126 end
   127 
   127 
   128 --- Session methods
   128 --- Session methods
   129 local function session_close(session, reason)
   129 local function session_close(session, reason)
   130 	local log = session.log or log;
   130 	local log = session.log or log;
   222 	function session.reset_stream()
   222 	function session.reset_stream()
   223 		session.notopen = true;
   223 		session.notopen = true;
   224 		session.stream:reset();
   224 		session.stream:reset();
   225 	end
   225 	end
   226 	
   226 	
       
   227 	session.thread = coroutine.create(function (stanza)
       
   228 		while true do
       
   229 			core_process_stanza(session, stanza);
       
   230 			stanza = coroutine.yield("ready");
       
   231 		end
       
   232 	end);
       
   233 
       
   234 	session.pending_stanzas = {};
       
   235 
   227 	local filter = session.filter;
   236 	local filter = session.filter;
   228 	function session.data(data)
   237 	function session.data(data)
   229 		data = filter("bytes/in", data);
   238 		-- Parse the data, which will store stanzas in session.pending_stanzas
   230 		if data then
   239 		if data then
   231 			local ok, err = stream:feed(data);
   240 			data = filter("bytes/in", data);
   232 			if ok then return; end
   241 			if data then
   233 			log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
   242 				local ok, err = stream:feed(data);
   234 			session:close("not-well-formed");
   243 				if not ok then
   235 		end
   244 					log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
   236 	end
   245 					session:close("not-well-formed");
   237 
   246 				end
   238 	
   247 			end
       
   248 		end
       
   249 
       
   250 		if co_running() ~= session.thread and not session.paused then
       
   251 			if session.state == "wait" then
       
   252 				session.state = "ready";
       
   253 				local ok, state = co_resume(session.thread);
       
   254 				if not ok then
       
   255 					log("error", "Traceback[c2s]: %s", state);
       
   256 				elseif state == "wait" then
       
   257 					return;
       
   258 				end
       
   259 			end
       
   260 			-- We're not currently running, so start the thread to process pending stanzas
       
   261 			local s, thread = session.pending_stanzas, session.thread;
       
   262 			local n = #s;
       
   263 			while n > 0 and session.state ~= "wait" do
       
   264 				session.log("debug", "processing %d stanzas", n);
       
   265 				local consumed;
       
   266 				for i = 1,n do
       
   267 					local stanza = s[i];
       
   268 					local ok, state = co_resume(thread, stanza);
       
   269 					if not ok then
       
   270 						log("error", "Traceback[c2s]: %s", state);
       
   271 					elseif state == "wait" then
       
   272 						consumed = i;
       
   273 						session.state = "wait";
       
   274 						break;
       
   275 					end
       
   276 				end
       
   277 				if not consumed then consumed = n; end
       
   278 				for i = 1, #s do
       
   279 					s[i] = s[consumed+i];
       
   280 				end
       
   281 				n = #s;
       
   282 			end
       
   283 		end
       
   284 	end
       
   285 
   239 	if c2s_timeout then
   286 	if c2s_timeout then
   240 		add_task(c2s_timeout, function ()
   287 		add_task(c2s_timeout, function ()
   241 			if session.type == "c2s_unauthed" then
   288 			if session.type == "c2s_unauthed" then
   242 				session:close("connection-timeout");
   289 				session:close("connection-timeout");
   243 			end
   290 			end
   244 		end);
   291 		end);
   245 	end
   292 	end
   246 
   293 
   247 	session.dispatch_stanza = stream_callbacks.handlestanza;
   294 	session.dispatch_stanza = stream_callbacks.handlestanza;
       
   295 
       
   296 	function session:sleep(by)
       
   297 		session.log("debug", "Sleeping for %s", by);
       
   298 		session.paused = by or "?";
       
   299 		session.conn:pause();
       
   300 		if co_running() == session.thread then
       
   301 			coroutine.yield("wait");
       
   302 		end
       
   303 	end
       
   304 	function session:wake(by)
       
   305 		assert(session.paused == (by or "?"));
       
   306 		session.log("debug", "Waking for %s", by);
       
   307 		session.paused = nil;
       
   308 		session.conn:resume();
       
   309 		session.data(); --FIXME: next tick?
       
   310 	end
   248 end
   311 end
   249 
   312 
   250 function listener.onincoming(conn, data)
   313 function listener.onincoming(conn, data)
   251 	local session = sessions[conn];
   314 	local session = sessions[conn];
   252 	if session then
   315 	if session then