141 if not session.smacks then return end -- not using |
141 if not session.smacks then return end -- not using |
142 if session.hibernating then return end -- can't ack when asleep |
142 if session.hibernating then return end -- can't ack when asleep |
143 if session.awaiting_ack then return end -- already waiting |
143 if session.awaiting_ack then return end -- already waiting |
144 if force then return force end |
144 if force then return force end |
145 local queue = session.outgoing_stanza_queue; |
145 local queue = session.outgoing_stanza_queue; |
146 local expected_h = session.last_acknowledged_stanza + queue:count_unacked(); |
146 local expected_h = queue:count_acked() + queue:count_unacked(); |
147 local max_unacked = max_unacked_stanzas; |
147 local max_unacked = max_unacked_stanzas; |
148 if session.state == "inactive" then |
148 if session.state == "inactive" then |
149 max_unacked = max_inactive_unacked_stanzas; |
149 max_unacked = max_inactive_unacked_stanzas; |
150 end |
150 end |
151 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong |
151 -- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong |
159 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); |
159 session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked()); |
160 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
160 (session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks })) |
161 if session.destroyed then return end -- sending something can trigger destruction |
161 if session.destroyed then return end -- sending something can trigger destruction |
162 session.awaiting_ack = true; |
162 session.awaiting_ack = true; |
163 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
163 -- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile) |
164 session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked(); |
164 session.last_requested_h = queue:count_acked() + queue:count_unacked(); |
165 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); |
165 session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked()); |
166 if not session.delayed_ack_timer then |
166 if not session.delayed_ack_timer then |
167 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
167 session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function() |
168 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
168 ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue |
169 end); |
169 end); |
221 end |
221 end |
222 |
222 |
223 local function wrap_session_out(session, resume) |
223 local function wrap_session_out(session, resume) |
224 if not resume then |
224 if not resume then |
225 session.outgoing_stanza_queue = smqueue.new(queue_size); |
225 session.outgoing_stanza_queue = smqueue.new(queue_size); |
226 session.last_acknowledged_stanza = 0; |
|
227 end |
226 end |
228 |
227 |
229 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
228 add_filter(session, "stanzas/out", outgoing_stanza_filter, -999); |
230 |
229 |
231 return session; |
230 return session; |