1 local st = require "util.stanza"; |
1 local st = require "util.stanza"; |
|
2 local uuid_generate = require "util.uuid".generate; |
2 |
3 |
3 local t_insert, t_remove = table.insert, table.remove; |
4 local t_insert, t_remove = table.insert, table.remove; |
4 local math_min = math.min; |
5 local math_min = math.min; |
|
6 local os_time = os.time; |
5 local tonumber, tostring = tonumber, tostring; |
7 local tonumber, tostring = tonumber, tostring; |
6 local add_filter = require "util.filters".add_filter; |
8 local add_filter = require "util.filters".add_filter; |
7 local timer = require "util.timer"; |
9 local timer = require "util.timer"; |
8 |
10 |
9 local xmlns_sm = "urn:xmpp:sm:2"; |
11 local xmlns_sm = "urn:xmpp:sm:2"; |
|
12 local xmlns_errors = "urn:ietf:params:xml:ns:xmpp-stanzas"; |
10 |
13 |
11 local sm_attr = { xmlns = xmlns_sm }; |
14 local sm_attr = { xmlns = xmlns_sm }; |
12 |
15 |
13 local resume_timeout = 300; |
16 local resume_timeout = module:get_option("smacks_hibernation_time", 300); |
14 local max_unacked_stanzas = 0; |
17 local max_unacked_stanzas = 0; |
|
18 |
|
19 local session_registry = {}; |
15 |
20 |
16 module:hook("stream-features", |
21 module:hook("stream-features", |
17 function (event) |
22 function (event) |
18 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
23 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
19 end); |
24 end); |
21 module:hook("s2s-stream-features", |
26 module:hook("s2s-stream-features", |
22 function (event) |
27 function (event) |
23 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
28 event.features:tag("sm", sm_attr):tag("optional"):up():up(); |
24 end); |
29 end); |
25 |
30 |
26 module:hook_stanza(xmlns_sm, "enable", |
31 module:hook_stanza("http://etherx.jabber.org/streams", "features", |
27 function (session, stanza) |
32 function (session, stanza) |
28 module:log("debug", "Enabling stream management"); |
33 if not session.smacks and stanza:get_child("sm", xmlns_sm) then |
29 session.smacks = true; |
34 session.send(st.stanza("enable", sm_attr)); |
30 |
35 end |
31 -- Overwrite process_stanza() and send() |
36 end); |
32 local queue = {}; |
37 |
33 session.outgoing_stanza_queue = queue; |
38 local function wrap_session(session, resume) |
34 session.last_acknowledged_stanza = 0; |
39 -- Overwrite process_stanza() and send() |
35 local _send = session.sends2s or session.send; |
40 local queue; |
36 local function new_send(stanza) |
41 if not resume then |
37 local attr = stanza.attr; |
42 queue = {}; |
38 if attr and not attr.xmlns then -- Stanza in default stream namespace |
43 session.outgoing_stanza_queue = queue; |
39 queue[#queue+1] = st.clone(stanza); |
44 session.last_acknowledged_stanza = 0; |
40 end |
45 else |
41 local ok, err = _send(stanza); |
46 queue = session.outgoing_stanza_queue; |
42 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then |
47 end |
43 session.awaiting_ack = true; |
48 |
44 return _send(st.stanza("r", { xmlns = xmlns_sm })); |
49 local _send = session.sends2s or session.send; |
45 end |
50 local function new_send(stanza) |
46 return ok, err; |
51 local attr = stanza.attr; |
47 end |
52 if attr and not attr.xmlns then -- Stanza in default stream namespace |
48 |
53 queue[#queue+1] = st.clone(stanza); |
49 if session.sends2s then |
54 end |
50 session.sends2s = new_send; |
55 local ok, err = _send(stanza); |
51 else |
56 if ok and #queue > max_unacked_stanzas and not session.awaiting_ack then |
52 session.send = new_send; |
57 session.awaiting_ack = true; |
53 end |
58 return _send(st.stanza("r", sm_attr)); |
54 |
59 end |
55 session.handled_stanza_count = 0; |
60 return ok, err; |
56 add_filter(session, "stanzas/in", function (stanza) |
61 end |
57 if not stanza.attr.xmlns then |
62 |
58 session.handled_stanza_count = session.handled_stanza_count + 1; |
63 if session.sends2s then |
59 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); |
64 session.sends2s = new_send; |
60 end |
65 else |
61 return stanza; |
66 session.send = new_send; |
62 end); |
67 end |
63 |
68 |
64 if not stanza.attr.resume then -- FIXME: Resumption should be a different spec :/ |
69 if not resume then |
65 _send(st.stanza("enabled", sm_attr)); |
70 session.handled_stanza_count = 0; |
66 return true; |
71 add_filter(session, "stanzas/in", function (stanza) |
67 end |
72 if not stanza.attr.xmlns then |
68 end, 100); |
73 session.handled_stanza_count = session.handled_stanza_count + 1; |
|
74 session.log("debug", "Handled %d incoming stanzas", session.handled_stanza_count); |
|
75 end |
|
76 return stanza; |
|
77 end); |
|
78 end |
|
79 |
|
80 return session; |
|
81 end |
|
82 |
|
83 module:hook_stanza(xmlns_sm, "enable", function (session, stanza) |
|
84 module:log("debug", "Enabling stream management"); |
|
85 session.smacks = true; |
|
86 |
|
87 wrap_session(session); |
|
88 |
|
89 local resume_token; |
|
90 local resume = stanza.attr.resume; |
|
91 if resume == "true" or resume == "1" then |
|
92 resume_token = uuid_generate(); |
|
93 session_registry[resume_token] = session; |
|
94 session.resumption_token = resume_token; |
|
95 end |
|
96 session.send(st.stanza("enabled", { xmlns = xmlns_sm, id = resume_token, resume = resume })); |
|
97 return true; |
|
98 end, 100); |
69 |
99 |
70 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) |
100 module:hook_stanza(xmlns_sm, "r", function (origin, stanza) |
71 if not origin.smacks then |
101 if not origin.smacks then |
72 module:log("debug", "Received ack request from non-smack-enabled session"); |
102 module:log("debug", "Received ack request from non-smack-enabled session"); |
73 return; |
103 return; |
86 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; |
116 local handled_stanza_count = tonumber(stanza.attr.h)-origin.last_acknowledged_stanza; |
87 local queue = origin.outgoing_stanza_queue; |
117 local queue = origin.outgoing_stanza_queue; |
88 if handled_stanza_count > #queue then |
118 if handled_stanza_count > #queue then |
89 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", |
119 module:log("warn", "The client says it handled %d new stanzas, but we only sent %d :)", |
90 handled_stanza_count, #queue); |
120 handled_stanza_count, #queue); |
|
121 module:log("debug", "Client h: %d, our h: %d", tonumber(stanza.attr.h), origin.last_acknowledged_stanza); |
91 for i=1,#queue do |
122 for i=1,#queue do |
92 module:log("debug", "Q item %d: %s", i, tostring(queue[i])); |
123 module:log("debug", "Q item %d: %s", i, tostring(queue[i])); |
93 end |
124 end |
94 end |
125 end |
95 for i=1,math_min(handled_stanza_count,#queue) do |
126 for i=1,math_min(handled_stanza_count,#queue) do |
132 module:log("warn", "::%s", tostring(queue[i])); |
163 module:log("warn", "::%s", tostring(queue[i])); |
133 end |
164 end |
134 handle_unacked_stanzas(session); |
165 handle_unacked_stanzas(session); |
135 end |
166 end |
136 else |
167 else |
137 session.hibernating = true; |
168 local hibernate_time = os_time(); -- Track the time we went into hibernation |
|
169 session.hibernating = hibernate_time; |
138 timer.add_task(resume_timeout, function () |
170 timer.add_task(resume_timeout, function () |
139 if session.hibernating then |
171 -- Check the hibernate time still matches what we think it is, |
|
172 -- otherwise the session resumed and re-hibernated. |
|
173 if session.hibernating == hibernate_time then |
|
174 session_registry[session.resumption_token] = nil; |
140 session.resumption_token = nil; |
175 session.resumption_token = nil; |
141 sessionmanager.destroy_session(session); -- Re-destroy |
176 -- This recursion back into our destroy handler is to |
|
177 -- make sure we still handle any queued stanzas |
|
178 sessionmanager.destroy_session(session); |
142 end |
179 end |
143 end); |
180 end); |
144 return; -- Postpone destruction for now |
181 return; -- Postpone destruction for now |
145 end |
182 end |
146 |
183 |
147 end |
184 end |
148 return _destroy_session(session, err); |
185 return _destroy_session(session, err); |
149 end |
186 end |
|
187 |
|
188 module:hook_stanza(xmlns_sm, "resume", function (session, stanza) |
|
189 local id = stanza.attr.previd; |
|
190 local original_session = session_registry[id]; |
|
191 if not original_session then |
|
192 session.send(st.stanza("failed", sm_attr) |
|
193 :tag("item-not-found", { xmlns = xmlns_errors }) |
|
194 ); |
|
195 elseif session.username == original_session.username |
|
196 and session.host == original_session.host then |
|
197 -- TODO: All this should move to sessionmanager (e.g. session:replace(new_session)) |
|
198 original_session.ip = session.ip; |
|
199 original_session.conn = session.conn; |
|
200 original_session.send = session.send; |
|
201 original_session.stream = session.stream; |
|
202 original_session.secure = session.secure; |
|
203 original_session.hibernating = nil; |
|
204 local filter = original_session.filter; |
|
205 local stream = session.stream; |
|
206 local log = session.log; |
|
207 function original_session.data(data) |
|
208 data = filter("bytes/in", data); |
|
209 if data then |
|
210 local ok, err = stream:feed(data); |
|
211 if ok then return; end |
|
212 log("debug", "Received invalid XML (%s) %d bytes: %s", tostring(err), #data, data:sub(1, 300):gsub("[\r\n]+", " "):gsub("[%z\1-\31]", "_")); |
|
213 original_session:close("xml-not-well-formed"); |
|
214 end |
|
215 end |
|
216 wrap_session(original_session, true); |
|
217 -- Inform xmppstream of the new session (passed to its callbacks) |
|
218 stream:set_session(original_session); |
|
219 -- Similar for connlisteners |
|
220 require "net.connlisteners".get("xmppclient").associate_session(session.conn, original_session); |
|
221 |
|
222 session.send(st.stanza("resumed", { xmlns = xmlns_sm, |
|
223 h = original_session.handled_stanza_count, previd = id })); |
|
224 |
|
225 -- Fake an <a> with the h of the <resume/> from the client |
|
226 original_session:dispatch_stanza(st.stanza("a", { xmlns = xmlns_sm, |
|
227 h = stanza.attr.h })); |
|
228 |
|
229 -- Ok, we need to re-send any stanzas that the client didn't see |
|
230 -- ...they are what is now left in the outgoing stanza queue |
|
231 local queue = original_session.outgoing_stanza_queue; |
|
232 for i=1,#queue do |
|
233 session.send(queue[i]); |
|
234 end |
|
235 else |
|
236 log("warn", "Client %s@%s[%s] tried to resume stream for %s@%s[%s]", |
|
237 session.username or "?", session.host or "?", session.type, |
|
238 original_session.username or "?", original_session.host or "?", original_session.type); |
|
239 session.send(st.stanza("failed", sm_attr) |
|
240 :tag("not-authorized", { xmlns = xmlns_errors })); |
|
241 end |
|
242 return true; |
|
243 end); |