158 if not session.smacks then return end -- not using |
158 if not session.smacks then return end -- not using |
159 if session.hibernating then return end -- can't ack when asleep |
159 if session.hibernating then return end -- can't ack when asleep |
160 if session.awaiting_ack then return end -- already waiting |
160 if session.awaiting_ack then return end -- already waiting |
161 if force then return force end |
161 if force then return force end |
162 local queue = session.outgoing_stanza_queue; |
162 local queue = session.outgoing_stanza_queue; |
163 local expected_h = session.last_acknowledged_stanza + queue:count_unacked(); |
163 local expected_h = queue:count_acked() + queue:count_unacked(); |
164 local max_unacked = max_unacked_stanzas; |
164 local max_unacked = max_unacked_stanzas; |
165 if session.state == "inactive" then |
165 if session.state == "inactive" then |
166 max_unacked = max_inactive_unacked_stanzas; |
166 max_unacked = max_inactive_unacked_stanzas; |
167 end |
167 end |
168 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong |
168 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong |
176 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); |
176 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); |
177 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
177 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
178 if session.destroyed then return end -- sending something can trigger destruction |
178 if session.destroyed then return end -- sending something can trigger destruction |
179 session.awaiting_ack = true; |
179 session.awaiting_ack = true; |
180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
180 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
181 session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked(); |
181 session.last_requested_h = queue:count_acked() + queue:count_unacked(); |
182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); |
182 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); |
183 if not session.delayed_ack_timer then |
183 if not session.delayed_ack_timer then |
184 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
184 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
185 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
185 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
186 end); |
186 end); |
238 end |
238 end |
239 |
239 |
240 local function wrap_session_out(session, resume) |
240 local function wrap_session_out(session, resume) |
241 if not resume then |
241 if not resume then |
242 session.outgoing_stanza_queue = smqueue.new(queue_size); |
242 session.outgoing_stanza_queue = smqueue.new(queue_size); |
243 session.last_acknowledged_stanza = 0; |
|
244 end |
243 end |
245 |
244 |
246 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
245 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
247 |
246 |
248 return session; |
247 return session; |