plugins/mod_smacks.lua
changeset 12527 d2177cb5a766
parent 12523 d935af51c644
parent 12526 1671cb924002
child 12528 dd5ab9a6b599
equal deleted inserted replaced
12525:f07b3f6ad903 12527:d2177cb5a766
   158 	if not session.smacks then return end -- not using
   158 	if not session.smacks then return end -- not using
   159 	if session.hibernating then return end -- can't ack when asleep
   159 	if session.hibernating then return end -- can't ack when asleep
   160 	if session.awaiting_ack then return end -- already waiting
   160 	if session.awaiting_ack then return end -- already waiting
   161 	if force then return force end
   161 	if force then return force end
   162 	local queue = session.outgoing_stanza_queue;
   162 	local queue = session.outgoing_stanza_queue;
   163 	local expected_h = session.last_acknowledged_stanza + queue:count_unacked();
   163 	local expected_h = queue:count_acked() + queue:count_unacked();
   164 	local max_unacked = max_unacked_stanzas;
   164 	local max_unacked = max_unacked_stanzas;
   165 	if session.state == "inactive" then
   165 	if session.state == "inactive" then
   166 		max_unacked = max_inactive_unacked_stanzas;
   166 		max_unacked = max_inactive_unacked_stanzas;
   167 	end
   167 	end
   168 	-- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong
   168 	-- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong
   176 	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
   176 	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
   177 	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
   177 	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
   178 	if session.destroyed then return end -- sending something can trigger destruction
   178 	if session.destroyed then return end -- sending something can trigger destruction
   179 	session.awaiting_ack = true;
   179 	session.awaiting_ack = true;
   180 	-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
   180 	-- expected_h could be lower than this expression e.g. more stanzas added to the queue meanwhile)
   181 	session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked();
   181 	session.last_requested_h = queue:count_acked() + queue:count_unacked();
   182 	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
   182 	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
   183 	if not session.delayed_ack_timer then
   183 	if not session.delayed_ack_timer then
   184 		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
   184 		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
   185 			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
   185 			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
   186 		end);
   186 		end);
   238 end
   238 end
   239 
   239 
   240 local function wrap_session_out(session, resume)
   240 local function wrap_session_out(session, resume)
   241 	if not resume then
   241 	if not resume then
   242 		session.outgoing_stanza_queue = smqueue.new(queue_size);
   242 		session.outgoing_stanza_queue = smqueue.new(queue_size);
   243 		session.last_acknowledged_stanza = 0;
       
   244 	end
   243 	end
   245 
   244 
   246 	add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
   245 	add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
   247 
   246 
   248 	return session;
   247 	return session;