--- a/mod_websocket/mod_websocket.lua Thu May 24 23:52:36 2012 +0200
+++ b/mod_websocket/mod_websocket.lua Fri May 25 17:20:41 2012 +0200
@@ -1,53 +1,127 @@
-module.host = "*" -- Global module
+-- Prosody IM
+-- Copyright (C) 2008-2010 Matthew Wild
+-- Copyright (C) 2008-2010 Waqas Hussain
+-- Copyright (C) 2012 Florian Zeitz
+--
+-- This project is MIT/X11 licensed. Please see the
+-- COPYING file in the source package for more information.
+--
+
+module:set_global();
-local logger = require "util.logger";
-local log = logger.init("mod_websocket");
-local httpserver = require "net.httpserver";
-local lxp = require "lxp";
-local init_xmlhandlers = require "core.xmlhandlers";
+local add_task = require "util.timer".add_task;
+local new_xmpp_stream = require "util.xmppstream".new;
+local nameprep = require "util.encodings".stringprep.nameprep;
+local sessionmanager = require "core.sessionmanager";
local st = require "util.stanza";
-local sm = require "core.sessionmanager";
+local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
+local uuid_generate = require "util.uuid".generate;
+local sha1 = require "util.hashes".sha1;
+local base64 = require "util.encodings".base64.encode;
+local bxor = require "bit".bxor;
+local tohex = require "bit".tohex;
-local sessions = {};
-local default_headers = { };
+module:depends("http")
-local stream_callbacks = { default_ns = "jabber:client",
- streamopened = sm.streamopened,
- streamclosed = sm.streamclosed,
- handlestanza = core_process_stanza };
+local xpcall, tostring, type = xpcall, tostring, type;
+local traceback = debug.traceback;
+
+local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
+
+local log = module._log;
+
+local c2s_timeout = module:get_option_number("c2s_timeout");
+local opt_keepalives = module:get_option_boolean("tcp_keepalives", false);
+
+local sessions = module:shared("sessions");
+
+local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
+local listener = {};
+
+--- Stream events handlers
+local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
+local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
+
+function stream_callbacks.streamopened(session, attr)
+ local send = session.send;
+ session.host = nameprep(attr.to);
+ if not session.host then
+ session:close{ condition = "improper-addressing",
+ text = "A valid 'to' attribute is required on stream headers" };
+ return;
+ end
+ session.version = tonumber(attr.version) or 0;
+ session.streamid = uuid_generate();
+ (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
+
+ if not hosts[session.host] then
+ -- We don't serve this host...
+ session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
+ return;
+ end
+
+ send("<?xml version='1.0'?>"..(tostring(st.stanza("stream:stream", {
+ xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
+ id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag()):gsub(">", "/>")));
+
+ (session.log or log)("debug", "Sent reply <stream:stream> to client");
+ session.notopen = nil;
+
+ -- If session.secure is *false* (not nil) then it means we /were/ encrypting
+ -- since we now have a new stream header, session is secured
+ if session.secure == false then
+ session.secure = true;
+ end
+
+ local features = st.stanza("stream:features");
+ hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+ module:fire_event("stream-features", session, features);
+
+ send(features);
+end
+
+function stream_callbacks.streamclosed(session)
+ session.log("debug", "Received </stream:stream>");
+ session:close();
+end
+
function stream_callbacks.error(session, error, data)
if error == "no-stream" then
session.log("debug", "Invalid opening stream header");
session:close("invalid-namespace");
- elseif session.close then
- (session.log or log)("debug", "Client XML parse error: %s", tostring(error));
- session:close("xml-not-well-formed");
+ elseif error == "parse-error" then
+ (session.log or log)("debug", "Client XML parse error: %s", tostring(data));
+ session:close("not-well-formed");
+ elseif error == "stream-error" then
+ local condition, text = "undefined-condition";
+ for child in data:children() do
+ if child.attr.xmlns == xmlns_xmpp_streams then
+ if child.name ~= "text" then
+ condition = child.name;
+ else
+ text = child:get_text();
+ end
+ if condition ~= "undefined-condition" and text then
+ break;
+ end
+ end
+ end
+ text = condition .. (text and (" ("..text..")") or "");
+ session.log("info", "Session closed by remote with error: %s", text);
+ session:close(nil, text);
end
end
-
-local function session_reset_stream(session)
- local parser = lxp.new(init_xmlhandlers(session, stream_callbacks), "\1");
- session.parser = parser;
-
- session.notopen = true;
-
- function session.data(conn, data)
- data, _ = data:gsub("[%z\255]", "")
- log("debug", "Parsing: %s", data)
-
- local ok, err = parser:parse(data)
- if not ok then
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data,
- data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("xml-not-well-formed");
- end
+local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end
+function stream_callbacks.handlestanza(session, stanza)
+ stanza = session.filter("stanzas/in", stanza);
+ if stanza then
+ return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
end
end
-local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
-local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
+--- Session methods
local function session_close(session, reason)
local log = session.log or log;
if session.conn then
@@ -78,85 +152,137 @@
end
session.send("</stream:stream>");
session.conn:close();
- websocket_listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed");
+ listener.ondisconnect(session.conn, (reason and (reason.text or reason.condition)) or reason or "session closed");
end
end
+--- Port listener
+function listener.onconnect(conn)
+ local session = sm_new_session(conn);
+ sessions[conn] = session;
-local websocket_listener = { default_mode = "*a" };
-function websocket_listener.onincoming(conn, data)
- local session = sessions[conn];
- if not session then
- session = { type = "c2s_unauthed",
- conn = conn,
- reset_stream = session_reset_stream,
- close = session_close,
- dispatch_stanza = stream_callbacks.handlestanza,
- log = logger.init("websocket"),
- secure = conn.ssl };
+ session.log("info", "Client connected");
+
+ -- Client is using legacy SSL (otherwise mod_tls sets this flag)
+ if conn:ssl() then
+ session.secure = true;
+ end
- function session.send(s)
- conn:write("\00" .. tostring(s) .. "\255");
- end
+ if opt_keepalives then
+ conn:setoption("keepalive", opt_keepalives);
+ end
+
+ session.close = session_close;
- sessions[conn] = session;
+ local stream = new_xmpp_stream(session, stream_callbacks);
+ session.stream = stream;
+ session.notopen = true;
+
+ function session.reset_stream()
+ session.notopen = true;
+ session.stream:reset();
end
- session_reset_stream(session);
+ local filter = session.filter;
+ function session.data(data)
+ local off = 0;
+ local len = string.byte(data, 2) - 0x80;
+ if len == 126 then
+ off = 2;
+ elseif len ==127 then
+ off = 8;
+ end
+ local key = {string.byte(data, off+3), string.byte(data, off+4), string.byte(data, off+5), string.byte(data, off+6)}
+ local decoded = "";
+ local counter = 0;
+ for i = off+7, #data do
+ decoded = decoded .. string.char(bxor(key[counter+1], string.byte(data, i)));
+ counter = (counter + 1) % 4;
+ end
+ module:log("debug", "Websocket received: %s %i", decoded, #decoded)
+ decoded = decoded:gsub("/>$", ">");
+
+ data = filter("bytes/in", decoded);
+ if data then
+ local ok, err = stream:feed(data);
+ if ok then return; end
+ log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
+ session:close("not-well-formed");
+ end
+ end
- if data then
- session.data(conn, data);
+ function session.send(s)
+ s = tostring(s);
+ local len = #s;
+ if len < 126 then
+ conn:write("\x81" .. string.char(len) .. s);
+ elseif len <= 0xffff then
+ conn:write("\x81" .. string.char(126) .. string.char(len/0x100) .. string.char(len%0x100) .. s);
+ else
+ conn:write("\x81" .. string.char(127) .. string.char(len/0x100000000000000)
+ .. string.char((len%0x100000000000000)/0x1000000000000) .. string.char((len%0x1000000000000)/0x10000000000)
+ .. string.char((len%0x10000000000)/0x100000000) .. string.char((len%0x100000000)/0x1000000)
+ .. string.char((len%0x1000000)/0x10000) .. string.char((len%0x10000)/0x100)
+ .. string.char((len%0x100)))
+ end
+ end
+
+ if c2s_timeout then
+ add_task(c2s_timeout, function ()
+ if session.type == "c2s_unauthed" then
+ session:close("connection-timeout");
+ end
+ end);
+ end
+
+ session.dispatch_stanza = stream_callbacks.handlestanza;
+end
+
+function listener.onincoming(conn, data)
+ local session = sessions[conn];
+ if session then
+ session.data(data);
+ else
+ listener.onconnect(conn, data);
+ session = sessions[conn];
+ session.data(data);
end
end
-function websocket_listener.ondisconnect(conn, err)
+function listener.ondisconnect(conn, err)
local session = sessions[conn];
if session then
(session.log or log)("info", "Client disconnected: %s", err);
- sm.destroy_session(session, err);
- sessions[conn] = nil;
+ sm_destroy_session(session, err);
+ sessions[conn] = nil;
session = nil;
end
end
-
-function handle_request(method, body, request)
- if request.method ~= "GET" or request.headers["upgrade"] ~= "WebSocket" or request.headers["connection"] ~= "Upgrade" then
- if request.method == "OPTIONS" then
- return { headers = default_headers, body = "" };
- else
- return "<html><body>You really don't look like a Websocket client to me... what do you want?</body></html>";
- end
- end
-
- local subprotocol = request.headers["Websocket-Protocol"];
- if subprotocol ~= nil and subprotocol ~= "XMPP" then
- return "<html><body>You really don't look like an XMPP Websocket client to me... what do you want?</body></html>";
- end
-
- if not method then
- log("debug", "Request %s suffered error %s", tostring(request.id), body);
- return;
- end
-
- request.conn:setlistener(websocket_listener);
- request.write("HTTP/1.1 101 Web Socket Protocol Handshake\r\n");
- request.write("Upgrade: WebSocket\r\n");
- request.write("Connection: Upgrade\r\n");
- request.write("WebSocket-Origin: file://\r\n"); -- FIXME
- request.write("WebSocket-Location: ws://localhost:5281/xmpp-websocket\r\n"); -- FIXME
- request.write("WebSocket-Protocol: XMPP\r\n");
- request.write("\r\n");
-
- return true;
+function listener.associate_session(conn, session)
+ sessions[conn] = session;
end
-local function setup()
- local ports = module:get_option("websocket_ports") or { 5281 };
- httpserver.new_from_config(ports, handle_request, { base = "xmpp-websocket" });
+function handle_request(event, path)
+ local request, response = event.request, event.response;
+
+ -- Add sanity checks
+
+ response.conn:setlistener(listener);
+ response.status = "101 Switching Protocols";
+ response.headers.Upgrade = "websocket";
+ response.headers.Connection = "Upgrade";
+ response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
+ response.headers.Sec_WebSocket_Protocol = "xmpp";
+
+ return "";
end
-if prosody.start_time then -- already started
- setup();
-else
- prosody.events.add_handler("server-started", setup);
+
+function module.load()
+ module:provides("http", {
+ name = "xmpp-websocket";
+ route = {
+ ["GET /*"] = handle_request;
+ };
+ });
end