plugins/mod_bosh.lua
changeset 5046 16c7b510694b
parent 5031 28d56268a72d
child 5070 4bf6bd22ad11
equal deleted inserted replaced
5045:4ba6940deed0 5046:16c7b510694b
   240 		end
   240 		end
   241 		
   241 		
   242 		-- New session
   242 		-- New session
   243 		sid = new_uuid();
   243 		sid = new_uuid();
   244 		local session = {
   244 		local session = {
   245 			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to,
   245 			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
   246 			bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
   246 			bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
   247 			bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
   247 			bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
   248 			requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
   248 			requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
   249 			close = bosh_close_stream, dispatch_stanza = core_process_stanza,
   249 			close = bosh_close_stream, dispatch_stanza = core_process_stanza,
   250 			log = logger.init("bosh"..sid),	secure = consider_bosh_secure or request.secure,
   250 			log = logger.init("bosh"..sid),	secure = consider_bosh_secure or request.secure,
   252 		};
   252 		};
   253 		sessions[sid] = session;
   253 		sessions[sid] = session;
   254 		
   254 		
   255 		session.log("debug", "BOSH session created for request from %s", session.ip);
   255 		session.log("debug", "BOSH session created for request from %s", session.ip);
   256 		log("info", "New BOSH session, assigned it sid '%s'", sid);
   256 		log("info", "New BOSH session, assigned it sid '%s'", sid);
       
   257 
       
   258 		-- Send creation response
       
   259 		local creating_session = true;
       
   260 		local features = st.stanza("stream:features");
       
   261 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
       
   262 		fire_event("stream-features", session, features);
       
   263 		table.insert(session.send_buffer, tostring(features));
       
   264 
   257 		local r = session.requests;
   265 		local r = session.requests;
   258 		function session.send(s)
   266 		function session.send(s)
   259 			-- We need to ensure that outgoing stanzas have the jabber:client xmlns
   267 			-- We need to ensure that outgoing stanzas have the jabber:client xmlns
   260 			if s.attr and not s.attr.xmlns then
   268 			if s.attr and not s.attr.xmlns then
   261 				s = st.clone(s);
   269 				s = st.clone(s);
   262 				s.attr.xmlns = "jabber:client";
   270 				s.attr.xmlns = "jabber:client";
   263 			end
   271 			end
   264 			--log("debug", "Sending BOSH data: %s", tostring(s));
   272 			--log("debug", "Sending BOSH data: %s", tostring(s));
       
   273 			t_insert(session.send_buffer, tostring(s));
       
   274 
   265 			local oldest_request = r[1];
   275 			local oldest_request = r[1];
   266 			if oldest_request then
   276 			if oldest_request and not session.bosh_processing then
   267 				log("debug", "We have an open request, so sending on that");
   277 				log("debug", "We have an open request, so sending on that");
   268 				oldest_request.headers = default_headers;
   278 				oldest_request.headers = default_headers;
   269 				oldest_request:send(t_concat({
   279 				local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
   270 					"<body xmlns='http://jabber.org/protocol/httpbind' ",
   280 					["xmlns:stream"] = "http://etherx.jabber.org/streams";
   271 					session.bosh_terminate and "type='terminate' " or "",
   281 					type = session.bosh_terminate and "terminate" or nil;
   272 					"sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
   282 					sid = sid;
   273 					tostring(s),
   283 				};
   274 					"</body>"
   284 				if creating_session then
   275 				}));
   285 					body_attr.wait = attr.wait;
   276 			elseif s ~= "" then
   286 					body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
   277 				log("debug", "Saved to send buffer because there are %d open requests", #r);
   287 					body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
   278 				-- Hmm, no requests are open :(
   288 					body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
   279 				t_insert(session.send_buffer, tostring(s));
   289 					body_attr.hold = tostring(session.bosh_hold);
   280 				log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
   290 					body_attr.authid = sid;
       
   291 					body_attr.secure = "true";
       
   292 					body_attr.ver  = '1.6'; from = session.host;
       
   293 					body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
       
   294 					body_attr["xmpp:version"] = "1.0";
       
   295 				end
       
   296 				oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>");
       
   297 				session.send_buffer = {};
   281 			end
   298 			end
   282 			return true;
   299 			return true;
   283 		end
   300 		end
   284 		
       
   285 		-- Send creation response
       
   286 		
       
   287 		local features = st.stanza("stream:features");
       
   288 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
       
   289 		fire_event("stream-features", session, features);
       
   290 		--xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
       
   291 		local body = st.stanza("body", { xmlns = xmlns_bosh,
       
   292 			wait = attr.wait,
       
   293 			inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
       
   294 			polling = tostring(BOSH_DEFAULT_POLLING),
       
   295 			requests = tostring(BOSH_DEFAULT_REQUESTS),
       
   296 			hold = tostring(session.bosh_hold),
       
   297 			sid = sid, authid = sid,
       
   298 			ver  = '1.6', from = session.host,
       
   299 			secure = 'true', ["xmpp:version"] = "1.0",
       
   300 			["xmlns:xmpp"] = "urn:xmpp:xbosh",
       
   301 			["xmlns:stream"] = "http://etherx.jabber.org/streams"
       
   302 		}):add_child(features);
       
   303 		response.headers = default_headers;
       
   304 		response:send(tostring(body));
       
   305 		
       
   306 		request.sid = sid;
   301 		request.sid = sid;
   307 		return;
   302 		return;
   308 	end
   303 	end
   309 	
   304 	
   310 	local session = sessions[sid];
   305 	local session = sessions[sid];
   341 	end
   336 	end
   342 
   337 
   343 	context.notopen = nil; -- Signals that we accept this opening tag
   338 	context.notopen = nil; -- Signals that we accept this opening tag
   344 	t_insert(session.requests, response);
   339 	t_insert(session.requests, response);
   345 	context.sid = sid;
   340 	context.sid = sid;
       
   341 	session.bosh_processing = true; -- Used to suppress replies until processing of this request is done
   346 
   342 
   347 	if session.notopen then
   343 	if session.notopen then
   348 		local features = st.stanza("stream:features");
   344 		local features = st.stanza("stream:features");
   349 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   345 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
   350 		fire_event("stream-features", session, features);
   346 		fire_event("stream-features", session, features);
   351 		session.send(features);
   347 		table.insert(session.send_buffer, tostring(features));
   352 		session.notopen = nil;
   348 		session.notopen = nil;
   353 	end
   349 	end
   354 end
   350 end
   355 
   351 
   356 function stream_callbacks.handlestanza(context, stanza)
   352 function stream_callbacks.handlestanza(context, stanza)
   360 	if session then
   356 	if session then
   361 		if stanza.attr.xmlns == xmlns_bosh then
   357 		if stanza.attr.xmlns == xmlns_bosh then
   362 			stanza.attr.xmlns = nil;
   358 			stanza.attr.xmlns = nil;
   363 		end
   359 		end
   364 		core_process_stanza(session, stanza);
   360 		core_process_stanza(session, stanza);
       
   361 	end
       
   362 end
       
   363 
       
   364 function stream_callbacks.streamclosed(request)
       
   365 	local session = sessions[request.sid];
       
   366 	if session then
       
   367 		session.bosh_processing = false;
       
   368 		if #session.send_buffer > 0 then
       
   369 			session.send("");
       
   370 		end
   365 	end
   371 	end
   366 end
   372 end
   367 
   373 
   368 function stream_callbacks.error(context, error)
   374 function stream_callbacks.error(context, error)
   369 	log("debug", "Error parsing BOSH request payload; %s", error);
   375 	log("debug", "Error parsing BOSH request payload; %s", error);