--- a/mod_websocket/mod_websocket.lua Sat Jan 12 03:49:50 2013 +0100
+++ b/mod_websocket/mod_websocket.lua Sat Jan 12 17:33:52 2013 +0100
@@ -1,6 +1,4 @@
-- Prosody IM
--- Copyright (C) 2008-2010 Matthew Wild
--- Copyright (C) 2008-2010 Waqas Hussain
-- Copyright (C) 2012 Florian Zeitz
--
-- This project is MIT/X11 licensed. Please see the
@@ -9,28 +7,15 @@
module:set_global();
-local add_task = require "util.timer".add_task;
-local new_xmpp_stream = require "util.xmppstream".new;
-local nameprep = require "util.encodings".stringprep.nameprep;
-local sessionmanager = require "core.sessionmanager";
-local st = require "util.stanza";
-local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session;
-local uuid_generate = require "util.uuid".generate;
+local add_filter = require "util.filters".add_filter;
local sha1 = require "util.hashes".sha1;
local base64 = require "util.encodings".base64.encode;
-local band = require "bit".band;
-local bxor = require "bit".bxor;
-
-local xpcall, tostring, type = xpcall, tostring, type;
-local traceback = debug.traceback;
+local softreq = require "util.dependencies".softreq;
+local portmanager = require "core.portmanager";
-local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams";
-
-local log = module._log;
-
-local c2s_timeout = module:get_option_number("c2s_timeout");
-local stream_close_timeout = module:get_option_number("c2s_close_timeout", 5);
-local opt_keepalives = module:get_option_boolean("tcp_keepalives", false);
+local bit = softreq"bit" or softreq"bit32" or module:log("error", "No bit module found. Either LuaJIT 2 or Lua 5.2 is required");
+local band = bit.band;
+local bxor = bit.bxor;
local cross_domain = module:get_option("cross_domain_websocket");
if cross_domain then
@@ -44,11 +29,9 @@
end
end
-local sessions = module:shared("sessions");
-local core_process_stanza = prosody.core_process_stanza;
-
-local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza };
-local listener = {};
+module:depends("c2s")
+local sessions = module:shared("c2s/sessions");
+local c2s_listener = portmanager.get_service("c2s").listener;
-- Websocket helpers
local function parse_frame(frame)
@@ -131,189 +114,25 @@
return result;
end
---- Stream events handlers
-local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'};
-local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" };
-
-function stream_callbacks.streamopened(session, attr)
- local send = session.send;
- session.host = nameprep(attr.to);
- if not session.host then
- session:close{ condition = "improper-addressing",
- text = "A valid 'to' attribute is required on stream headers" };
- return;
- end
- session.version = tonumber(attr.version) or 0;
- session.streamid = uuid_generate();
- (session.log or session)("debug", "Client sent opening <stream:stream> to %s", session.host);
+--- Filter stuff
+function handle_request(event, path)
+ local request, response = event.request, event.response;
+ local conn = response.conn;
- if not hosts[session.host] then
- -- We don't serve this host...
- session:close{ condition = "host-unknown", text = "This server does not serve "..tostring(session.host)};
- return;
- end
-
- -- COMPAT: Some current client implementations need this to be self-closing
- if session.self_closing_stream then
- send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", {
- xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
- id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' })));
- else
- send("<?xml version='1.0'?>"..st.stanza("stream:stream", {
- xmlns = 'jabber:client', ["xmlns:stream"] = 'http://etherx.jabber.org/streams';
- id = session.streamid, from = session.host, version = '1.0', ["xml:lang"] = 'en' }):top_tag());
- end
-
- (session.log or log)("debug", "Sent reply <stream:stream> to client");
- session.notopen = nil;
-
- -- If session.secure is *false* (not nil) then it means we /were/ encrypting
- -- since we now have a new stream header, session is secured
- if session.secure == false then
- session.secure = true;
+ if not request.headers.sec_websocket_key then
+ response.headers.content_type = "text/html";
+ return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
+ <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
+ </body></html>]];
end
- local features = st.stanza("stream:features");
- hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
- module:fire_event("stream-features", session, features);
-
- send(features);
-end
-
-function stream_callbacks.streamclosed(session)
- session.log("debug", "Received </stream:stream>");
- session:close(false);
-end
-
-function stream_callbacks.error(session, error, data)
- if error == "no-stream" then
- session.log("debug", "Invalid opening stream header");
- session:close("invalid-namespace");
- elseif error == "parse-error" then
- (session.log or log)("debug", "Client XML parse error: %s", tostring(data));
- session:close("not-well-formed");
- elseif error == "stream-error" then
- local condition, text = "undefined-condition";
- for child in data:children() do
- if child.attr.xmlns == xmlns_xmpp_streams then
- if child.name ~= "text" then
- condition = child.name;
- else
- text = child:get_text();
- end
- if condition ~= "undefined-condition" and text then
- break;
- end
- end
- end
- text = condition .. (text and (" ("..text..")") or "");
- session.log("info", "Session closed by remote with error: %s", text);
- session:close(nil, text);
- end
-end
-
-local function handleerr(err) log("error", "Traceback[c2s]: %s: %s", tostring(err), traceback()); end
-function stream_callbacks.handlestanza(session, stanza)
- stanza = session.filter("stanzas/in", stanza);
- if stanza then
- return xpcall(function () return core_process_stanza(session, stanza) end, handleerr);
- end
-end
+ local wants_xmpp = false;
+ (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
+ if proto == "xmpp" then wants_xmpp = true; end
+ end);
---- Session methods
-local function session_close(session, reason)
- local log = session.log or log;
- if session.conn then
- if session.notopen then
- -- COMPAT: Some current client implementations need this to be self-closing
- if session.self_closing_stream then
- session.send("<?xml version='1.0'?>"..tostring(st.stanza("stream:stream", default_stream_attr)));
- else
- session.send("<?xml version='1.0'?>"..st.stanza("stream:stream", default_stream_attr):top_tag());
- end
- end
- if reason then -- nil == no err, initiated by us, false == initiated by client
- if type(reason) == "string" then -- assume stream error
- log("debug", "Disconnecting client, <stream:error> is: %s", reason);
- session.send(st.stanza("stream:error"):tag(reason, {xmlns = 'urn:ietf:params:xml:ns:xmpp-streams' }));
- elseif type(reason) == "table" then
- if reason.condition then
- local stanza = st.stanza("stream:error"):tag(reason.condition, stream_xmlns_attr):up();
- if reason.text then
- stanza:tag("text", stream_xmlns_attr):text(reason.text):up();
- end
- if reason.extra then
- stanza:add_child(reason.extra);
- end
- log("debug", "Disconnecting client, <stream:error> is: %s", tostring(stanza));
- session.send(stanza);
- elseif reason.name then -- a stanza
- log("debug", "Disconnecting client, <stream:error> is: %s", tostring(reason));
- session.send(reason);
- end
- end
- end
- session.send("</stream:stream>");
- function session.send() return false; end
-
- local reason = (reason and (reason.text or reason.condition)) or reason;
- session.log("info", "c2s stream for %s closed: %s", session.full_jid or ("<"..session.ip..">"), reason or "session closed");
-
- -- Authenticated incoming stream may still be sending us stanzas, so wait for </stream:stream> from remote
- local conn = session.conn;
- if reason == nil and not session.notopen and session.type == "c2s" then
- -- Grace time to process data from authenticated cleanly-closed stream
- add_task(stream_close_timeout, function ()
- if not session.destroyed then
- session.log("warn", "Failed to receive a stream close response, closing connection anyway...");
- sm_destroy_session(session, reason);
- conn:close();
- end
- end);
- else
- sm_destroy_session(session, reason);
- conn:close();
- end
- end
-end
-
-module:hook_global("user-deleted", function(event)
- local username, host = event.username, event.host;
- local user = hosts[host].sessions[username];
- if user and user.sessions then
- for jid, session in pairs(user.sessions) do
- session:close{ condition = "not-authorized", text = "Account deleted" };
- end
- end
-end, 200);
-
---- Port listener
-function listener.onconnect(conn)
- local session = sm_new_session(conn);
- sessions[conn] = session;
-
- session.log("info", "Client connected");
-
- -- Client is using legacy SSL (otherwise mod_tls sets this flag)
- if conn:ssl() then
- session.secure = true;
- end
-
- if opt_keepalives then
- conn:setoption("keepalive", opt_keepalives);
- end
-
- session.close = session_close;
-
- session.conn.starttls = nil;
-
- local stream = new_xmpp_stream(session, stream_callbacks);
- session.stream = stream;
- session.notopen = true;
-
- function session.reset_stream()
- session.notopen = true;
- session.stream:reset();
+ if not wants_xmpp then
+ return 501;
end
local function websocket_close(code, message)
@@ -322,7 +141,6 @@
conn:close();
end
- local filter = session.filter;
local dataBuffer;
local function handle_frame(frame)
module:log("debug", "Websocket received: %s (%i bytes)", frame.data, #frame.data);
@@ -365,115 +183,58 @@
dataBuffer = frame.data;
elseif frame.opcode == 0x2 then -- Binary frame
websocket_close(1003, "Only text frames are supported");
- return false;
+ return;
elseif frame.opcode == 0x8 then -- Close request
websocket_close(1000, "Goodbye");
- return false;
+ return;
elseif frame.opcode == 0x9 then -- Ping frame
frame.opcode = 0xA;
conn:write(build_frame(frame));
- return true;
+ return "";
else
log("warn", "Received frame with unsupported opcode %i", frame.opcode);
- return true;
+ return "";
end
if frame.FIN then
data = dataBuffer;
dataBuffer = nil;
- -- COMPAT: Some current client implementations send a self-closing <stream:stream>
- data, session.self_closing_stream = data:gsub("^(<stream:stream.*)/>$", "%1>");
- session.self_closing_stream = (session.self_closing_stream == 1)
-
- data = filter("bytes/in", data);
- if data then
- local ok, err = stream:feed(data);
- if ok then return; end
- log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_"));
- session:close("not-well-formed");
- end
+ return data;
end
- return true;
+ return "";
end
+ conn:setlistener(c2s_listener);
+ c2s_listener.onconnect(conn);
+
local frameBuffer = "";
- function session.data(data)
+ add_filter(sessions[conn], "bytes/in", function(data)
+ local cache = "";
frameBuffer = frameBuffer .. data;
local frame, length = parse_frame(frameBuffer);
while frame do
frameBuffer = frameBuffer:sub(length + 1);
- if not handle_frame(frame) then return; end
+ local result = handle_frame(frame);
+ if not result then return; end
+ cache = cache .. result;
frame, length = parse_frame(frameBuffer);
end
- end
-
- function session.send(s)
- conn:write(build_frame({ FIN = true, opcode = 0x01, data = tostring(s)}));
- end
-
- if c2s_timeout then
- add_task(c2s_timeout, function ()
- if session.type == "c2s_unauthed" then
- session:close("connection-timeout");
- end
- end);
- end
-
- session.dispatch_stanza = stream_callbacks.handlestanza;
-end
+ return cache;
-function listener.onincoming(conn, data)
- local session = sessions[conn];
- if session then
- session.data(data);
- else
- listener.onconnect(conn, data);
- session = sessions[conn];
- session.data(data);
- end
-end
-
-function listener.ondisconnect(conn, err)
- local session = sessions[conn];
- if session then
- (session.log or log)("info", "Client disconnected: %s", err or "connection closed");
- sm_destroy_session(session, err);
- sessions[conn] = nil;
- end
-end
-
-function listener.associate_session(conn, session)
- sessions[conn] = session;
-end
-
-function handle_request(event, path)
- local request, response = event.request, event.response;
-
- if not request.headers.sec_websocket_key then
- response.headers.content_type = "text/html";
- return [[<!DOCTYPE html><html><head><title>Websocket</title></head><body>
- <p>It works! Now point your WebSocket client to this URL to connect to Prosody.</p>
- </body></html>]];
- end
-
- local wants_xmpp = false;
- (request.headers.sec_websocket_protocol or ""):gsub("([^,]*),?", function (proto)
- if proto == "xmpp" then wants_xmpp = true; end
end);
- if not wants_xmpp then
- return 501;
- end
+ add_filter(sessions[conn], "bytes/out", function(data)
+ return build_frame({ FIN = true, opcode = 0x01, data = tostring(data)});
+ end);
- response.conn:setlistener(listener);
response.status = "101 Switching Protocols";
- response.headers.Upgrade = "websocket";
- response.headers.Connection = "Upgrade";
- response.headers.Sec_WebSocket_Accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
- response.headers.Sec_WebSocket_Protocol = "xmpp";
- response.headers.Access_Control_Allow_Origin = cross_domain;
+ response.headers.upgrade = "websocket";
+ response.headers.connection = "Upgrade";
+ response.headers.sec_webSocket_accept = base64(sha1(request.headers.sec_websocket_key .. "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
+ response.headers.sec_webSocket_protocol = "xmpp";
+ response.headers.access_control_allow_origin = cross_domain;
return "";
end