77 local inactive_sessions = {}; -- Sessions which have no open requests |
77 local inactive_sessions = {}; -- Sessions which have no open requests |
78 |
78 |
79 -- Used to respond to idle sessions (those with waiting requests) |
79 -- Used to respond to idle sessions (those with waiting requests) |
80 local waiting_requests = {}; |
80 local waiting_requests = {}; |
81 function on_destroy_request(request) |
81 function on_destroy_request(request) |
|
82 log("debug", "Request destroyed: %s", tostring(request)); |
82 waiting_requests[request] = nil; |
83 waiting_requests[request] = nil; |
83 local session = sessions[request.sid]; |
84 local session = sessions[request.context.sid]; |
84 if session then |
85 if session then |
85 local requests = session.requests; |
86 local requests = session.requests; |
86 for i,r in ipairs(requests) do |
87 for i, r in ipairs(requests) do |
87 if r == request then |
88 if r == request then |
88 t_remove(requests, i); |
89 t_remove(requests, i); |
89 break; |
90 break; |
90 end |
91 end |
91 end |
92 end |
97 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); |
98 (session.log or log)("debug", "BOSH session marked as inactive (for %ds)", max_inactive); |
98 end |
99 end |
99 end |
100 end |
100 end |
101 end |
101 |
102 |
102 function handle_request(method, body, request) |
103 local function handle_GET(request) |
103 if (not body) or request.method ~= "POST" then |
104 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; |
104 if request.method == "OPTIONS" then |
105 end |
105 local headers = {}; |
106 |
106 for k,v in pairs(default_headers) do headers[k] = v; end |
107 function handle_OPTIONS(request) |
107 headers["Content-Type"] = nil; |
108 local headers = {}; |
108 return { headers = headers, body = "" }; |
109 for k,v in pairs(default_headers) do headers[k] = v; end |
109 else |
110 headers["Content-Type"] = nil; |
110 return "<html><body>You really don't look like a BOSH client to me... what do you want?</body></html>"; |
111 return { headers = headers, body = "" }; |
111 end |
112 end |
112 end |
113 |
113 if not method then |
114 function handle_POST(event) |
114 log("debug", "Request %s suffered error %s", tostring(request.id), body); |
115 log("debug", "Handling new request %s: %s\n----------", tostring(event.request), tostring(event.request.body)); |
115 return; |
116 |
116 end |
117 local request, response = event.request, event.response; |
117 --log("debug", "Handling new request %s: %s\n----------", request.id, tostring(body)); |
118 response.on_destroy = on_destroy_request; |
118 request.notopen = true; |
119 local body = request.body; |
119 request.log = log; |
120 |
120 request.on_destroy = on_destroy_request; |
121 local context = { request = request, response = response, notopen = true }; |
121 |
122 local stream = new_xmpp_stream(context, stream_callbacks); |
122 local stream = new_xmpp_stream(request, stream_callbacks); |
123 response.context = context; |
123 |
124 |
124 -- stream:feed() calls the stream_callbacks, so all stanzas in |
125 -- stream:feed() calls the stream_callbacks, so all stanzas in |
125 -- the body are processed in this next line before it returns. |
126 -- the body are processed in this next line before it returns. |
126 local ok, err = stream:feed(body); |
127 -- In particular, the streamopened() stream callback is where |
127 if not ok then |
128 -- much of the session logic happens, because it's where we first |
128 log("error", "Failed to parse BOSH payload: %s", err); |
129 -- get to see the 'sid' of this request. |
129 end |
130 stream:feed(body); |
130 |
131 |
131 -- Stanzas (if any) in the request have now been processed, and |
132 -- Stanzas (if any) in the request have now been processed, and |
132 -- we take care of the high-level BOSH logic here, including |
133 -- we take care of the high-level BOSH logic here, including |
133 -- giving a response or putting the request "on hold". |
134 -- giving a response or putting the request "on hold". |
134 local session = sessions[request.sid]; |
135 local session = sessions[context.sid]; |
135 if session then |
136 if session then |
136 -- Session was marked as inactive, since we have |
137 -- Session was marked as inactive, since we have |
137 -- a request open now, unmark it |
138 -- a request open now, unmark it |
138 if inactive_sessions[session] and #session.requests > 0 then |
139 if inactive_sessions[session] and #session.requests > 0 then |
139 inactive_sessions[session] = nil; |
140 inactive_sessions[session] = nil; |
140 end |
141 end |
141 |
142 |
142 local r = session.requests; |
143 local r = session.requests; |
143 log("debug", "Session %s has %d out of %d requests open", request.sid, #r, session.bosh_hold); |
144 log("debug", "Session %s has %d out of %d requests open", context.sid, #r, session.bosh_hold); |
144 log("debug", "and there are %d things in the send_buffer", #session.send_buffer); |
145 log("debug", "and there are %d things in the send_buffer:", #session.send_buffer); |
|
146 for i, thing in ipairs(session.send_buffer) do |
|
147 log("debug", " %s", tostring(thing)); |
|
148 end |
145 if #r > session.bosh_hold then |
149 if #r > session.bosh_hold then |
146 -- We are holding too many requests, send what's in the buffer, |
150 -- We are holding too many requests, send what's in the buffer, |
147 log("debug", "We are holding too many requests, so..."); |
151 log("debug", "We are holding too many requests, so..."); |
148 if #session.send_buffer > 0 then |
152 if #session.send_buffer > 0 then |
149 log("debug", "...sending what is in the buffer") |
153 log("debug", "...sending what is in the buffer") |
159 local resp = t_concat(session.send_buffer); |
163 local resp = t_concat(session.send_buffer); |
160 session.send_buffer = {}; |
164 session.send_buffer = {}; |
161 session.send(resp); |
165 session.send(resp); |
162 end |
166 end |
163 |
167 |
164 if not request.destroyed then |
168 if not response.finished then |
165 -- We're keeping this request open, to respond later |
169 -- We're keeping this request open, to respond later |
166 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
170 log("debug", "Have nothing to say, so leaving request unanswered for now"); |
167 if session.bosh_wait then |
171 if session.bosh_wait then |
168 waiting_requests[request] = os_time() + session.bosh_wait; |
172 waiting_requests[response] = os_time() + session.bosh_wait; |
169 end |
173 end |
170 end |
174 end |
171 |
175 |
172 if session.bosh_terminate then |
176 if session.bosh_terminate then |
173 session.log("debug", "Closing session with %d requests open", #session.requests); |
177 session.log("debug", "Closing session with %d requests open", #session.requests); |
211 end |
215 end |
212 end |
216 end |
213 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); |
217 log("info", "Disconnecting client, <stream:error> is: %s", tostring(close_reply)); |
214 end |
218 end |
215 |
219 |
216 local session_close_response = { headers = default_headers, body = tostring(close_reply) }; |
220 local response_body = tostring(close_reply); |
217 |
|
218 for _, held_request in ipairs(session.requests) do |
221 for _, held_request in ipairs(session.requests) do |
219 held_request:send(session_close_response); |
222 held_request.headers = default_headers; |
220 held_request:destroy(); |
223 held_request:send(response_body); |
221 end |
224 end |
222 sessions[session.sid] = nil; |
225 sessions[session.sid] = nil; |
223 inactive_sessions[session] = nil; |
226 inactive_sessions[session] = nil; |
224 sm_destroy_session(session); |
227 sm_destroy_session(session); |
225 end |
228 end |
226 |
229 |
227 -- Handle the <body> tag in the request payload. |
230 -- Handle the <body> tag in the request payload. |
228 function stream_callbacks.streamopened(request, attr) |
231 function stream_callbacks.streamopened(context, attr) |
|
232 local request, response = context.request, context.response; |
229 local sid = attr.sid; |
233 local sid = attr.sid; |
230 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
234 log("debug", "BOSH body open (sid: %s)", sid or "<none>"); |
231 if not sid then |
235 if not sid then |
232 -- New session request |
236 -- New session request |
233 request.notopen = nil; -- Signals that we accept this opening tag |
237 context.notopen = nil; -- Signals that we accept this opening tag |
234 |
238 |
235 -- TODO: Sanity checks here (rid, to, known host, etc.) |
239 -- TODO: Sanity checks here (rid, to, known host, etc.) |
236 if not hosts[attr.to] then |
240 if not hosts[attr.to] then |
237 -- Unknown host |
241 -- Unknown host |
238 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); |
242 log("debug", "BOSH client tried to connect to unknown host: %s", tostring(attr.to)); |
239 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", |
243 local close_reply = st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", |
240 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); |
244 ["xmlns:stream"] = xmlns_streams, condition = "host-unknown" }); |
241 request:send(tostring(close_reply)); |
245 response:send(tostring(close_reply)); |
242 return; |
246 return; |
243 end |
247 end |
244 |
248 |
245 -- New session |
249 -- New session |
246 sid = new_uuid(); |
250 sid = new_uuid(); |
256 sessions[sid] = session; |
260 sessions[sid] = session; |
257 |
261 |
258 session.log("debug", "BOSH session created for request from %s", session.ip); |
262 session.log("debug", "BOSH session created for request from %s", session.ip); |
259 log("info", "New BOSH session, assigned it sid '%s'", sid); |
263 log("info", "New BOSH session, assigned it sid '%s'", sid); |
260 local r, send_buffer = session.requests, session.send_buffer; |
264 local r, send_buffer = session.requests, session.send_buffer; |
261 local response = { headers = default_headers } |
|
262 function session.send(s) |
265 function session.send(s) |
263 -- We need to ensure that outgoing stanzas have the jabber:client xmlns |
266 -- We need to ensure that outgoing stanzas have the jabber:client xmlns |
264 if s.attr and not s.attr.xmlns then |
267 if s.attr and not s.attr.xmlns then |
265 s = st.clone(s); |
268 s = st.clone(s); |
266 s.attr.xmlns = "jabber:client"; |
269 s.attr.xmlns = "jabber:client"; |
267 end |
270 end |
268 --log("debug", "Sending BOSH data: %s", tostring(s)); |
271 --log("debug", "Sending BOSH data: %s", tostring(s)); |
269 local oldest_request = r[1]; |
272 local oldest_request = r[1]; |
270 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then |
273 if oldest_request and (not(auto_cork) or waiting_requests[oldest_request]) then |
271 log("debug", "We have an open request, so sending on that"); |
274 log("debug", "We have an open request, so sending on that"); |
272 response.body = t_concat({ |
275 oldest_request.headers = default_headers; |
|
276 oldest_request:send(t_concat({ |
273 "<body xmlns='http://jabber.org/protocol/httpbind' ", |
277 "<body xmlns='http://jabber.org/protocol/httpbind' ", |
274 session.bosh_terminate and "type='terminate' " or "", |
278 session.bosh_terminate and "type='terminate' " or "", |
275 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", |
279 "sid='", sid, "' xmlns:stream = 'http://etherx.jabber.org/streams'>", |
276 tostring(s), |
280 tostring(s), |
277 "</body>" |
281 "</body>" |
278 }); |
282 })); |
279 oldest_request:send(response); |
|
280 --log("debug", "Sent"); |
|
281 if oldest_request.stayopen then |
|
282 if #r>1 then |
|
283 -- Move front request to back |
|
284 t_insert(r, oldest_request); |
|
285 t_remove(r, 1); |
|
286 end |
|
287 else |
|
288 log("debug", "Destroying the request now..."); |
|
289 oldest_request:destroy(); |
|
290 end |
|
291 elseif s ~= "" then |
283 elseif s ~= "" then |
292 log("debug", "Saved to send buffer because there are %d open requests", #r); |
284 log("debug", "Saved to send buffer because there are %d open requests", #r); |
293 -- Hmm, no requests are open :( |
285 -- Hmm, no requests are open :( |
294 t_insert(session.send_buffer, tostring(s)); |
286 t_insert(session.send_buffer, tostring(s)); |
295 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); |
287 log("debug", "There are now %d things in the send_buffer", #session.send_buffer); |
301 |
293 |
302 local features = st.stanza("stream:features"); |
294 local features = st.stanza("stream:features"); |
303 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
295 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
304 fire_event("stream-features", session, features); |
296 fire_event("stream-features", session, features); |
305 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' |
297 --xmpp:version='1.0' xmlns:xmpp='urn:xmpp:xbosh' |
306 local response = st.stanza("body", { xmlns = xmlns_bosh, |
298 local body = st.stanza("body", { xmlns = xmlns_bosh, |
307 wait = attr.wait, |
299 wait = attr.wait, |
308 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), |
300 inactivity = tostring(BOSH_DEFAULT_INACTIVITY), |
309 polling = tostring(BOSH_DEFAULT_POLLING), |
301 polling = tostring(BOSH_DEFAULT_POLLING), |
310 requests = tostring(BOSH_DEFAULT_REQUESTS), |
302 requests = tostring(BOSH_DEFAULT_REQUESTS), |
311 hold = tostring(session.bosh_hold), |
303 hold = tostring(session.bosh_hold), |
313 ver = '1.6', from = session.host, |
305 ver = '1.6', from = session.host, |
314 secure = 'true', ["xmpp:version"] = "1.0", |
306 secure = 'true', ["xmpp:version"] = "1.0", |
315 ["xmlns:xmpp"] = "urn:xmpp:xbosh", |
307 ["xmlns:xmpp"] = "urn:xmpp:xbosh", |
316 ["xmlns:stream"] = "http://etherx.jabber.org/streams" |
308 ["xmlns:stream"] = "http://etherx.jabber.org/streams" |
317 }):add_child(features); |
309 }):add_child(features); |
318 request:send{ headers = default_headers, body = tostring(response) }; |
310 response.headers = default_headers; |
|
311 response:send(tostring(body)); |
319 |
312 |
320 request.sid = sid; |
313 request.sid = sid; |
321 return; |
314 return; |
322 end |
315 end |
323 |
316 |
324 local session = sessions[sid]; |
317 local session = sessions[sid]; |
325 if not session then |
318 if not session then |
326 -- Unknown sid |
319 -- Unknown sid |
327 log("info", "Client tried to use sid '%s' which we don't know about", sid); |
320 log("info", "Client tried to use sid '%s' which we don't know about", sid); |
328 request:send{ headers = default_headers, body = tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" })) }; |
321 response.headers = default_headers; |
329 request.notopen = nil; |
322 response:send(tostring(st.stanza("body", { xmlns = xmlns_bosh, type = "terminate", condition = "item-not-found" }))); |
|
323 context.notopen = nil; |
330 return; |
324 return; |
331 end |
325 end |
332 |
326 |
333 if session.rid then |
327 if session.rid then |
334 local rid = tonumber(attr.rid); |
328 local rid = tonumber(attr.rid); |
351 -- Client wants to end this session, which we'll do |
345 -- Client wants to end this session, which we'll do |
352 -- after processing any stanzas in this request |
346 -- after processing any stanzas in this request |
353 session.bosh_terminate = true; |
347 session.bosh_terminate = true; |
354 end |
348 end |
355 |
349 |
356 request.notopen = nil; -- Signals that we accept this opening tag |
350 context.notopen = nil; -- Signals that we accept this opening tag |
357 t_insert(session.requests, request); |
351 t_insert(session.requests, response); |
358 request.sid = sid; |
352 context.sid = sid; |
359 |
353 |
360 if session.notopen then |
354 if session.notopen then |
361 local features = st.stanza("stream:features"); |
355 local features = st.stanza("stream:features"); |
362 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
356 hosts[session.host].events.fire_event("stream-features", { origin = session, features = features }); |
363 fire_event("stream-features", session, features); |
357 fire_event("stream-features", session, features); |
364 session.send(features); |
358 session.send(features); |
365 session.notopen = nil; |
359 session.notopen = nil; |
366 end |
360 end |
367 end |
361 end |
368 |
362 |
369 function stream_callbacks.handlestanza(request, stanza) |
363 function stream_callbacks.handlestanza(context, stanza) |
370 if request.ignore then return; end |
364 if context.ignore then return; end |
371 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); |
365 log("debug", "BOSH stanza received: %s\n", stanza:top_tag()); |
372 local session = sessions[request.sid]; |
366 local session = sessions[context.sid]; |
373 if session then |
367 if session then |
374 if stanza.attr.xmlns == xmlns_bosh then |
368 if stanza.attr.xmlns == xmlns_bosh then |
375 stanza.attr.xmlns = nil; |
369 stanza.attr.xmlns = nil; |
376 end |
370 end |
377 core_process_stanza(session, stanza); |
371 core_process_stanza(session, stanza); |
378 end |
372 end |
379 end |
373 end |
380 |
374 |
381 function stream_callbacks.error(request, error) |
375 function stream_callbacks.error(context, error) |
382 log("debug", "Error parsing BOSH request payload; %s", error); |
376 log("debug", "Error parsing BOSH request payload; %s", error); |
383 if not request.sid then |
377 if not context.sid then |
384 request:send({ headers = default_headers, status = "400 Bad Request" }); |
378 local response = context.response; |
|
379 response.headers = default_headers; |
|
380 response.status_code = 400; |
|
381 request:send(); |
385 return; |
382 return; |
386 end |
383 end |
387 |
384 |
388 local session = sessions[request.sid]; |
385 local session = sessions[context.sid]; |
389 if error == "stream-error" then -- Remote stream error, we close normally |
386 if error == "stream-error" then -- Remote stream error, we close normally |
390 session:close(); |
387 session:close(); |
391 else |
388 else |
392 session:close({ condition = "bad-format", text = "Error processing stream" }); |
389 session:close({ condition = "bad-format", text = "Error processing stream" }); |
393 end |
390 end |
398 -- log("debug", "Checking for requests soon to timeout..."); |
395 -- log("debug", "Checking for requests soon to timeout..."); |
399 -- Identify requests timing out within the next few seconds |
396 -- Identify requests timing out within the next few seconds |
400 local now = os_time() + 3; |
397 local now = os_time() + 3; |
401 for request, reply_before in pairs(waiting_requests) do |
398 for request, reply_before in pairs(waiting_requests) do |
402 if reply_before <= now then |
399 if reply_before <= now then |
403 log("debug", "%s was soon to timeout, sending empty response", request.id); |
400 log("debug", "%s was soon to timeout (at %d, now %d), sending empty response", tostring(request), reply_before, now); |
404 -- Send empty response to let the |
401 -- Send empty response to let the |
405 -- client know we're still here |
402 -- client know we're still here |
406 if request.conn then |
403 if request.conn then |
407 sessions[request.sid].send(""); |
404 sessions[request.context.sid].send(""); |
408 end |
405 end |
409 end |
406 end |
410 end |
407 end |
411 |
408 |
412 now = now - 3; |
409 now = now - 3; |
426 dead_sessions[i] = nil; |
423 dead_sessions[i] = nil; |
427 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); |
424 sm_destroy_session(session, "BOSH client silent for over "..session.bosh_max_inactive.." seconds"); |
428 end |
425 end |
429 return 1; |
426 return 1; |
430 end |
427 end |
431 |
428 module:add_timer(1, on_timer); |
432 |
429 |
433 local function setup() |
430 function module.add_host(module) |
434 local ports = module:get_option_array("bosh_ports") or { 5280 }; |
431 module:depends("http"); |
435 httpserver.new_from_config(ports, handle_request, { base = "http-bind" }); |
432 module:provides("http", { |
436 timer.add_task(1, on_timer); |
433 default_path = "/http-bind"; |
437 end |
434 route = { |
438 if prosody.start_time then -- already started |
435 ["GET /"] = handle_GET; |
439 setup(); |
436 ["OPTIONS /"] = handle_OPTIONS; |
440 else |
437 ["POST /"] = handle_POST; |
441 prosody.events.add_handler("server-started", setup); |
438 }; |
442 end |
439 }); |
|
440 end |