plugins/mod_bosh.lua
changeset 5776 bd0ff8ae98a8
parent 5748 cef2a1122b43
child 6531 f0687c313cf1
equal deleted inserted replaced
5775:a6c2b8933507 5776:bd0ff8ae98a8
    76 			if r == request then
    76 			if r == request then
    77 				t_remove(requests, i);
    77 				t_remove(requests, i);
    78 				break;
    78 				break;
    79 			end
    79 			end
    80 		end
    80 		end
    81 		
    81 
    82 		-- If this session now has no requests open, mark it as inactive
    82 		-- If this session now has no requests open, mark it as inactive
    83 		local max_inactive = session.bosh_max_inactive;
    83 		local max_inactive = session.bosh_max_inactive;
    84 		if max_inactive and #requests == 0 then
    84 		if max_inactive and #requests == 0 then
    85 			inactive_sessions[session] = os_time() + max_inactive;
    85 			inactive_sessions[session] = os_time() + max_inactive;
    86 			(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
    86 			(session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive);
   119 	headers.content_type = "text/xml; charset=utf-8";
   119 	headers.content_type = "text/xml; charset=utf-8";
   120 
   120 
   121 	if cross_domain and event.request.headers.origin then
   121 	if cross_domain and event.request.headers.origin then
   122 		set_cross_domain_headers(response);
   122 		set_cross_domain_headers(response);
   123 	end
   123 	end
   124 	
   124 
   125 	-- stream:feed() calls the stream_callbacks, so all stanzas in
   125 	-- stream:feed() calls the stream_callbacks, so all stanzas in
   126 	-- the body are processed in this next line before it returns.
   126 	-- the body are processed in this next line before it returns.
   127 	-- In particular, the streamopened() stream callback is where
   127 	-- In particular, the streamopened() stream callback is where
   128 	-- much of the session logic happens, because it's where we first
   128 	-- much of the session logic happens, because it's where we first
   129 	-- get to see the 'sid' of this request.
   129 	-- get to see the 'sid' of this request.
   130 	if not stream:feed(body) then
   130 	if not stream:feed(body) then
   131 		module:log("warn", "Error parsing BOSH payload")
   131 		module:log("warn", "Error parsing BOSH payload")
   132 		return 400;
   132 		return 400;
   133 	end
   133 	end
   134 	
   134 
   135 	-- Stanzas (if any) in the request have now been processed, and
   135 	-- Stanzas (if any) in the request have now been processed, and
   136 	-- we take care of the high-level BOSH logic here, including
   136 	-- we take care of the high-level BOSH logic here, including
   137 	-- giving a response or putting the request "on hold".
   137 	-- giving a response or putting the request "on hold".
   138 	local session = sessions[context.sid];
   138 	local session = sessions[context.sid];
   139 	if session then
   139 	if session then
   162 			log("debug", "Session has data in the send buffer, will send now..");
   162 			log("debug", "Session has data in the send buffer, will send now..");
   163 			local resp = t_concat(session.send_buffer);
   163 			local resp = t_concat(session.send_buffer);
   164 			session.send_buffer = {};
   164 			session.send_buffer = {};
   165 			session.send(resp);
   165 			session.send(resp);
   166 		end
   166 		end
   167 		
   167 
   168 		if not response.finished then
   168 		if not response.finished then
   169 			-- We're keeping this request open, to respond later
   169 			-- We're keeping this request open, to respond later
   170 			log("debug", "Have nothing to say, so leaving request unanswered for now");
   170 			log("debug", "Have nothing to say, so leaving request unanswered for now");
   171 			if session.bosh_wait then
   171 			if session.bosh_wait then
   172 				waiting_requests[response] = os_time() + session.bosh_wait;
   172 				waiting_requests[response] = os_time() + session.bosh_wait;
   173 			end
   173 			end
   174 		end
   174 		end
   175 		
   175 
   176 		if session.bosh_terminate then
   176 		if session.bosh_terminate then
   177 			session.log("debug", "Closing session with %d requests open", #session.requests);
   177 			session.log("debug", "Closing session with %d requests open", #session.requests);
   178 			session:close();
   178 			session:close();
   179 			return nil;
   179 			return nil;
   180 		else
   180 		else
   190 
   190 
   191 local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
   191 local stream_xmlns_attr = { xmlns = "urn:ietf:params:xml:ns:xmpp-streams" };
   192 
   192 
   193 local function bosh_close_stream(session, reason)
   193 local function bosh_close_stream(session, reason)
   194 	(session.log or log)("info", "BOSH client disconnected");
   194 	(session.log or log)("info", "BOSH client disconnected");
   195 	
   195 
   196 	local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   196 	local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   197 		["xmlns:stream"] = xmlns_streams });
   197 		["xmlns:stream"] = xmlns_streams });
   198 	
   198 
   199 
   199 
   200 	if reason then
   200 	if reason then
   201 		close_reply.attr.condition = "remote-stream-error";
   201 		close_reply.attr.condition = "remote-stream-error";
   202 		if type(reason) == "string" then -- assume stream error
   202 		if type(reason) == "string" then -- assume stream error
   203 			close_reply:tag("stream:error")
   203 			close_reply:tag("stream:error")
   234 	local sid = attr.sid;
   234 	local sid = attr.sid;
   235 	log("debug", "BOSH body open (sid: %s)", sid or "<none>");
   235 	log("debug", "BOSH body open (sid: %s)", sid or "<none>");
   236 	if not sid then
   236 	if not sid then
   237 		-- New session request
   237 		-- New session request
   238 		context.notopen = nil; -- Signals that we accept this opening tag
   238 		context.notopen = nil; -- Signals that we accept this opening tag
   239 		
   239 
   240 		-- TODO: Sanity checks here (rid, to, known host, etc.)
   240 		-- TODO: Sanity checks here (rid, to, known host, etc.)
   241 		if not hosts[attr.to] then
   241 		if not hosts[attr.to] then
   242 			-- Unknown host
   242 			-- Unknown host
   243 			log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
   243 			log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to));
   244 			local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   244 			local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate",
   245 				["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
   245 				["xmlns:stream"] = xmlns_streams, condition = "host-unknown" });
   246 			response:send(tostring(close_reply));
   246 			response:send(tostring(close_reply));
   247 			return;
   247 			return;
   248 		end
   248 		end
   249 		
   249 
   250 		-- New session
   250 		-- New session
   251 		sid = new_uuid();
   251 		sid = new_uuid();
   252 		local session = {
   252 		local session = {
   253 			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
   253 			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
   254 			bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid,
   254 			bosh_version = attr.ver, bosh_wait = math_min(attr.wait, bosh_max_wait), streamid = sid,
   257 			close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
   257 			close = bosh_close_stream, dispatch_stanza = core_process_stanza, notopen = true,
   258 			log = logger.init("bosh"..sid),	secure = consider_bosh_secure or request.secure,
   258 			log = logger.init("bosh"..sid),	secure = consider_bosh_secure or request.secure,
   259 			ip = get_ip_from_request(request);
   259 			ip = get_ip_from_request(request);
   260 		};
   260 		};
   261 		sessions[sid] = session;
   261 		sessions[sid] = session;
   262 		
   262 
   263 		local filter = initialize_filters(session);
   263 		local filter = initialize_filters(session);
   264 		
   264 
   265 		session.log("debug", "BOSH session created for request from %s", session.ip);
   265 		session.log("debug", "BOSH session created for request from %s", session.ip);
   266 		log("info", "New BOSH session, assigned it sid '%s'", sid);
   266 		log("info", "New BOSH session, assigned it sid '%s'", sid);
   267 
   267 
   268 		-- Send creation response
   268 		-- Send creation response
   269 		local creating_session = true;
   269 		local creating_session = true;
   306 			end
   306 			end
   307 			return true;
   307 			return true;
   308 		end
   308 		end
   309 		request.sid = sid;
   309 		request.sid = sid;
   310 	end
   310 	end
   311 	
   311 
   312 	local session = sessions[sid];
   312 	local session = sessions[sid];
   313 	if not session then
   313 	if not session then
   314 		-- Unknown sid
   314 		-- Unknown sid
   315 		log("info", "Client tried to use sid '%s' which we don't know about", sid);
   315 		log("info", "Client tried to use sid '%s' which we don't know about", sid);
   316 		response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
   316 		response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })));
   317 		context.notopen = nil;
   317 		context.notopen = nil;
   318 		return;
   318 		return;
   319 	end
   319 	end
   320 	
   320 
   321 	if session.rid then
   321 	if session.rid then
   322 		local rid = tonumber(attr.rid);
   322 		local rid = tonumber(attr.rid);
   323 		local diff = rid - session.rid;
   323 		local diff = rid - session.rid;
   324 		if diff > 1 then
   324 		if diff > 1 then
   325 			session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
   325 			session.log("warn", "rid too large (means a request was lost). Last rid: %d New rid: %s", session.rid, attr.rid);
   332 			t_insert(session.requests, response);
   332 			t_insert(session.requests, response);
   333 			return;
   333 			return;
   334 		end
   334 		end
   335 		session.rid = rid;
   335 		session.rid = rid;
   336 	end
   336 	end
   337 	
   337 
   338 	if attr.type == "terminate" then
   338 	if attr.type == "terminate" then
   339 		-- Client wants to end this session, which we'll do
   339 		-- Client wants to end this session, which we'll do
   340 		-- after processing any stanzas in this request
   340 		-- after processing any stanzas in this request
   341 		session.bosh_terminate = true;
   341 		session.bosh_terminate = true;
   342 	end
   342 	end
   386 		local response = context.response;
   386 		local response = context.response;
   387 		response.status_code = 400;
   387 		response.status_code = 400;
   388 		response:send();
   388 		response:send();
   389 		return;
   389 		return;
   390 	end
   390 	end
   391 	
   391 
   392 	local session = sessions[context.sid];
   392 	local session = sessions[context.sid];
   393 	if error == "stream-error" then -- Remote stream error, we close normally
   393 	if error == "stream-error" then -- Remote stream error, we close normally
   394 		session:close();
   394 		session:close();
   395 	else
   395 	else
   396 		session:close({ condition = "bad-format", text = "Error processing stream" });
   396 		session:close({ condition = "bad-format", text = "Error processing stream" });
   410 			if request.conn then
   410 			if request.conn then
   411 				sessions[request.context.sid].send("");
   411 				sessions[request.context.sid].send("");
   412 			end
   412 			end
   413 		end
   413 		end
   414 	end
   414 	end
   415 	
   415 
   416 	now = now - 3;
   416 	now = now - 3;
   417 	local n_dead_sessions = 0;
   417 	local n_dead_sessions = 0;
   418 	for session, close_after in pairs(inactive_sessions) do
   418 	for session, close_after in pairs(inactive_sessions) do
   419 		if close_after < now then
   419 		if close_after < now then
   420 			(session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now);
   420 			(session.log or log)("debug", "BOSH client inactive too long, destroying session at %d", now);