core/s2smanager.lua
changeset 4684 dc70c4ffb66d
parent 4518 f2d695d2e31e
parent 4599 f9f5230d7a9c
child 4822 5ef05f32bc42
child 4831 da2c49a9ab99
equal deleted inserted replaced
4529:12621337471f 4684:dc70c4ffb66d
     7 --
     7 --
     8 
     8 
     9 
     9 
    10 
    10 
    11 local hosts = hosts;
    11 local hosts = hosts;
    12 local sessions = sessions;
       
    13 local core_process_stanza = function(a, b) core_process_stanza(a, b); end
    12 local core_process_stanza = function(a, b) core_process_stanza(a, b); end
    14 local add_task = require "util.timer".add_task;
       
    15 local socket = require "socket";
       
    16 local format = string.format;
    13 local format = string.format;
    17 local t_insert, t_sort = table.insert, table.sort;
    14 local t_insert, t_sort = table.insert, table.sort;
    18 local get_traceback = debug.traceback;
    15 local get_traceback = debug.traceback;
    19 local tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable
    16 local tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable
    20     = tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable;
    17     = tostring, pairs, ipairs, getmetatable, newproxy, type, error, tonumber, setmetatable;
    21 
    18 
    22 local idna_to_ascii = require "util.encodings".idna.to_ascii;
       
    23 local connlisteners_get = require "net.connlisteners".get;
       
    24 local initialize_filters = require "util.filters".initialize;
    19 local initialize_filters = require "util.filters".initialize;
    25 local wrapclient = require "net.server".wrapclient;
    20 local wrapclient = require "net.server".wrapclient;
    26 local st = require "stanza";
    21 local st = require "stanza";
    27 local stanza = st.stanza;
    22 local stanza = st.stanza;
    28 local nameprep = require "util.encodings".stringprep.nameprep;
    23 local nameprep = require "util.encodings".stringprep.nameprep;
    39 
    34 
    40 local sha256_hash = require "util.hashes".sha256;
    35 local sha256_hash = require "util.hashes".sha256;
    41 
    36 
    42 local adns, dns = require "net.adns", require "net.dns";
    37 local adns, dns = require "net.adns", require "net.dns";
    43 local config = require "core.configmanager";
    38 local config = require "core.configmanager";
    44 local connect_timeout = config.get("*", "core", "s2s_timeout") or 60;
       
    45 local dns_timeout = config.get("*", "core", "dns_timeout") or 15;
    39 local dns_timeout = config.get("*", "core", "dns_timeout") or 15;
    46 local max_dns_depth = config.get("*", "core", "dns_max_depth") or 3;
    40 local cfg_sources = config.get("*", "core", "s2s_interface")
       
    41 	or config.get("*", "core", "interface");
    47 local sources;
    42 local sources;
    48 
    43 
       
    44 --FIXME: s2sout should create its own resolver w/ timeout
    49 dns.settimeout(dns_timeout);
    45 dns.settimeout(dns_timeout);
    50 
    46 
    51 local prosody = _G.prosody;
    47 local prosody = _G.prosody;
    52 incoming_s2s = {};
    48 incoming_s2s = {};
    53 prosody.incoming_s2s = incoming_s2s;
    49 prosody.incoming_s2s = incoming_s2s;
    54 local incoming_s2s = incoming_s2s;
    50 local incoming_s2s = incoming_s2s;
    55 
    51 
    56 module "s2smanager"
    52 module "s2smanager"
    57 
       
    58 function compare_srv_priorities(a,b)
       
    59 	return a.priority < b.priority or (a.priority == b.priority and a.weight > b.weight);
       
    60 end
       
    61 
       
    62 local bouncy_stanzas = { message = true, presence = true, iq = true };
       
    63 local function bounce_sendq(session, reason)
       
    64 	local sendq = session.sendq;
       
    65 	if sendq then
       
    66 		session.log("info", "sending error replies for "..#sendq.." queued stanzas because of failed outgoing connection to "..tostring(session.to_host));
       
    67 		local dummy = {
       
    68 			type = "s2sin";
       
    69 			send = function(s)
       
    70 				(session.log or log)("error", "Replying to to an s2s error reply, please report this! Traceback: %s", get_traceback());
       
    71 			end;
       
    72 			dummy = true;
       
    73 		};
       
    74 		for i, data in ipairs(sendq) do
       
    75 			local reply = data[2];
       
    76 			if reply and not(reply.attr.xmlns) and bouncy_stanzas[reply.name] then
       
    77 				reply.attr.type = "error";
       
    78 				reply:tag("error", {type = "cancel"})
       
    79 					:tag("remote-server-not-found", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"}):up();
       
    80 				if reason then
       
    81 					reply:tag("text", {xmlns = "urn:ietf:params:xml:ns:xmpp-stanzas"})
       
    82 						:text("Server-to-server connection failed: "..reason):up();
       
    83 				end
       
    84 				core_process_stanza(dummy, reply);
       
    85 			end
       
    86 			sendq[i] = nil;
       
    87 		end
       
    88 		session.sendq = nil;
       
    89 	end
       
    90 end
       
    91 
       
    92 function send_to_host(from_host, to_host, data)
       
    93 	if not hosts[from_host] then
       
    94 		log("warn", "Attempt to send stanza from %s - a host we don't serve", from_host);
       
    95 		return false;
       
    96 	end
       
    97 	local host = hosts[from_host].s2sout[to_host];
       
    98 	if host then
       
    99 		-- We have a connection to this host already
       
   100 		if host.type == "s2sout_unauthed" and (data.name ~= "db:verify" or not host.dialback_key) then
       
   101 			(host.log or log)("debug", "trying to send over unauthed s2sout to "..to_host);
       
   102 			
       
   103 			-- Queue stanza until we are able to send it
       
   104 			if host.sendq then t_insert(host.sendq, {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)});
       
   105 			else host.sendq = { {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)} }; end
       
   106 			host.log("debug", "stanza [%s] queued ", data.name);
       
   107 		elseif host.type == "local" or host.type == "component" then
       
   108 			log("error", "Trying to send a stanza to ourselves??")
       
   109 			log("error", "Traceback: %s", get_traceback());
       
   110 			log("error", "Stanza: %s", tostring(data));
       
   111 			return false;
       
   112 		else
       
   113 			(host.log or log)("debug", "going to send stanza to "..to_host.." from "..from_host);
       
   114 			-- FIXME
       
   115 			if host.from_host ~= from_host then
       
   116 				log("error", "WARNING! This might, possibly, be a bug, but it might not...");
       
   117 				log("error", "We are going to send from %s instead of %s", tostring(host.from_host), tostring(from_host));
       
   118 			end
       
   119 			host.sends2s(data);
       
   120 			host.log("debug", "stanza sent over "..host.type);
       
   121 		end
       
   122 	else
       
   123 		log("debug", "opening a new outgoing connection for this stanza");
       
   124 		local host_session = new_outgoing(from_host, to_host);
       
   125 
       
   126 		-- Store in buffer
       
   127 		host_session.sendq = { {tostring(data), data.attr.type ~= "error" and data.attr.type ~= "result" and st.reply(data)} };
       
   128 		log("debug", "stanza [%s] queued until connection complete", tostring(data.name));
       
   129 		if (not host_session.connecting) and (not host_session.conn) then
       
   130 			log("warn", "Connection to %s failed already, destroying session...", to_host);
       
   131 			if not destroy_session(host_session, "Connection failed") then
       
   132 				-- Already destroyed, we need to bounce our stanza
       
   133 				bounce_sendq(host_session, host_session.destruction_reason);
       
   134 			end
       
   135 			return false;
       
   136 		end
       
   137 	end
       
   138 	return true;
       
   139 end
       
   140 
    53 
   141 local open_sessions = 0;
    54 local open_sessions = 0;
   142 
    55 
   143 function new_incoming(conn)
    56 function new_incoming(conn)
   144 	local session = { conn = conn, type = "s2sin_unauthed", direction = "incoming", hosts = {} };
    57 	local session = { conn = conn, type = "s2sin_unauthed", direction = "incoming", hosts = {} };
   145 	if true then
    58 	if true then
   146 		session.trace = newproxy(true);
    59 		session.trace = newproxy(true);
   147 		getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; end;
    60 		getmetatable(session.trace).__gc = function () open_sessions = open_sessions - 1; end;
   148 	end
    61 	end
   149 	open_sessions = open_sessions + 1;
    62 	open_sessions = open_sessions + 1;
   150 	local w, log = conn.write, logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$"));
    63 	session.log = logger_init("s2sin"..tostring(conn):match("[a-f0-9]+$"));
   151 	session.log = log;
       
   152 	local filter = initialize_filters(session);
       
   153 	session.sends2s = function (t)
       
   154 		log("debug", "sending: %s", t.top_tag and t:top_tag() or t:match("^([^>]*>?)"));
       
   155 		if t.name then
       
   156 			t = filter("stanzas/out", t);
       
   157 		end
       
   158 		if t then
       
   159 			t = filter("bytes/out", tostring(t));
       
   160 			if t then
       
   161 				return w(conn, t);
       
   162 			end
       
   163 		end
       
   164 	end
       
   165 	incoming_s2s[session] = true;
    64 	incoming_s2s[session] = true;
   166 	add_task(connect_timeout, function ()
       
   167 		if session.conn ~= conn or
       
   168 		   session.type == "s2sin" then
       
   169 			return; -- Ok, we're connect[ed|ing]
       
   170 		end
       
   171 		-- Not connected, need to close session and clean up
       
   172 		(session.log or log)("debug", "Destroying incomplete session %s->%s due to inactivity",
       
   173 		    session.from_host or "(unknown)", session.to_host or "(unknown)");
       
   174 		session:close("connection-timeout");
       
   175 	end);
       
   176 	return session;
    65 	return session;
   177 end
    66 end
   178 
    67 
   179 function new_outgoing(from_host, to_host, connect)
    68 function new_outgoing(from_host, to_host, connect)
   180 		local host_session = { to_host = to_host, from_host = from_host, host = from_host,
    69 	local host_session = { to_host = to_host, from_host = from_host, host = from_host,
   181 		                       notopen = true, type = "s2sout_unauthed", direction = "outgoing",
    70 		               notopen = true, type = "s2sout_unauthed", direction = "outgoing" };
   182 		                       open_stream = session_open_stream };
    71 	hosts[from_host].s2sout[to_host] = host_session;
   183 		
    72 	local conn_name = "s2sout"..tostring(host_session):match("[a-f0-9]*$");
   184 		hosts[from_host].s2sout[to_host] = host_session;
    73 	host_session.log = logger_init(conn_name);
   185 		
    74 	return host_session;
   186 		host_session.close = destroy_session; -- This gets replaced by xmppserver_listener later
       
   187 		
       
   188 		local log;
       
   189 		do
       
   190 			local conn_name = "s2sout"..tostring(host_session):match("[a-f0-9]*$");
       
   191 			log = logger_init(conn_name);
       
   192 			host_session.log = log;
       
   193 		end
       
   194 		
       
   195 		initialize_filters(host_session);
       
   196 		
       
   197 		if connect ~= false then
       
   198 			-- Kick the connection attempting machine into life
       
   199 			if not attempt_connection(host_session) then
       
   200 				-- Intentionally not returning here, the
       
   201 				-- session is needed, connected or not
       
   202 				destroy_session(host_session);
       
   203 			end
       
   204 		end
       
   205 		
       
   206 		if not host_session.sends2s then
       
   207 			-- A sends2s which buffers data (until the stream is opened)
       
   208 			-- note that data in this buffer will be sent before the stream is authed
       
   209 			-- and will not be ack'd in any way, successful or otherwise
       
   210 			local buffer;
       
   211 			function host_session.sends2s(data)
       
   212 				if not buffer then
       
   213 					buffer = {};
       
   214 					host_session.send_buffer = buffer;
       
   215 				end
       
   216 				log("debug", "Buffering data on unconnected s2sout to %s", to_host);
       
   217 				buffer[#buffer+1] = data;
       
   218 				log("debug", "Buffered item %d: %s", #buffer, tostring(data));
       
   219 			end
       
   220 		end
       
   221 
       
   222 		return host_session;
       
   223 end
       
   224 
       
   225 
       
   226 function attempt_connection(host_session, err)
       
   227 	local from_host, to_host = host_session.from_host, host_session.to_host;
       
   228 	local connect_host, connect_port = to_host and idna_to_ascii(to_host), 5269;
       
   229 	
       
   230 	if not connect_host then
       
   231 		return false;
       
   232 	end
       
   233 	
       
   234 	if not err then -- This is our first attempt
       
   235 		log("debug", "First attempt to connect to %s, starting with SRV lookup...", to_host);
       
   236 		host_session.connecting = true;
       
   237 		local handle;
       
   238 		handle = adns.lookup(function (answer)
       
   239 			handle = nil;
       
   240 			host_session.connecting = nil;
       
   241 			if answer then
       
   242 				log("debug", to_host.." has SRV records, handling...");
       
   243 				local srv_hosts = {};
       
   244 				host_session.srv_hosts = srv_hosts;
       
   245 				for _, record in ipairs(answer) do
       
   246 					t_insert(srv_hosts, record.srv);
       
   247 				end
       
   248 				if #srv_hosts == 1 and srv_hosts[1].target == "." then
       
   249 					log("debug", to_host.." does not provide a XMPP service");
       
   250 					destroy_session(host_session, err); -- Nothing to see here
       
   251 					return;
       
   252 				end
       
   253 				t_sort(srv_hosts, compare_srv_priorities);
       
   254 				
       
   255 				local srv_choice = srv_hosts[1];
       
   256 				host_session.srv_choice = 1;
       
   257 				if srv_choice then
       
   258 					connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
       
   259 					log("debug", "Best record found, will connect to %s:%d", connect_host, connect_port);
       
   260 				end
       
   261 			else
       
   262 				log("debug", to_host.." has no SRV records, falling back to A");
       
   263 			end
       
   264 			-- Try with SRV, or just the plain hostname if no SRV
       
   265 			local ok, err = try_connect(host_session, connect_host, connect_port);
       
   266 			if not ok then
       
   267 				if not attempt_connection(host_session, err) then
       
   268 					-- No more attempts will be made
       
   269 					destroy_session(host_session, err);
       
   270 				end
       
   271 			end
       
   272 		end, "_xmpp-server._tcp."..connect_host..".", "SRV");
       
   273 		
       
   274 		return true; -- Attempt in progress
       
   275 	elseif host_session.ip_hosts then
       
   276 		return try_connect(host_session, connect_host, connect_port, err);
       
   277 	elseif host_session.srv_hosts and #host_session.srv_hosts > host_session.srv_choice then -- Not our first attempt, and we also have SRV
       
   278 		host_session.srv_choice = host_session.srv_choice + 1;
       
   279 		local srv_choice = host_session.srv_hosts[host_session.srv_choice];
       
   280 		connect_host, connect_port = srv_choice.target or to_host, srv_choice.port or connect_port;
       
   281 		host_session.log("info", "Connection failed (%s). Attempt #%d: This time to %s:%d", tostring(err), host_session.srv_choice, connect_host, connect_port);
       
   282 	else
       
   283 		host_session.log("info", "Out of connection options, can't connect to %s", tostring(host_session.to_host));
       
   284 		-- We're out of options
       
   285 		return false;
       
   286 	end
       
   287 	
       
   288 	if not (connect_host and connect_port) then
       
   289 		-- Likely we couldn't resolve DNS
       
   290 		log("warn", "Hmm, we're without a host (%s) and port (%s) to connect to for %s, giving up :(", tostring(connect_host), tostring(connect_port), tostring(to_host));
       
   291 		return false;
       
   292 	end
       
   293 	
       
   294 	return try_connect(host_session, connect_host, connect_port);
       
   295 end
       
   296 
       
   297 function try_next_ip(host_session)
       
   298 	host_session.connecting = nil;
       
   299 	host_session.ip_choice = host_session.ip_choice + 1;
       
   300 	local ip = host_session.ip_hosts[host_session.ip_choice];
       
   301 	local ok, err= make_connect(host_session, ip.ip, ip.port);
       
   302 	if not ok then
       
   303 		if not attempt_connection(host_session, err or "closed") then
       
   304 			err = err and (": "..err) or "";
       
   305 			destroy_session(host_session, "Connection failed"..err);
       
   306 		end
       
   307 	end
       
   308 end
       
   309 
       
   310 function try_connect(host_session, connect_host, connect_port, err)
       
   311 	host_session.connecting = true;
       
   312 
       
   313 	if not err then
       
   314 		local IPs = {};
       
   315 		host_session.ip_hosts = IPs;
       
   316 		local handle4, handle6;
       
   317 		local has_other = false;
       
   318 
       
   319 		if not sources then
       
   320 			sources =  {};
       
   321 			local cfg_sources = config.get("*", "core", "interface") or connlisteners_get("xmppserver").default_interface;
       
   322 			if type(cfg_sources) == "string" then
       
   323 				cfg_sources = { cfg_sources };
       
   324 			end
       
   325 			for i, source in ipairs(cfg_sources) do
       
   326 				if source == "*" then
       
   327 					sources[i] = new_ip("0.0.0.0", "IPv4");
       
   328 				else
       
   329 					sources[i] = new_ip(source, (source:find(":") and "IPv6") or "IPv4");
       
   330 				end
       
   331 			end
       
   332 		end
       
   333 
       
   334 		handle4 = adns.lookup(function (reply, err)
       
   335 			handle4 = nil;
       
   336 
       
   337 			-- COMPAT: This is a compromise for all you CNAME-(ab)users :)
       
   338 			if not (reply and reply[#reply] and reply[#reply].a) then
       
   339 				local count = max_dns_depth;
       
   340 				reply = dns.peek(connect_host, "CNAME", "IN");
       
   341 				while count > 0 and reply and reply[#reply] and not reply[#reply].a and reply[#reply].cname do
       
   342 					log("debug", "Looking up %s (DNS depth is %d)", tostring(reply[#reply].cname), count);
       
   343 					reply = dns.peek(reply[#reply].cname, "A", "IN") or dns.peek(reply[#reply].cname, "CNAME", "IN");
       
   344 					count = count - 1;
       
   345 				end
       
   346 			end
       
   347 			-- end of CNAME resolving
       
   348 
       
   349 			if reply and reply[#reply] and reply[#reply].a then
       
   350 				for _, ip in ipairs(reply) do
       
   351 					log("debug", "DNS reply for %s gives us %s", connect_host, ip.a);
       
   352 					IPs[#IPs+1] = new_ip(ip.a, "IPv4");
       
   353 				end
       
   354 			end
       
   355 
       
   356 			if has_other then
       
   357 				if #IPs > 0 then
       
   358 					rfc3484_dest(host_session.ip_hosts, sources);
       
   359 					for i = 1, #IPs do
       
   360 						IPs[i] = {ip = IPs[i], port = connect_port};
       
   361 					end
       
   362 					host_session.ip_choice = 0;
       
   363 					try_next_ip(host_session);
       
   364 				else
       
   365 					log("debug", "DNS lookup failed to get a response for %s", connect_host);
       
   366 					host_session.ip_hosts = nil;
       
   367 					if not attempt_connection(host_session, "name resolution failed") then -- Retry if we can
       
   368 						log("debug", "No other records to try for %s - destroying", host_session.to_host);
       
   369 						err = err and (": "..err) or "";
       
   370 						destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
       
   371 					end
       
   372 				end
       
   373 			else
       
   374 				has_other = true;
       
   375 			end
       
   376 		end, connect_host, "A", "IN");
       
   377 
       
   378 		handle6 = adns.lookup(function (reply, err)
       
   379 			handle6 = nil;
       
   380 
       
   381 			if reply and reply[#reply] and reply[#reply].aaaa then
       
   382 				for _, ip in ipairs(reply) do
       
   383 					log("debug", "DNS reply for %s gives us %s", connect_host, ip.aaaa);
       
   384 					IPs[#IPs+1] = new_ip(ip.aaaa, "IPv6");
       
   385 				end
       
   386 			end
       
   387 
       
   388 			if has_other then
       
   389 				if #IPs > 0 then
       
   390 					rfc3484_dest(host_session.ip_hosts, sources);
       
   391 					for i = 1, #IPs do
       
   392 						IPs[i] = {ip = IPs[i], port = connect_port};
       
   393 					end
       
   394 					host_session.ip_choice = 0;
       
   395 					try_next_ip(host_session);
       
   396 				else
       
   397 					log("debug", "DNS lookup failed to get a response for %s", connect_host);
       
   398 					host_session.ip_hosts = nil;
       
   399 					if not attempt_connection(host_session, "name resolution failed") then -- Retry if we can
       
   400 						log("debug", "No other records to try for %s - destroying", host_session.to_host);
       
   401 						err = err and (": "..err) or "";
       
   402 						destroy_session(host_session, "DNS resolution failed"..err); -- End of the line, we can't
       
   403 					end
       
   404 				end
       
   405 			else
       
   406 				has_other = true;
       
   407 			end
       
   408 		end, connect_host, "AAAA", "IN");
       
   409 
       
   410 		return true;
       
   411 	elseif host_session.ip_hosts and #host_session.ip_hosts > host_session.ip_choice then -- Not our first attempt, and we also have IPs left to try
       
   412 		try_next_ip(host_session);
       
   413 	else
       
   414 		host_session.ip_hosts = nil;
       
   415 		if not attempt_connection(host_session, "out of IP addresses") then -- Retry if we can
       
   416 			log("debug", "No other records to try for %s - destroying", host_session.to_host);
       
   417 			err = err and (": "..err) or "";
       
   418 			destroy_session(host_session, "Connecting failed"..err); -- End of the line, we can't
       
   419 			return false;
       
   420 		end
       
   421 	end
       
   422 
       
   423 	return true;
       
   424 end
       
   425 
       
   426 function make_connect(host_session, connect_host, connect_port)
       
   427 	(host_session.log or log)("info", "Beginning new connection attempt to %s ([%s]:%d)", host_session.to_host, connect_host.addr, connect_port);
       
   428 	-- Ok, we're going to try to connect
       
   429 	
       
   430 	local from_host, to_host = host_session.from_host, host_session.to_host;
       
   431 	
       
   432 	local conn, handler;
       
   433 	if connect_host.proto == "IPv4" then
       
   434 		conn, handler = socket.tcp();
       
   435 	elseif socket.tcp6 then
       
   436 		conn, handler = socket.tcp6();
       
   437 	end
       
   438 	
       
   439 	if not conn then
       
   440 		log("warn", "Failed to create outgoing connection, system error: %s", handler);
       
   441 		return false, handler;
       
   442 	end
       
   443 
       
   444 	conn:settimeout(0);
       
   445 	local success, err = conn:connect(connect_host.addr, connect_port);
       
   446 	if not success and err ~= "timeout" then
       
   447 		log("warn", "s2s connect() to %s (%s:%d) failed: %s", host_session.to_host, connect_host.addr, connect_port, err);
       
   448 		return false, err;
       
   449 	end
       
   450 	
       
   451 	local cl = connlisteners_get("xmppserver");
       
   452 	conn = wrapclient(conn, connect_host.addr, connect_port, cl, cl.default_mode or 1 );
       
   453 	host_session.conn = conn;
       
   454 	
       
   455 	local filter = initialize_filters(host_session);
       
   456 	local w, log = conn.write, host_session.log;
       
   457 	host_session.sends2s = function (t)
       
   458 		log("debug", "sending: %s", (t.top_tag and t:top_tag()) or t:match("^[^>]*>?"));
       
   459 		if t.name then
       
   460 			t = filter("stanzas/out", t);
       
   461 		end
       
   462 		if t then
       
   463 			t = filter("bytes/out", tostring(t));
       
   464 			if t then
       
   465 				return w(conn, tostring(t));
       
   466 			end
       
   467 		end
       
   468 	end
       
   469 	
       
   470 	-- Register this outgoing connection so that xmppserver_listener knows about it
       
   471 	-- otherwise it will assume it is a new incoming connection
       
   472 	cl.register_outgoing(conn, host_session);
       
   473 	
       
   474 	host_session:open_stream(from_host, to_host);
       
   475 	
       
   476 	log("debug", "Connection attempt in progress...");
       
   477 	add_task(connect_timeout, function ()
       
   478 		if host_session.conn ~= conn or
       
   479 		   host_session.type == "s2sout" or
       
   480 		   host_session.connecting then
       
   481 			return; -- Ok, we're connect[ed|ing]
       
   482 		end
       
   483 		-- Not connected, need to close session and clean up
       
   484 		(host_session.log or log)("warn", "Destroying incomplete session %s->%s due to inactivity",
       
   485 		    host_session.from_host or "(unknown)", host_session.to_host or "(unknown)");
       
   486 		host_session:close("connection-timeout");
       
   487 	end);
       
   488 	return true;
       
   489 end
       
   490 
       
   491 function session_open_stream(session, from, to)
       
   492 	session.sends2s(st.stanza("stream:stream", {
       
   493 		xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
       
   494 		["xmlns:stream"]='http://etherx.jabber.org/streams',
       
   495 		from=from, to=to, version='1.0', ["xml:lang"]='en'}):top_tag());
       
   496 end
       
   497 
       
   498 local function check_cert_status(session)
       
   499 	local conn = session.conn:socket()
       
   500 	local cert
       
   501 	if conn.getpeercertificate then
       
   502 		cert = conn:getpeercertificate()
       
   503 	end
       
   504 
       
   505 	if cert then
       
   506 		local chain_valid, errors = conn:getpeerverification()
       
   507 		-- Is there any interest in printing out all/the number of errors here?
       
   508 		if not chain_valid then
       
   509 			(session.log or log)("debug", "certificate chain validation result: invalid");
       
   510 			session.cert_chain_status = "invalid";
       
   511 		else
       
   512 			(session.log or log)("debug", "certificate chain validation result: valid");
       
   513 			session.cert_chain_status = "valid";
       
   514 
       
   515 			local host;
       
   516 			if session.direction == "incoming" then
       
   517 				host = session.from_host;
       
   518 			else
       
   519 				host = session.to_host;
       
   520 			end
       
   521 
       
   522 			-- We'll go ahead and verify the asserted identity if the
       
   523 			-- connecting server specified one.
       
   524 			if host then
       
   525 				if cert_verify_identity(host, "xmpp-server", cert) then
       
   526 					session.cert_identity_status = "valid"
       
   527 				else
       
   528 					session.cert_identity_status = "invalid"
       
   529 				end
       
   530 			end
       
   531 		end
       
   532 	end
       
   533 end
       
   534 
       
   535 function streamopened(session, attr)
       
   536 	local send = session.sends2s;
       
   537 	
       
   538 	-- TODO: #29: SASL/TLS on s2s streams
       
   539 	session.version = tonumber(attr.version) or 0;
       
   540 	
       
   541 	-- TODO: Rename session.secure to session.encrypted
       
   542 	if session.secure == false then
       
   543 		session.secure = true;
       
   544 	end
       
   545 
       
   546 	if session.direction == "incoming" then
       
   547 		-- Send a reply stream header
       
   548 		session.to_host = attr.to and nameprep(attr.to);
       
   549 		session.from_host = attr.from and nameprep(attr.from);
       
   550 	
       
   551 		session.streamid = uuid_gen();
       
   552 		(session.log or log)("debug", "Incoming s2s received %s", st.stanza("stream:stream", attr):top_tag());
       
   553 		if session.to_host then
       
   554 			if not hosts[session.to_host] then
       
   555 				-- Attempting to connect to a host we don't serve
       
   556 				session:close({
       
   557 					condition = "host-unknown";
       
   558 					text = "This host does not serve "..session.to_host
       
   559 				});
       
   560 				return;
       
   561 			elseif hosts[session.to_host].disallow_s2s then
       
   562 				-- Attempting to connect to a host that disallows s2s
       
   563 				session:close({
       
   564 					condition = "policy-violation";
       
   565 					text = "Server-to-server communication is not allowed to this host";
       
   566 				});
       
   567 				return;
       
   568 			end
       
   569 		end
       
   570 
       
   571 		if session.secure and not session.cert_chain_status then check_cert_status(session); end
       
   572 
       
   573 		send("<?xml version='1.0'?>");
       
   574 		send(stanza("stream:stream", { xmlns='jabber:server', ["xmlns:db"]='jabber:server:dialback',
       
   575 				["xmlns:stream"]='http://etherx.jabber.org/streams', id=session.streamid, from=session.to_host, to=session.from_host, version=(session.version > 0 and "1.0" or nil) }):top_tag());
       
   576 		if session.version >= 1.0 then
       
   577 			local features = st.stanza("stream:features");
       
   578 			
       
   579 			if session.to_host then
       
   580 				hosts[session.to_host].events.fire_event("s2s-stream-features", { origin = session, features = features });
       
   581 			else
       
   582 				(session.log or log)("warn", "No 'to' on stream header from %s means we can't offer any features", session.from_host or "unknown host");
       
   583 			end
       
   584 			
       
   585 			log("debug", "Sending stream features: %s", tostring(features));
       
   586 			send(features);
       
   587 		end
       
   588 	elseif session.direction == "outgoing" then
       
   589 		-- If we are just using the connection for verifying dialback keys, we won't try and auth it
       
   590 		if not attr.id then error("stream response did not give us a streamid!!!"); end
       
   591 		session.streamid = attr.id;
       
   592 
       
   593 		if session.secure and not session.cert_chain_status then check_cert_status(session); end
       
   594 
       
   595 		-- Send unauthed buffer
       
   596 		-- (stanzas which are fine to send before dialback)
       
   597 		-- Note that this is *not* the stanza queue (which
       
   598 		-- we can only send if auth succeeds) :)
       
   599 		local send_buffer = session.send_buffer;
       
   600 		if send_buffer and #send_buffer > 0 then
       
   601 			log("debug", "Sending s2s send_buffer now...");
       
   602 			for i, data in ipairs(send_buffer) do
       
   603 				session.sends2s(tostring(data));
       
   604 				send_buffer[i] = nil;
       
   605 			end
       
   606 		end
       
   607 		session.send_buffer = nil;
       
   608 	
       
   609 		-- If server is pre-1.0, don't wait for features, just do dialback
       
   610 		if session.version < 1.0 then
       
   611 			if not session.dialback_verifying then
       
   612 				log("debug", "Initiating dialback...");
       
   613 				initiate_dialback(session);
       
   614 			else
       
   615 				mark_connected(session);
       
   616 			end
       
   617 		end
       
   618 	end
       
   619 	session.notopen = nil;
       
   620 end
       
   621 
       
   622 function streamclosed(session)
       
   623 	(session.log or log)("debug", "Received </stream:stream>");
       
   624 	session:close();
       
   625 end
       
   626 
       
   627 function initiate_dialback(session)
       
   628 	-- generate dialback key
       
   629 	session.dialback_key = generate_dialback(session.streamid, session.to_host, session.from_host);
       
   630 	session.sends2s(format("<db:result from='%s' to='%s'>%s</db:result>", session.from_host, session.to_host, session.dialback_key));
       
   631 	session.log("info", "sent dialback key on outgoing s2s stream");
       
   632 end
       
   633 
       
   634 function generate_dialback(id, to, from)
       
   635 	return sha256_hash(id..to..from..hosts[from].dialback_secret, true);
       
   636 end
       
   637 
       
   638 function verify_dialback(id, to, from, key)
       
   639 	return key == generate_dialback(id, to, from);
       
   640 end
    75 end
   641 
    76 
   642 function make_authenticated(session, host)
    77 function make_authenticated(session, host)
   643 	if not session.secure then
    78 	if not session.secure then
   644 		local local_host = session.direction == "incoming" and session.to_host or session.from_host;
    79 		local local_host = session.direction == "incoming" and session.to_host or session.from_host;
   676 	local sendq, send = session.sendq, session.sends2s;
   111 	local sendq, send = session.sendq, session.sends2s;
   677 	
   112 	
   678 	local from, to = session.from_host, session.to_host;
   113 	local from, to = session.from_host, session.to_host;
   679 	
   114 	
   680 	session.log("info", session.direction.." s2s connection "..from.."->"..to.." complete");
   115 	session.log("info", session.direction.." s2s connection "..from.."->"..to.." complete");
   681 	
   116 
   682 	local send_to_host = send_to_host;
       
   683 	function session.send(data) return send_to_host(to, from, data); end
       
   684 	
       
   685 	local event_data = { session = session };
   117 	local event_data = { session = session };
   686 	if session.type == "s2sout" then
   118 	if session.type == "s2sout" then
   687 		prosody.events.fire_event("s2sout-established", event_data);
   119 		prosody.events.fire_event("s2sout-established", event_data);
   688 		hosts[session.from_host].events.fire_event("s2sout-established", event_data);
   120 		hosts[session.from_host].events.fire_event("s2sout-established", event_data);
   689 	else
   121 	else
   699 				sendq[i] = nil;
   131 				sendq[i] = nil;
   700 			end
   132 			end
   701 			session.sendq = nil;
   133 			session.sendq = nil;
   702 		end
   134 		end
   703 		
   135 		
       
   136 		session.ip_hosts = nil;
   704 		session.srv_hosts = nil;
   137 		session.srv_hosts = nil;
   705 	end
   138 	end
   706 end
   139 end
   707 
   140 
   708 local resting_session = { -- Resting, not dead
   141 local resting_session = { -- Resting, not dead
   736 	if session.destroyed then return; end
   169 	if session.destroyed then return; end
   737 	(session.log or log)("debug", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)..(reason and (": "..reason) or ""));
   170 	(session.log or log)("debug", "Destroying "..tostring(session.direction).." session "..tostring(session.from_host).."->"..tostring(session.to_host)..(reason and (": "..reason) or ""));
   738 	
   171 	
   739 	if session.direction == "outgoing" then
   172 	if session.direction == "outgoing" then
   740 		hosts[session.from_host].s2sout[session.to_host] = nil;
   173 		hosts[session.from_host].s2sout[session.to_host] = nil;
   741 		bounce_sendq(session, reason);
   174 		session:bounce_sendq(reason);
   742 	elseif session.direction == "incoming" then
   175 	elseif session.direction == "incoming" then
   743 		incoming_s2s[session] = nil;
   176 		incoming_s2s[session] = nil;
   744 	end
   177 	end
   745 	
   178 	
   746 	local event_data = { session = session, reason = reason };
   179 	local event_data = { session = session, reason = reason };