plugins/mod_bosh.lua
changeset 4692 8e7c683d78ca
parent 4690 55f690fdc915
child 4697 778eb9405a98
equal deleted inserted replaced
4691:a164fc7057ae 4692:8e7c683d78ca
    77 local inactive_sessions = {}; -- Sessions which have no open requests
    77 local inactive_sessions = {}; -- Sessions which have no open requests
    78 
    78 
    79 -- Used to respond to idle sessions (those with waiting requests)
    79 -- Used to respond to idle sessions (those with waiting requests)
    80 local waiting_requests = {};
    80 local waiting_requests = {};
    81 function on_destroy_request(request)
    81 function on_destroy_request(request)
       
    82 	log("debug", "Request destroyed: %s", tostring(request));
    82 	waiting_requests[request] = nil;
    83 	waiting_requests[request] = nil;
    83 	local session = sessions[request.sid];
    84 	local session = sessions[request.context.sid];
    84 	if session then
    85 	if session then
    85 		local requests = session.requests;
    86 		local requests = session.requests;
    86 		for i,r in ipairs(requests) do
    87 		for i, r in ipairs(requests) do
    87 			if r == request then
    88 			if r == request then
    88 				t_remove(requests, i);
    89 				t_remove(requests, i);
    89 				break;
    90 				break;
    90 			end
    91 			end
    91 		end
    92 		end
    97 			(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
    98 			(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
    98 		end
    99 		end
    99 	end
   100 	end
   100 end
   101 end
   101 
   102 
   102 function handle_request(method, body, request)
   103 local function handle_GET(request)
   103 	if (not body) or request.method ~= "POST" then
   104 	return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>";
   104 		if request.method == "OPTIONS" then
   105 end
   105 			local headers = {};
   106 
   106 			for k,v in pairs(default_headers) do headers[k] = v; end
   107 function handle_OPTIONS(request)
   107 			headers["Content-Type"] = nil;
   108 	local headers = {};
   108 			return { headers = headers, body = "" };
   109 	for k,v in pairs(default_headers) do headers[k] = v; end
   109 		else
   110 	headers["Content-Type"] = nil;
   110 			return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>";
   111 	return { headers = headers, body = "" };
   111 		end
   112 end
   112 	end
   113 
   113 	if not method then
   114 function handle_POST(event)
   114 		log("debug", "Request %s suffered error %s", tostring(request.id), body);
   115 	log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body));
   115 		return;
   116 
   116 	end
   117 	local request, response = event.request, event.response;
   117 	--log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body));
   118 	response.on_destroy = on_destroy_request;
   118 	request.notopen = true;
   119 	local body = request.body;
   119 	request.log = log;
   120 
   120 	request.on_destroy = on_destroy_request;
   121 	local context = { request = request, response = response, notopen = true };
   121 	
   122 	local stream = new_xmpp_stream(context, stream_callbacks);
   122 	local stream = new_xmpp_stream(request, stream_callbacks);
   123 	response.context = context;
   123 	
   124 	
   124 	-- stream:feed() calls the stream_callbacks, so all stanzas in
   125 	-- stream:feed() calls the stream_callbacks, so all stanzas in
   125 	-- the body are processed in this next line before it returns.
   126 	-- the body are processed in this next line before it returns.
   126 	local ok, err = stream:feed(body);
   127 	-- In particular, the streamopened() stream callback is where
   127 	if not ok then
   128 	-- much of the session logic happens, because it's where we first
   128 		log("error", "Failed to parse BOSH payload: %s", err);
   129 	-- get to see the 'sid' of this request.
   129 	end
   130 	stream:feed(body);
   130 	
   131 	
   131 	-- Stanzas (if any) in the request have now been processed, and
   132 	-- Stanzas (if any) in the request have now been processed, and
   132 	-- we take care of the high-level BOSH logic here, including
   133 	-- we take care of the high-level BOSH logic here, including
   133 	-- giving a response or putting the request "on hold".
   134 	-- giving a response or putting the request "on hold".
   134 	local session = sessions[request.sid];
   135 	local session = sessions[context.sid];
   135 	if session then
   136 	if session then
   136 		-- Session was marked as inactive, since we have
   137 		-- Session was marked as inactive, since we have
   137 		-- a request open now, unmark it
   138 		-- a request open now, unmark it
   138 		if inactive_sessions[session] and #session.requests > 0 then
   139 		if inactive_sessions[session] and #session.requests > 0 then
   139 			inactive_sessions[session] = nil;
   140 			inactive_sessions[session] = nil;
   140 		end
   141 		end
   141 
   142 
   142 		local r = session.requests;
   143 		local r = session.requests;
   143 		log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold);
   144 		log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold);
   144 		log("debug", "and there are %d things in the send_buffer", #session.send_buffer);
   145 		log("debug", "and there are %d things in the send_buffer:", #session.send_buffer);
       
   146 		for i, thing in ipairs(session.send_buffer) do
       
   147 			log("debug", "    %s", tostring(thing));
       
   148 		end
   145 		if #r > session.bosh_hold then
   149 		if #r > session.bosh_hold then
   146 			-- We are holding too many requests, send what's in the buffer,
   150 			-- We are holding too many requests, send what's in the buffer,
   147 			log("debug", "We are holding too many requests, so...");
   151 			log("debug", "We are holding too many requests, so...");
   148 			if #session.send_buffer > 0 then
   152 			if #session.send_buffer > 0 then
   149 				log("debug", "...sending what is in the buffer")
   153 				log("debug", "...sending what is in the buffer")
   159 			local resp = t_concat(session.send_buffer);
   163 			local resp = t_concat(session.send_buffer);
   160 			session.send_buffer = {};
   164 			session.send_buffer = {};
   161 			session.send(resp);
   165 			session.send(resp);
   162 		end
   166 		end
   163 		
   167 		
   164 		if not request.destroyed then
   168 		if not response.finished then
   165 			-- We're keeping this request open, to respond later
   169 			-- We're keeping this request open, to respond later
   166 			log("debug", "Have nothing to say, so leaving request unanswered for now");
   170 			log("debug", "Have nothing to say, so leaving request unanswered for now");
   167 			if session.bosh_wait then
   171 			if session.bosh_wait then
   168 				waiting_requests[request] = os_time() + session.bosh_wait;
   172 				waiting_requests[response] = os_time() + session.bosh_wait;
   169 			end
   173 			end
   170 		end
   174 		end
   171 		
   175 		
   172 		if session.bosh_terminate then
   176 		if session.bosh_terminate then
   173 			session.log("debug", "Closing session with %d requests open", #session.requests);
   177 			session.log("debug", "Closing session with %d requests open", #session.requests);
   211 			end
   215 			end
   212 		end
   216 		end
   213 		log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply));
   217 		log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply));
   214 	end
   218 	end
   215 
   219 
   216 	local session_close_response = { headers = default_headers, body = tostring(close_reply) };
   220 	local response_body = tostring(close_reply);
   217 
       
   218 	for _, held_request in ipairs(session.requests) do
   221 	for _, held_request in ipairs(session.requests) do
   219 		held_request:send(session_close_response);
   222 		held_request.headers = default_headers;
   220 		held_request:destroy();
   223 		held_request:send(response_body);
   221 	end
   224 	end
   222 	sessions[session.sid]  = nil;
   225 	sessions[session.sid]  = nil;
   223 	inactive_sessions[session] = nil;
   226 	inactive_sessions[session] = nil;
   224 	sm_destroy_session(session);
   227 	sm_destroy_session(session);
   225 end
   228 end
   226 
   229 
   227 -- Handle the <body> tag in the request payload.
   230 -- Handle the <body> tag in the request payload.
   228 function stream_callbacks.streamopened(request, attr)
   231 function stream_callbacks.streamopened(context, attr)
       
   232 	local request, response = context.request, context.response;
   229 	local sid = attr.sid;
   233 	local sid = attr.sid;
   230 	log("debug", "BOSH body open (sid: %s)", sid or "<none>");
   234 	log("debug", "BOSH body open (sid: %s)", sid or "<none>");
   231 	if not sid then
   235 	if not sid then
   232 		-- New session request
   236 		-- New session request
   233 		request.notopen = nil; -- Signals that we accept this opening tag
   237 		context.notopen = nil; -- Signals that we accept this opening tag
   234 		
   238 		
   235 		-- TODO: Sanity checks here (rid, to, known host, etc.)
   239 		-- TODO: Sanity checks here (rid, to, known host, etc.)
   236 		if not hosts[attr.to] then
   240 		if not hosts[attr.to] then
   237 			-- Unknown host
   241 			-- Unknown host
   238 			log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
   242 			log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
   239 			local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   243 			local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   240 				["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
   244 				["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
   241 			request:send(tostring(close_reply));
   245 			response:send(tostring(close_reply));
   242 			return;
   246 			return;
   243 		end
   247 		end
   244 		
   248 		
   245 		-- New session
   249 		-- New session
   246 		sid = new_uuid();
   250 		sid = new_uuid();
   256 		sessions[sid] = session;
   260 		sessions[sid] = session;
   257 		
   261 		
   258 		session.log("debug", "BOSH session created for request from %s", session.ip);
   262 		session.log("debug", "BOSH session created for request from %s", session.ip);
   259 		log("info", "New BOSH session, assigned it sid '%s'", sid);
   263 		log("info", "New BOSH session, assigned it sid '%s'", sid);
   260 		local r, send_buffer = session.requests, session.send_buffer;
   264 		local r, send_buffer = session.requests, session.send_buffer;
   261 		local response = { headers = default_headers }
       
   262 		function session.send(s)
   265 		function session.send(s)
   263 			-- We need to ensure that outgoing stanzas have the jabber:client xmlns
   266 			-- We need to ensure that outgoing stanzas have the jabber:client xmlns
   264 			if s.attr and not s.attr.xmlns then
   267 			if s.attr and not s.attr.xmlns then
   265 				s = st.clone(s);
   268 				s = st.clone(s);
   266 				s.attr.xmlns = "jabber:client";
   269 				s.attr.xmlns = "jabber:client";
   267 			end
   270 			end
   268 			--log("debug", "Sending BOSH data: %s", tostring(s));
   271 			--log("debug", "Sending BOSH data: %s", tostring(s));
   269 			local oldest_request = r[1];
   272 			local oldest_request = r[1];
   270 			if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then
   273 			if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then
   271 				log("debug", "We have an open request, so sending on that");
   274 				log("debug", "We have an open request, so sending on that");
   272 				response.body = t_concat({
   275 				oldest_request.headers = default_headers;
       
   276 				oldest_request:send(t_concat({
   273 					"<body xmlns='http://jabber.org/protocol/httpbind' ",
   277 					"<body xmlns='http://jabber.org/protocol/httpbind' ",
   274 					session.bosh_terminate and "type='terminate' " or "",
   278 					session.bosh_terminate and "type='terminate' " or "",
   275 					"sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
   279 					"sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
   276 					tostring(s),
   280 					tostring(s),
   277 					"</body>"
   281 					"</body>"
   278 				});
   282 				}));
   279 				oldest_request:send(response);
       
   280 				--log("debug", "Sent");
       
   281 				if oldest_request.stayopen then
       
   282 					if #r>1 then
       
   283 						-- Move front request to back
       
   284 						t_insert(r, oldest_request);
       
   285 						t_remove(r, 1);
       
   286 					end
       
   287 				else
       
   288 					log("debug", "Destroying the request now...");
       
   289 					oldest_request:destroy();
       
   290 				end
       
   291 			elseif s ~= "" then
   283 			elseif s ~= "" then
   292 				log("debug", "Saved to send buffer because there are %d open requests", #r);
   284 				log("debug", "Saved to send buffer because there are %d open requests", #r);
   293 				-- Hmm, no requests are open :(
   285 				-- Hmm, no requests are open :(
   294 				t_insert(session.send_buffer, tostring(s));
   286 				t_insert(session.send_buffer, tostring(s));
   295 				log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
   287 				log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
   301 		
   293 		
   302 		local features = st.stanza("stream:features");
   294 		local features = st.stanza("stream:features");
   303 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   295 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   304 		fire_event("stream-features", session, features);
   296 		fire_event("stream-features", session, features);
   305 		--xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
   297 		--xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
   306 		local response = st.stanza("body", { xmlns = xmlns_bosh,
   298 		local body = st.stanza("body", { xmlns = xmlns_bosh,
   307 			wait = attr.wait,
   299 			wait = attr.wait,
   308 			inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
   300 			inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
   309 			polling = tostring(BOSH_DEFAULT_POLLING),
   301 			polling = tostring(BOSH_DEFAULT_POLLING),
   310 			requests = tostring(BOSH_DEFAULT_REQUESTS),
   302 			requests = tostring(BOSH_DEFAULT_REQUESTS),
   311 			hold = tostring(session.bosh_hold),
   303 			hold = tostring(session.bosh_hold),
   313 			ver  = '1.6', from = session.host,
   305 			ver  = '1.6', from = session.host,
   314 			secure = 'true', ["xmpp:version"] = "1.0",
   306 			secure = 'true', ["xmpp:version"] = "1.0",
   315 			["xmlns:xmpp"] = "urn:xmpp:xbosh",
   307 			["xmlns:xmpp"] = "urn:xmpp:xbosh",
   316 			["xmlns:stream"] = "http://etherx.jabber.org/streams"
   308 			["xmlns:stream"] = "http://etherx.jabber.org/streams"
   317 		}):add_child(features);
   309 		}):add_child(features);
   318 		request:send{ headers = default_headers, body = tostring(response) };
   310 		response.headers = default_headers;
       
   311 		response:send(tostring(body));
   319 		
   312 		
   320 		request.sid = sid;
   313 		request.sid = sid;
   321 		return;
   314 		return;
   322 	end
   315 	end
   323 	
   316 	
   324 	local session = sessions[sid];
   317 	local session = sessions[sid];
   325 	if not session then
   318 	if not session then
   326 		-- Unknown sid
   319 		-- Unknown sid
   327 		log("info", "Client tried to use sid '%s' which we don't know about", sid);
   320 		log("info", "Client tried to use sid '%s' which we don't know about", sid);
   328 		request:send{ headers = default_headers, body = tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })) };
   321 		response.headers = default_headers;
   329 		request.notopen = nil;
   322 		response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
       
   323 		context.notopen = nil;
   330 		return;
   324 		return;
   331 	end
   325 	end
   332 	
   326 	
   333 	if session.rid then
   327 	if session.rid then
   334 		local rid = tonumber(attr.rid);
   328 		local rid = tonumber(attr.rid);
   336 		if diff > 1 then
   330 		if diff > 1 then
   337 			session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
   331 			session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
   338 		elseif diff <= 0 then
   332 		elseif diff <= 0 then
   339 			-- Repeated, ignore
   333 			-- Repeated, ignore
   340 			session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff);
   334 			session.log("debug", "rid repeated (on request %s), ignoring: %s (diff %d)", request.id, session.rid, diff);
   341 			request.notopen = nil;
   335 			context.notopen = nil;
   342 			request.ignore = true;
   336 			context.ignore = true;
   343 			request.sid = sid;
   337 			context.sid = sid;
   344 			t_insert(session.requests, request);
   338 			t_insert(session.requests, response);
   345 			return;
   339 			return;
   346 		end
   340 		end
   347 		session.rid = rid;
   341 		session.rid = rid;
   348 	end
   342 	end
   349 	
   343 	
   351 		-- Client wants to end this session, which we'll do
   345 		-- Client wants to end this session, which we'll do
   352 		-- after processing any stanzas in this request
   346 		-- after processing any stanzas in this request
   353 		session.bosh_terminate = true;
   347 		session.bosh_terminate = true;
   354 	end
   348 	end
   355 
   349 
   356 	request.notopen = nil; -- Signals that we accept this opening tag
   350 	context.notopen = nil; -- Signals that we accept this opening tag
   357 	t_insert(session.requests, request);
   351 	t_insert(session.requests, response);
   358 	request.sid = sid;
   352 	context.sid = sid;
   359 
   353 
   360 	if session.notopen then
   354 	if session.notopen then
   361 		local features = st.stanza("stream:features");
   355 		local features = st.stanza("stream:features");
   362 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   356 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   363 		fire_event("stream-features", session, features);
   357 		fire_event("stream-features", session, features);
   364 		session.send(features);
   358 		session.send(features);
   365 		session.notopen = nil;
   359 		session.notopen = nil;
   366 	end
   360 	end
   367 end
   361 end
   368 
   362 
   369 function stream_callbacks.handlestanza(request, stanza)
   363 function stream_callbacks.handlestanza(context, stanza)
   370 	if request.ignore then return; end
   364 	if context.ignore then return; end
   371 	log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
   365 	log("debug", "BOSH stanza received: %s\n", stanza:top_tag());
   372 	local session = sessions[request.sid];
   366 	local session = sessions[context.sid];
   373 	if session then
   367 	if session then
   374 		if stanza.attr.xmlns == xmlns_bosh then
   368 		if stanza.attr.xmlns == xmlns_bosh then
   375 			stanza.attr.xmlns = nil;
   369 			stanza.attr.xmlns = nil;
   376 		end
   370 		end
   377 		core_process_stanza(session, stanza);
   371 		core_process_stanza(session, stanza);
   378 	end
   372 	end
   379 end
   373 end
   380 
   374 
   381 function stream_callbacks.error(request, error)
   375 function stream_callbacks.error(context, error)
   382 	log("debug", "Error parsing BOSH request payload; %s", error);
   376 	log("debug", "Error parsing BOSH request payload; %s", error);
   383 	if not request.sid then
   377 	if not context.sid then
   384 		request:send({ headers = default_headers, status = "400 Bad Request" });
   378 		local response = context.response;
       
   379 		response.headers = default_headers;
       
   380 		response.status_code = 400;
       
   381 		request:send();
   385 		return;
   382 		return;
   386 	end
   383 	end
   387 	
   384 	
   388 	local session = sessions[request.sid];
   385 	local session = sessions[context.sid];
   389 	if error == "stream-error" then -- Remote stream error, we close normally
   386 	if error == "stream-error" then -- Remote stream error, we close normally
   390 		session:close();
   387 		session:close();
   391 	else
   388 	else
   392 		session:close({ condition = "bad-format", text = "Error processing stream" });
   389 		session:close({ condition = "bad-format", text = "Error processing stream" });
   393 	end
   390 	end
   398 	-- log("debug", "Checking for requests soon to timeout...");
   395 	-- log("debug", "Checking for requests soon to timeout...");
   399 	-- Identify requests timing out within the next few seconds
   396 	-- Identify requests timing out within the next few seconds
   400 	local now = os_time() + 3;
   397 	local now = os_time() + 3;
   401 	for request, reply_before in pairs(waiting_requests) do
   398 	for request, reply_before in pairs(waiting_requests) do
   402 		if reply_before <= now then
   399 		if reply_before <= now then
   403 			log("debug", "%s was soon to timeout, sending empty response", request.id);
   400 			log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now);
   404 			-- Send empty response to let the
   401 			-- Send empty response to let the
   405 			-- client know we're still here
   402 			-- client know we're still here
   406 			if request.conn then
   403 			if request.conn then
   407 				sessions[request.sid].send("");
   404 				sessions[request.context.sid].send("");
   408 			end
   405 			end
   409 		end
   406 		end
   410 	end
   407 	end
   411 	
   408 	
   412 	now = now - 3;
   409 	now = now - 3;
   426 		dead_sessions[i] = nil;
   423 		dead_sessions[i] = nil;
   427 		sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
   424 		sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds");
   428 	end
   425 	end
   429 	return 1;
   426 	return 1;
   430 end
   427 end
   431 
   428 module:add_timer(1, on_timer);
   432 
   429 
   433 local function setup()
   430 function module.add_host(module)
   434 	local ports = module:get_option_array("bosh_ports") or { 5280 };
   431 	module:depends("http");
   435 	httpserver.new_from_config(ports, handle_request, { base = "http-bind" });
   432 	module:provides("http", {
   436 	timer.add_task(1, on_timer);
   433 		default_path = "/http-bind";
   437 end
   434 		route = {
   438 if prosody.start_time then -- already started
   435 			["GET /"] = handle_GET;
   439 	setup();
   436 			["OPTIONS /"] = handle_OPTIONS;
   440 else
   437 			["POST /"] = handle_POST;
   441 	prosody.events.add_handler("server-started", setup);
   438 		};
   442 end
   439 	});
       
   440 end