16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
16 local sm_new_session, sm_destroy_session = sessionmanager.new_session, sessionmanager.destroy_session; |
17 local uuid_generate = require "util.uuid".generate; |
17 local uuid_generate = require "util.uuid".generate; |
18 |
18 |
19 local xpcall, tostring, type = xpcall, tostring, type; |
19 local xpcall, tostring, type = xpcall, tostring, type; |
20 local traceback = debug.traceback; |
20 local traceback = debug.traceback; |
|
21 local t_insert, t_remove = table.insert, table.remove; |
|
22 local co_running, co_resume = coroutine.running, coroutine.resume; |
21 |
23 |
22 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
24 local xmlns_xmpp_streams = "urn:ietf:params:xml:ns:xmpp-streams"; |
23 |
25 |
24 local log = module._log; |
26 local log = module._log; |
25 |
27 |
29 |
31 |
30 local sessions = module:shared("sessions"); |
32 local sessions = module:shared("sessions"); |
31 local core_process_stanza = prosody.core_process_stanza; |
33 local core_process_stanza = prosody.core_process_stanza; |
32 local hosts = prosody.hosts; |
34 local hosts = prosody.hosts; |
33 |
35 |
34 local stream_callbacks = { default_ns = "jabber:client", handlestanza = core_process_stanza }; |
36 local stream_callbacks = { default_ns = "jabber:client" }; |
35 local listener = {}; |
37 local listener = {}; |
36 |
38 |
37 --- Stream events handlers |
39 --- Stream events handlers |
38 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
40 local stream_xmlns_attr = {xmlns='urn:ietf:params:xml:ns:xmpp-streams'}; |
39 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; |
41 local default_stream_attr = { ["xmlns:stream"] = "http://etherx.jabber.org/streams", xmlns = stream_callbacks.default_ns, version = "1.0", id = "" }; |
118 end |
120 end |
119 |
121 |
120 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end |
122 local function handleerr(err) log("error", "Traceback[c2s]: %s", traceback(tostring(err), 2)); end |
121 function stream_callbacks.handlestanza(session, stanza) |
123 function stream_callbacks.handlestanza(session, stanza) |
122 stanza = session.filter("stanzas/in", stanza); |
124 stanza = session.filter("stanzas/in", stanza); |
123 if stanza then |
125 t_insert(session.pending_stanzas, stanza); |
124 return xpcall(function () return core_process_stanza(session, stanza) end, handleerr); |
|
125 end |
|
126 end |
126 end |
127 |
127 |
128 --- Session methods |
128 --- Session methods |
129 local function session_close(session, reason) |
129 local function session_close(session, reason) |
130 local log = session.log or log; |
130 local log = session.log or log; |
222 function session.reset_stream() |
222 function session.reset_stream() |
223 session.notopen = true; |
223 session.notopen = true; |
224 session.stream:reset(); |
224 session.stream:reset(); |
225 end |
225 end |
226 |
226 |
|
227 session.thread = coroutine.create(function (stanza) |
|
228 while true do |
|
229 core_process_stanza(session, stanza); |
|
230 stanza = coroutine.yield("ready"); |
|
231 end |
|
232 end); |
|
233 |
|
234 session.pending_stanzas = {}; |
|
235 |
227 local filter = session.filter; |
236 local filter = session.filter; |
228 function session.data(data) |
237 function session.data(data) |
229 data = filter("bytes/in", data); |
238 -- Parse the data, which will store stanzas in session.pending_stanzas |
230 if data then |
239 if data then |
231 local ok, err = stream:feed(data); |
240 data = filter("bytes/in", data); |
232 if ok then return; end |
241 if data then |
233 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
242 local ok, err = stream:feed(data); |
234 session:close("not-well-formed"); |
243 if not ok then |
235 end |
244 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
236 end |
245 session:close("not-well-formed"); |
237 |
246 end |
238 |
247 end |
|
248 end |
|
249 |
|
250 if co_running() ~= session.thread and not session.paused then |
|
251 if session.state == "wait" then |
|
252 session.state = "ready"; |
|
253 local ok, state = co_resume(session.thread); |
|
254 if not ok then |
|
255 log("error", "Traceback[c2s]: %s", state); |
|
256 elseif state == "wait" then |
|
257 return; |
|
258 end |
|
259 end |
|
260 -- We're not currently running, so start the thread to process pending stanzas |
|
261 local s, thread = session.pending_stanzas, session.thread; |
|
262 local n = #s; |
|
263 while n > 0 and session.state ~= "wait" do |
|
264 session.log("debug", "processing %d stanzas", n); |
|
265 local consumed; |
|
266 for i = 1,n do |
|
267 local stanza = s[i]; |
|
268 local ok, state = co_resume(thread, stanza); |
|
269 if not ok then |
|
270 log("error", "Traceback[c2s]: %s", state); |
|
271 elseif state == "wait" then |
|
272 consumed = i; |
|
273 session.state = "wait"; |
|
274 break; |
|
275 end |
|
276 end |
|
277 if not consumed then consumed = n; end |
|
278 for i = 1, #s do |
|
279 s[i] = s[consumed+i]; |
|
280 end |
|
281 n = #s; |
|
282 end |
|
283 end |
|
284 end |
|
285 |
239 if c2s_timeout then |
286 if c2s_timeout then |
240 add_task(c2s_timeout, function () |
287 add_task(c2s_timeout, function () |
241 if session.type == "c2s_unauthed" then |
288 if session.type == "c2s_unauthed" then |
242 session:close("connection-timeout"); |
289 session:close("connection-timeout"); |
243 end |
290 end |
244 end); |
291 end); |
245 end |
292 end |
246 |
293 |
247 session.dispatch_stanza = stream_callbacks.handlestanza; |
294 session.dispatch_stanza = stream_callbacks.handlestanza; |
|
295 |
|
296 function session:sleep(by) |
|
297 session.log("debug", "Sleeping for %s", by); |
|
298 session.paused = by or "?"; |
|
299 session.conn:pause(); |
|
300 if co_running() == session.thread then |
|
301 coroutine.yield("wait"); |
|
302 end |
|
303 end |
|
304 function session:wake(by) |
|
305 assert(session.paused == (by or "?")); |
|
306 session.log("debug", "Waking for %s", by); |
|
307 session.paused = nil; |
|
308 session.conn:resume(); |
|
309 session.data(); --FIXME: next tick? |
|
310 end |
248 end |
311 end |
249 |
312 |
250 function listener.onincoming(conn, data) |
313 function listener.onincoming(conn, data) |
251 local session = sessions[conn]; |
314 local session = sessions[conn]; |
252 if session then |
315 if session then |