25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; |
25 local s2s_new_outgoing = require "core.s2smanager".new_outgoing; |
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; |
26 local s2s_destroy_session = require "core.s2smanager".destroy_session; |
27 local uuid_gen = require "util.uuid".generate; |
27 local uuid_gen = require "util.uuid".generate; |
28 local fire_global_event = prosody.events.fire_event; |
28 local fire_global_event = prosody.events.fire_event; |
29 local runner = require "util.async".runner; |
29 local runner = require "util.async".runner; |
30 |
30 local connect = require "net.connect".connect; |
31 local s2sout = module:require("s2sout"); |
31 local service = require "net.resolvers.service"; |
32 |
32 |
33 local connect_timeout = module:get_option_number("s2s_timeout", 90); |
33 local connect_timeout = module:get_option_number("s2s_timeout", 90); |
34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); |
34 local stream_close_timeout = module:get_option_number("s2s_close_timeout", 5); |
35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); |
35 local opt_keepalives = module:get_option_boolean("s2s_tcp_keepalives", module:get_option_boolean("tcp_keepalives", true)); |
36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... |
36 local secure_auth = module:get_option_boolean("s2s_secure_auth", false); -- One day... |
42 local measure_ipv6 = module:measure("ipv6", "amount"); |
42 local measure_ipv6 = module:measure("ipv6", "amount"); |
43 |
43 |
44 local sessions = module:shared("sessions"); |
44 local sessions = module:shared("sessions"); |
45 |
45 |
46 local runner_callbacks = {}; |
46 local runner_callbacks = {}; |
|
47 |
|
48 local listener = {}; |
47 |
49 |
48 local log = module._log; |
50 local log = module._log; |
49 |
51 |
50 module:hook("stats-update", function () |
52 module:hook("stats-update", function () |
51 local count = 0; |
53 local count = 0; |
152 -- Create a new outgoing session for a stanza |
154 -- Create a new outgoing session for a stanza |
153 function route_to_new_session(event) |
155 function route_to_new_session(event) |
154 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
156 local from_host, to_host, stanza = event.from_host, event.to_host, event.stanza; |
155 log("debug", "opening a new outgoing connection for this stanza"); |
157 log("debug", "opening a new outgoing connection for this stanza"); |
156 local host_session = s2s_new_outgoing(from_host, to_host); |
158 local host_session = s2s_new_outgoing(from_host, to_host); |
|
159 host_session.version = 1; |
157 |
160 |
158 -- Store in buffer |
161 -- Store in buffer |
159 host_session.bounce_sendq = bounce_sendq; |
162 host_session.bounce_sendq = bounce_sendq; |
160 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; |
163 host_session.sendq = { {tostring(stanza), stanza.attr.type ~= "error" and stanza.attr.type ~= "result" and st.reply(stanza)} }; |
161 log("debug", "stanza [%s] queued until connection complete", stanza.name); |
164 log("debug", "stanza [%s] queued until connection complete", stanza.name); |
162 s2sout.initiate_connection(host_session); |
165 connect(service.new(to_host, "xmpp-server", "tcp", { default_port = 5269 }), listener, nil, { session = host_session }); |
163 if (not host_session.connecting) and (not host_session.conn) then |
|
164 log("warn", "Connection to %s failed already, destroying session...", to_host); |
|
165 s2s_destroy_session(host_session, "Connection failed"); |
|
166 return false; |
|
167 end |
|
168 return true; |
166 return true; |
169 end |
167 end |
170 |
168 |
171 local function keepalive(event) |
169 local function keepalive(event) |
172 return event.session.sends2s(' '); |
170 return event.session.sends2s(' '); |
476 text = condition .. (text and (" ("..text..")") or ""); |
474 text = condition .. (text and (" ("..text..")") or ""); |
477 session.log("info", "Session closed by remote with error: %s", text); |
475 session.log("info", "Session closed by remote with error: %s", text); |
478 session:close(nil, text); |
476 session:close(nil, text); |
479 end |
477 end |
480 end |
478 end |
481 |
|
482 local listener = {}; |
|
483 |
479 |
484 --- Session methods |
480 --- Session methods |
485 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
481 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
486 local function session_close(session, reason, remote_reason) |
482 local function session_close(session, reason, remote_reason) |
487 local log = session.log or log; |
483 local log = session.log or log; |
677 |
673 |
678 function listener.ondisconnect(conn, err) |
674 function listener.ondisconnect(conn, err) |
679 local session = sessions[conn]; |
675 local session = sessions[conn]; |
680 if session then |
676 if session then |
681 sessions[conn] = nil; |
677 sessions[conn] = nil; |
|
678 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); |
|
679 s2s_destroy_session(session, err); |
|
680 end |
|
681 end |
|
682 |
|
683 function listener.onfail(data, err) |
|
684 local session = data and data.session; |
|
685 if session then |
682 if err and session.direction == "outgoing" and session.notopen then |
686 if err and session.direction == "outgoing" and session.notopen then |
683 (session.log or log)("debug", "s2s connection attempt failed: %s", err); |
687 (session.log or log)("debug", "s2s connection attempt failed: %s", err); |
684 if s2sout.attempt_connection(session, err) then |
|
685 return; -- Session lives for now |
|
686 end |
|
687 end |
688 end |
688 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); |
689 (session.log or log)("debug", "s2s disconnected: %s->%s (%s)", session.from_host, session.to_host, err or "connection closed"); |
689 s2s_destroy_session(session, err); |
690 s2s_destroy_session(session, err); |
690 end |
691 end |
691 end |
692 end |
703 initialize_session(session); |
704 initialize_session(session); |
704 end |
705 end |
705 |
706 |
706 function listener.ondetach(conn) |
707 function listener.ondetach(conn) |
707 sessions[conn] = nil; |
708 sessions[conn] = nil; |
|
709 end |
|
710 |
|
711 function listener.onattach(conn, data) |
|
712 local session = data and data.session; |
|
713 if session then |
|
714 session.conn = conn; |
|
715 sessions[conn] = session; |
|
716 initialize_session(session); |
|
717 end |
708 end |
718 end |
709 |
719 |
710 function check_auth_policy(event) |
720 function check_auth_policy(event) |
711 local host, session = event.host, event.session; |
721 local host, session = event.host, event.session; |
712 local must_secure = secure_auth; |
722 local must_secure = secure_auth; |
727 return false; |
737 return false; |
728 end |
738 end |
729 end |
739 end |
730 |
740 |
731 module:hook("s2s-check-certificate", check_auth_policy, -1); |
741 module:hook("s2s-check-certificate", check_auth_policy, -1); |
732 |
|
733 s2sout.set_listener(listener); |
|
734 |
742 |
735 module:hook("server-stopping", function(event) |
743 module:hook("server-stopping", function(event) |
736 local reason = event.reason; |
744 local reason = event.reason; |
737 for _, session in pairs(sessions) do |
745 for _, session in pairs(sessions) do |
738 session:close{ condition = "system-shutdown", text = reason }; |
746 session:close{ condition = "system-shutdown", text = reason }; |