mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
authorMatthew Wild <mwild1@gmail.com>
Mon, 01 Jun 2020 15:42:19 +0100
changeset 10859 70ac7d23673d
parent 10858 472fe13a05f9
child 10860 c99711eda0d1
mod_admin_socket, util.adminstream: New module to manage a local unix domain socket for admin functionality
plugins/mod_admin_socket.lua
util/adminstream.lua
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/plugins/mod_admin_socket.lua	Mon Jun 01 15:42:19 2020 +0100
@@ -0,0 +1,69 @@
+module:set_global();
+
+local have_unix, unix = pcall(require, "socket.unix");
+
+if not have_unix or type(unix) ~= "table" then
+	module:log_status("error", "LuaSocket unix socket support not available or incompatible, ensure it is up to date");
+	return;
+end
+
+local server = require "net.server";
+
+local adminstream = require "util.adminstream";
+
+local socket_path = module:get_option_string("admin_socket", prosody.paths.data.."/prosody.sock");
+
+local sessions = module:shared("sessions");
+
+local function fire_admin_event(session, stanza)
+	local event_data = {
+		origin = session, stanza = stanza;
+	};
+	local event_name;
+	if stanza.attr.xmlns then
+		event_name = "admin/"..stanza.attr.xmlns..":"..stanza.name;
+	else
+		event_name = "admin/"..stanza.name;
+	end
+	module:log("debug", "Firing %s", event_name);
+	return module:fire_event(event_name, event_data);
+end
+
+module:hook("server-stopping", function ()
+	for _, session in pairs(sessions) do
+		session:close("system-shutdown");
+	end
+	os.remove(socket_path);
+end);
+
+--- Unix domain socket management
+
+local conn, sock;
+
+local listeners = adminstream.server(sessions, fire_admin_event).listeners;
+
+local function accept_connection()
+	module:log("debug", "accepting...");
+	local client = sock:accept();
+	if not client then return; end
+	server.wrapclient(client, "unix", 0, listeners, "*a");
+end
+
+function module.load()
+	sock = unix.stream();
+	sock:settimeout(0);
+	os.remove(socket_path);
+	assert(sock:bind(socket_path));
+	assert(sock:listen());
+	conn = server.watchfd(sock:getfd(), accept_connection);
+end
+
+function module.unload()
+	if conn then
+		conn:close();
+	end
+	if sock then
+		sock:close();
+	end
+	os.remove(socket_path);
+end
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/util/adminstream.lua	Mon Jun 01 15:42:19 2020 +0100
@@ -0,0 +1,285 @@
+local st = require "util.stanza";
+local new_xmpp_stream = require "util.xmppstream".new;
+local sessionlib = require "util.session";
+local gettime = require "util.time".now;
+local runner = require "util.async".runner;
+local add_task = require "util.timer".add_task;
+local events = require "util.events";
+
+local stream_close_timeout = 5;
+
+local log = require "util.logger".init("adminstream");
+
+local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
+
+local stream_callbacks = { default_ns = "xmpp:prosody.im/admin" };
+
+function stream_callbacks.streamopened(session, attr)
+	-- run _streamopened in async context
+	session.thread:run({ stream = "opened", attr = attr });
+end
+
+function stream_callbacks._streamopened(session, attr) --luacheck: ignore 212/attr
+	if session.type ~= "client" then
+		session:open_stream();
+	end
+	session.notopen = nil;
+end
+
+function stream_callbacks.streamclosed(session, attr)
+	-- run _streamclosed in async context
+	session.thread:run({ stream = "closed", attr = attr });
+end
+
+function stream_callbacks._streamclosed(session)
+	session.log("debug", "Received </stream:stream>");
+	session:close(false);
+end
+
+function stream_callbacks.error(session, error, data)
+	if error == "no-stream" then
+		session.log("debug", "Invalid opening stream header (%s)", (data:gsub("^([^\1]+)\1", "{%1}")));
+		session:close("invalid-namespace");
+	elseif error == "parse-error" then
+		session.log("debug", "Client XML parse error: %s", data);
+		session:close("not-well-formed");
+	elseif error == "stream-error" then
+		local condition, text = "undefined-condition";
+		for child in data:childtags(nil, xmlns_xmpp_streams) do
+			if child.name ~= "text" then
+				condition = child.name;
+			else
+				text = child:get_text();
+			end
+			if condition ~= "undefined-condition" and text then
+				break;
+			end
+		end
+		text = condition .. (text and (" ("..text..")") or "");
+		session.log("info", "Session closed by remote with error: %s", text);
+		session:close(nil, text);
+	end
+end
+
+function stream_callbacks.handlestanza(session, stanza)
+	session.thread:run(stanza);
+end
+
+local runner_callbacks = {};
+
+function runner_callbacks:error(err)
+	self.data.log("error", "Traceback[c2s]: %s", err);
+end
+
+local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
+
+local function destroy_session(session, reason)
+	if session.destroyed then return; end
+	session.destroyed = true;
+	session.log("debug", "Destroying session: %s", reason or "unknown reason");
+end
+
+local function session_close(session, reason)
+	local log = session.log or log;
+	if session.conn then
+		if session.notopen then
+			session:open_stream();
+		end
+		if reason then -- nil == no err, initiated by us, false == initiated by client
+			local stream_error = st.stanza("stream:error");
+			if type(reason) == "string" then -- assume stream error
+				stream_error:tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' });
+			elseif type(reason) == "table" then
+				if reason.condition then
+					stream_error:tag(reason.condition, stream_xmlns_attr):up();
+					if reason.text then
+						stream_error:tag("text", stream_xmlns_attr):text(reason.text):up();
+					end
+					if reason.extra then
+						stream_error:add_child(reason.extra);
+					end
+				elseif reason.name then -- a stanza
+					stream_error = reason;
+				end
+			end
+			stream_error = tostring(stream_error);
+			log("debug", "Disconnecting client, <stream:error> is: %s", stream_error);
+			session.send(stream_error);
+		end
+
+		session.send("</stream:stream>");
+		function session.send() return false; end
+
+		local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason;
+		session.log("debug", "c2s stream for %s closed: %s", session.full_jid or session.ip or "<unknown>", reason_text or "session closed");
+
+		-- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
+		local conn = session.conn;
+		if reason_text == nil and not session.notopen and session.type == "c2s" then
+			-- Grace time to process data from authenticated cleanly-closed stream
+			add_task(stream_close_timeout, function ()
+				if not session.destroyed then
+					session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
+					destroy_session(session);
+					conn:close();
+				end
+			end);
+		else
+			destroy_session(session, reason_text);
+			conn:close();
+		end
+	else
+		local reason_text = (reason and (reason.name or reason.text or reason.condition)) or reason;
+		destroy_session(session, reason_text);
+	end
+end
+
+--- Public methods
+
+local function new_server(sessions, stanza_handler)
+	local listeners = {};
+
+	function listeners.onconnect(conn)
+		log("debug", "New connection");
+		local session = sessionlib.new("admin");
+		sessionlib.set_id(session);
+		sessionlib.set_logger(session);
+		sessionlib.set_conn(session, conn);
+
+		session.conntime = gettime();
+		session.type = "admin";
+
+		local stream = new_xmpp_stream(session, stream_callbacks);
+		session.stream = stream;
+		session.notopen = true;
+
+		session.thread = runner(function (stanza)
+			if st.is_stanza(stanza) then
+				stanza_handler(session, stanza);
+			elseif stanza.stream == "opened" then
+				stream_callbacks._streamopened(session, stanza.attr);
+			elseif stanza.stream == "closed" then
+				stream_callbacks._streamclosed(session, stanza.attr);
+			end
+		end, runner_callbacks, session);
+
+		function session.data(data)
+			-- Parse the data, which will store stanzas in session.pending_stanzas
+			if data then
+				local ok, err = stream:feed(data);
+				if not ok then
+					session.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
+					session:close("not-well-formed");
+				end
+			end
+		end
+
+		session.close = session_close;
+
+		session.send = function (t)
+			session.log("debug", "Sending[%s]: %s", session.type, t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
+			return session.rawsend(tostring(t));
+		end
+
+		function session.rawsend(t)
+			local ret, err = conn:write(t);
+			if not ret then
+				session.log("debug", "Error writing to connection: %s", err);
+				return false, err;
+			end
+			return true;
+		end
+
+		sessions[conn] = session;
+	end
+
+	function listeners.onincoming(conn, data)
+		local session = sessions[conn];
+		if session then
+			session.data(data);
+		end
+	end
+
+	function listeners.ondisconnect(conn, err)
+		local session = sessions[conn];
+		if session then
+			session.log("info", "Admin client disconnected: %s", err or "connection closed");
+			session.conn = nil;
+			sessions[conn]  = nil;
+		end
+	end
+	return {
+		listeners = listeners;
+	};
+end
+
+local function new_client()
+	local client = {
+		type = "client";
+		events = events.new();
+		log = log;
+	};
+
+	local listeners = {};
+
+	function listeners.onconnect(conn)
+		log("debug", "Connected");
+		client.conn = conn;
+
+		local stream = new_xmpp_stream(client, stream_callbacks);
+		client.stream = stream;
+		client.notopen = true;
+
+		client.thread = runner(function (stanza)
+			if st.is_stanza(stanza) then
+				client.events.fire_event("received", stanza);
+			elseif stanza.stream == "opened" then
+				stream_callbacks._streamopened(client, stanza.attr);
+				client.events.fire_event("connected");
+			elseif stanza.stream == "closed" then
+				client.events.fire_event("disconnected");
+				stream_callbacks._streamclosed(client, stanza.attr);
+			end
+		end, runner_callbacks, client);
+
+		client.close = session_close;
+
+		function client.send(t)
+			client.log("debug", "Sending: %s", t.top_tag and t:top_tag() or t:match("^[^>]*>?"));
+			return client.rawsend(tostring(t));
+		end
+
+		function client.rawsend(t)
+			local ret, err = conn:write(t);
+			if not ret then
+				client.log("debug", "Error writing to connection: %s", err);
+				return false, err;
+			end
+			return true;
+		end
+		client.log("debug", "Opening stream...");
+		client:open_stream();
+	end
+
+	function listeners.onincoming(conn, data) --luacheck: ignore 212/conn
+		local ok, err = client.stream:feed(data);
+		if not ok then
+			client.log("debug", "Received invalid XML (%s) %d bytes: %q", err, #data, data:sub(1, 300));
+			client:close("not-well-formed");
+		end
+	end
+
+	function listeners.ondisconnect(conn, err) --luacheck: ignore 212/conn
+		client.log("info", "Admin client disconnected: %s", err or "connection closed");
+		client.conn = nil;
+	end
+
+	client.listeners = listeners;
+
+	return client;
+end
+
+return {
+	server = new_server;
+	client = new_client;
+};