plugins/mod_s2s/mod_s2s.lua
changeset 10124 756b8821007a
parent 10119 c0bd5daa9c7f
child 10230 77f900bbbf25
equal deleted inserted replaced
10123:29733134c76c 10124:756b8821007a
    25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
    25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing;
    26 local s2s_destroy_session = require "core.s2smanager".destroy_session;
    26 local s2s_destroy_session = require "core.s2smanager".destroy_session;
    27 local uuid_gen = require "util.uuid".generate;
    27 local uuid_gen = require "util.uuid".generate;
    28 local fire_global_event = prosody.events.fire_event;
    28 local fire_global_event = prosody.events.fire_event;
    29 local runner = require "util.async".runner;
    29 local runner = require "util.async".runner;
    30 
    30 local connect = require "net.connect".connect;
    31 local s2sout = module:require("s2sout");
    31 local service = require "net.resolvers.service";
    32 
    32 
    33 local connect_timeout = module:get_option_number("s2s_timeout", 90);
    33 local connect_timeout = module:get_option_number("s2s_timeout", 90);
    34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
    34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5);
    35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true));
    35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true));
    36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day...
    36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day...
    42 local measure_ipv6 = module:measure("ipv6", "amount");
    42 local measure_ipv6 = module:measure("ipv6", "amount");
    43 
    43 
    44 local sessions = module:shared("sessions");
    44 local sessions = module:shared("sessions");
    45 
    45 
    46 local runner_callbacks = {};
    46 local runner_callbacks = {};
       
    47 
       
    48 local listener = {};
    47 
    49 
    48 local log = module._log;
    50 local log = module._log;
    49 
    51 
    50 module:hook("stats-update", function ()
    52 module:hook("stats-update", function ()
    51 	local count = 0;
    53 	local count = 0;
   152 -- Create a new outgoing session for a stanza
   154 -- Create a new outgoing session for a stanza
   153 function route_to_new_session(event)
   155 function route_to_new_session(event)
   154 	local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
   156 	local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza;
   155 	log("debug", "opening a new outgoing connection for this stanza");
   157 	log("debug", "opening a new outgoing connection for this stanza");
   156 	local host_session = s2s_new_outgoing(from_host, to_host);
   158 	local host_session = s2s_new_outgoing(from_host, to_host);
       
   159 	host_session.version = 1;
   157 
   160 
   158 	-- Store in buffer
   161 	-- Store in buffer
   159 	host_session.bounce_sendq = bounce_sendq;
   162 	host_session.bounce_sendq = bounce_sendq;
   160 	host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
   163 	host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} };
   161 	log("debug", "stanza [%s] queued until connection complete", stanza.name);
   164 	log("debug", "stanza [%s] queued until connection complete", stanza.name);
   162 	s2sout.initiate_connection(host_session);
   165 	connect(service.new(to_host, "xmpp-server", "tcp", { default_port = 5269 }), listener, nil, { session = host_session });
   163 	if (not host_session.connecting) and (not host_session.conn) then
       
   164 		log("warn", "Connection to %s failed already, destroying session...", to_host);
       
   165 		s2s_destroy_session(host_session, "Connection failed");
       
   166 		return false;
       
   167 	end
       
   168 	return true;
   166 	return true;
   169 end
   167 end
   170 
   168 
   171 local function keepalive(event)
   169 local function keepalive(event)
   172 	return event.session.sends2s(' ');
   170 	return event.session.sends2s(' ');
   476 		text = condition .. (text and (" ("..text..")") or "");
   474 		text = condition .. (text and (" ("..text..")") or "");
   477 		session.log("info", "Session closed by remote with error: %s", text);
   475 		session.log("info", "Session closed by remote with error: %s", text);
   478 		session:close(nil, text);
   476 		session:close(nil, text);
   479 	end
   477 	end
   480 end
   478 end
   481 
       
   482 local listener = {};
       
   483 
   479 
   484 --- Session methods
   480 --- Session methods
   485 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
   481 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
   486 local function session_close(session, reason, remote_reason)
   482 local function session_close(session, reason, remote_reason)
   487 	local log = session.log or log;
   483 	local log = session.log or log;
   677 
   673 
   678 function listener.ondisconnect(conn, err)
   674 function listener.ondisconnect(conn, err)
   679 	local session = sessions[conn];
   675 	local session = sessions[conn];
   680 	if session then
   676 	if session then
   681 		sessions[conn] = nil;
   677 		sessions[conn] = nil;
       
   678 		(session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
       
   679 		s2s_destroy_session(session, err);
       
   680 	end
       
   681 end
       
   682 
       
   683 function listener.onfail(data, err)
       
   684 	local session = data and data.session;
       
   685 	if session then
   682 		if err and session.direction == "outgoing" and session.notopen then
   686 		if err and session.direction == "outgoing" and session.notopen then
   683 			(session.log or log)("debug", "s2s connection attempt failed: %s", err);
   687 			(session.log or log)("debug", "s2s connection attempt failed: %s", err);
   684 			if s2sout.attempt_connection(session, err) then
       
   685 				return; -- Session lives for now
       
   686 			end
       
   687 		end
   688 		end
   688 		(session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
   689 		(session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed");
   689 		s2s_destroy_session(session, err);
   690 		s2s_destroy_session(session, err);
   690 	end
   691 	end
   691 end
   692 end
   703 	initialize_session(session);
   704 	initialize_session(session);
   704 end
   705 end
   705 
   706 
   706 function listener.ondetach(conn)
   707 function listener.ondetach(conn)
   707 	sessions[conn] = nil;
   708 	sessions[conn] = nil;
       
   709 end
       
   710 
       
   711 function listener.onattach(conn, data)
       
   712 	local session = data and data.session;
       
   713 	if session then
       
   714 		session.conn = conn;
       
   715 		sessions[conn] = session;
       
   716 		initialize_session(session);
       
   717 	end
   708 end
   718 end
   709 
   719 
   710 function check_auth_policy(event)
   720 function check_auth_policy(event)
   711 	local host, session = event.host, event.session;
   721 	local host, session = event.host, event.session;
   712 	local must_secure = secure_auth;
   722 	local must_secure = secure_auth;
   727 		return false;
   737 		return false;
   728 	end
   738 	end
   729 end
   739 end
   730 
   740 
   731 module:hook("s2s-check-certificate", check_auth_policy, -1);
   741 module:hook("s2s-check-certificate", check_auth_policy, -1);
   732 
       
   733 s2sout.set_listener(listener);
       
   734 
   742 
   735 module:hook("server-stopping", function(event)
   743 module:hook("server-stopping", function(event)
   736 	local reason = event.reason;
   744 	local reason = event.reason;
   737 	for _, session in pairs(sessions) do
   745 	for _, session in pairs(sessions) do
   738 		session:close{ condition = "system-shutdown", text = reason };
   746 		session:close{ condition = "system-shutdown", text = reason };