plugins/mod_s2s/mod_s2s.lua
changeset 7454 464a8a8de625
parent 7453 3ae19750cd46
child 7470 9a73c85baffe
--- a/plugins/mod_s2s/mod_s2s.lua	Mon May 30 13:30:53 2016 +0200
+++ b/plugins/mod_s2s/mod_s2s.lua	Mon May 30 13:36:43 2016 +0200
@@ -26,6 +26,7 @@
 local s2s_destroy_session = require "core.s2smanager".destroy_session;
 local uuid_gen = require "util.uuid".generate;
 local fire_global_event = prosody.events.fire_event;
+local runner = require "util.async".runner;
 
 local s2sout = module:require("s2sout");
 
@@ -41,6 +42,8 @@
 
 local sessions = module:shared("sessions");
 
+local runner_callbacks = {};
+
 local log = module._log;
 
 --- Handle stanzas to remote domains
@@ -257,11 +260,21 @@
 
 --- XMPP stream event handlers
 
-local stream_callbacks = { default_ns = "jabber:server", handlestanza =  core_process_stanza };
+local stream_callbacks = { default_ns = "jabber:server" };
+
+function stream_callbacks.handlestanza(session, stanza)
+	stanza = session.filter("stanzas/in", stanza);
+	session.thread:run(stanza);
+end
 
 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
 
 function stream_callbacks.streamopened(session, attr)
+	-- run _streamopened in async context
+	session.thread:run({ attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr)
 	session.version = tonumber(attr.version) or 0;
 
 	-- TODO: Rename session.secure to session.encrypted
@@ -435,14 +448,6 @@
 	end
 end
 
-local function handleerr(err) log("error", "Traceback[s2s]: %s", traceback(tostring(err), 2)); end
-function stream_callbacks.handlestanza(session, stanza)
-	stanza = session.filter("stanzas/in", stanza);
-	if stanza then
-		return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
-	end
-end
-
 local listener = {};
 
 --- Session methods
@@ -517,6 +522,15 @@
 -- Session initialization logic shared by incoming and outgoing
 local function initialize_session(session)
 	local stream = new_xmpp_stream(session, stream_callbacks);
+
+	session.thread = runner(function (stanza)
+		if stanza.name == nil then
+			stream_callbacks._streamopened(session, stanza.attr);
+		else
+			core_process_stanza(session, stanza);
+		end
+	end, runner_callbacks, session);
+
 	local log = session.log or log;
 	session.stream = stream;
 
@@ -580,6 +594,20 @@
 	end);
 end
 
+function runner_callbacks:ready()
+	self.data.log("debug", "Runner %s ready (%s)", self.thread, coroutine.status(self.thread));
+	self.data.conn:resume();
+end
+
+function runner_callbacks:waiting()
+	self.data.log("debug", "Runner %s waiting (%s)", self.thread, coroutine.status(self.thread));
+	self.data.conn:pause();
+end
+
+function runner_callbacks:error(err)
+	(self.data.log or log)("error", "Traceback[s2s]: %s", err);
+end
+
 function listener.onconnect(conn)
 	measure_connections(1);
 	conn:setoption("keepalive", opt_keepalives);