mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips.
authorMatthew Wild <mwild1@gmail.com>
Sun, 29 Jul 2012 01:56:45 +0100
changeset 5046 16c7b510694b
parent 5045 4ba6940deed0
child 5047 aab64739022e
mod_bosh: Correctly handle data included in the session initiation request, and cork session while a request is being processed, preventing replying to requests when there may be more data to come, reducing round-trips.
plugins/mod_bosh.lua
--- a/plugins/mod_bosh.lua	Sat Jul 28 22:37:24 2012 +0200
+++ b/plugins/mod_bosh.lua	Sun Jul 29 01:56:45 2012 +0100
@@ -242,7 +242,7 @@
 		-- New session
 		sid = new_uuid();
 		local session = {
-			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid), host = attr.to,
+			type = "c2s_unauthed", conn = {}, sid = sid, rid = tonumber(attr.rid)-1, host = attr.to,
 			bosh_version = attr.ver, bosh_wait = attr.wait, streamid = sid,
 			bosh_hold = BOSH_DEFAULT_HOLD, bosh_max_inactive = BOSH_DEFAULT_INACTIVITY,
 			requests = { }, send_buffer = {}, reset_stream = bosh_reset_stream,
@@ -254,6 +254,14 @@
 		
 		session.log("debug", "BOSH session created for request from %s", session.ip);
 		log("info", "New BOSH session, assigned it sid '%s'", sid);
+
+		-- Send creation response
+		local creating_session = true;
+		local features = st.stanza("stream:features");
+		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
+		fire_event("stream-features", session, features);
+		table.insert(session.send_buffer, tostring(features));
+
 		local r = session.requests;
 		function session.send(s)
 			-- We need to ensure that outgoing stanzas have the jabber:client xmlns
@@ -262,47 +270,34 @@
 				s.attr.xmlns = "jabber:client";
 			end
 			--log("debug", "Sending BOSH data: %s", tostring(s));
+			t_insert(session.send_buffer, tostring(s));
+
 			local oldest_request = r[1];
-			if oldest_request then
+			if oldest_request and not session.bosh_processing then
 				log("debug", "We have an open request, so sending on that");
 				oldest_request.headers = default_headers;
-				oldest_request:send(t_concat({
-					"<body xmlns='http://jabber.org/protocol/httpbind' ",
-					session.bosh_terminate and "type='terminate' " or "",
-					"sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>",
-					tostring(s),
-					"</body>"
-				}));
-			elseif s ~= "" then
-				log("debug", "Saved to send buffer because there are %d open requests", #r);
-				-- Hmm, no requests are open :(
-				t_insert(session.send_buffer, tostring(s));
-				log("debug", "There are now %d things in the send_buffer", #session.send_buffer);
+				local body_attr = { xmlns = "http://jabber.org/protocol/httpbind",
+					["xmlns:stream"] = "http://etherx.jabber.org/streams";
+					type = session.bosh_terminate and "terminate" or nil;
+					sid = sid;
+				};
+				if creating_session then
+					body_attr.wait = attr.wait;
+					body_attr.inactivity = tostring(BOSH_DEFAULT_INACTIVITY);
+					body_attr.polling = tostring(BOSH_DEFAULT_POLLING);
+					body_attr.requests = tostring(BOSH_DEFAULT_REQUESTS);
+					body_attr.hold = tostring(session.bosh_hold);
+					body_attr.authid = sid;
+					body_attr.secure = "true";
+					body_attr.ver  = '1.6'; from = session.host;
+					body_attr["xmlns:xmpp"] = "urn:xmpp:xbosh";
+					body_attr["xmpp:version"] = "1.0";
+				end
+				oldest_request:send(st.stanza("body", body_attr):top_tag()..t_concat(session.send_buffer).."</body>");
+				session.send_buffer = {};
 			end
 			return true;
 		end
-		
-		-- Send creation response
-		
-		local features = st.stanza("stream:features");
-		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
-		fire_event("stream-features", session, features);
-		--xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh'
-		local body = st.stanza("body", { xmlns = xmlns_bosh,
-			wait = attr.wait,
-			inactivity = tostring(BOSH_DEFAULT_INACTIVITY),
-			polling = tostring(BOSH_DEFAULT_POLLING),
-			requests = tostring(BOSH_DEFAULT_REQUESTS),
-			hold = tostring(session.bosh_hold),
-			sid = sid, authid = sid,
-			ver  = '1.6', from = session.host,
-			secure = 'true', ["xmpp:version"] = "1.0",
-			["xmlns:xmpp"] = "urn:xmpp:xbosh",
-			["xmlns:stream"] = "http://etherx.jabber.org/streams"
-		}):add_child(features);
-		response.headers = default_headers;
-		response:send(tostring(body));
-		
 		request.sid = sid;
 		return;
 	end
@@ -343,12 +338,13 @@
 	context.notopen = nil; -- Signals that we accept this opening tag
 	t_insert(session.requests, response);
 	context.sid = sid;
+	session.bosh_processing = true; -- Used to suppress replies until processing of this request is done
 
 	if session.notopen then
 		local features = st.stanza("stream:features");
 		hosts[session.host].events.fire_event("stream-features", { origin = session, features = features });
 		fire_event("stream-features", session, features);
-		session.send(features);
+		table.insert(session.send_buffer, tostring(features));
 		session.notopen = nil;
 	end
 end
@@ -365,6 +361,16 @@
 	end
 end
 
+function stream_callbacks.streamclosed(request)
+	local session = sessions[request.sid];
+	if session then
+		session.bosh_processing = false;
+		if #session.send_buffer > 0 then
+			session.send("");
+		end
+	end
+end
+
 function stream_callbacks.error(context, error)
 	log("debug", "Error parsing BOSH request payload; %s", error);
 	if not context.sid then