plugins/mod_smacks.lua
branch0.12
changeset 12526 1671cb924002
parent 12276 fe0f5c47fda3
child 12527 d2177cb5a766
child 12529 8087f5357f53
equal deleted inserted replaced
12524:bb5f772b3189 12526:1671cb924002
   141 	if not session.smacks then return end -- not using
   141 	if not session.smacks then return end -- not using
   142 	if session.hibernating then return end -- can't ack when asleep
   142 	if session.hibernating then return end -- can't ack when asleep
   143 	if session.awaiting_ack then return end -- already waiting
   143 	if session.awaiting_ack then return end -- already waiting
   144 	if force then return force end
   144 	if force then return force end
   145 	local queue = session.outgoing_stanza_queue;
   145 	local queue = session.outgoing_stanza_queue;
   146 	local expected_h = session.last_acknowledged_stanza + queue:count_unacked();
   146 	local expected_h = queue:count_acked() + queue:count_unacked();
   147 	local max_unacked = max_unacked_stanzas;
   147 	local max_unacked = max_unacked_stanzas;
   148 	if session.state == "inactive" then
   148 	if session.state == "inactive" then
   149 		max_unacked = max_inactive_unacked_stanzas;
   149 		max_unacked = max_inactive_unacked_stanzas;
   150 	end
   150 	end
   151 	-- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong
   151 	-- this check of last_requested_h prevents ack-loops if misbehaving clients report wrong
   159 	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
   159 	session.log("debug", "Sending <r> (inside timer, before send) from %s - #queue=%d", reason, queue:count_unacked());
   160 	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
   160 	(session.sends2s or session.send)(st.stanza("r", { xmlns = session.smacks }))
   161 	if session.destroyed then return end -- sending something can trigger destruction
   161 	if session.destroyed then return end -- sending something can trigger destruction
   162 	session.awaiting_ack = true;
   162 	session.awaiting_ack = true;
   163 	-- expected_h could be lower than this expression (e.g. more stanzas added to the queue meanwhile)
   163 	-- expected_h could be lower than this expression (e.g. more stanzas added to the queue meanwhile)
   164 	session.last_requested_h = session.last_acknowledged_stanza + queue:count_unacked();
   164 	session.last_requested_h = queue:count_acked() + queue:count_unacked();
   165 	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
   165 	session.log("debug", "Sending <r> (inside timer, after send) from %s - #queue=%d", reason, queue:count_unacked());
   166 	if not session.delayed_ack_timer then
   166 	if not session.delayed_ack_timer then
   167 		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
   167 		session.delayed_ack_timer = timer.add_task(delayed_ack_timeout, function()
   168 			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
   168 			ack_delayed(session, nil); -- we don't know if this is the only new stanza in the queue
   169 		end);
   169 		end);
   221 end
   221 end
   222 
   222 
   223 local function wrap_session_out(session, resume)
   223 local function wrap_session_out(session, resume)
   224 	if not resume then
   224 	if not resume then
   225 		session.outgoing_stanza_queue = smqueue.new(queue_size);
   225 		session.outgoing_stanza_queue = smqueue.new(queue_size);
   226 		session.last_acknowledged_stanza = 0;
       
   227 	end
   226 	end
   228 
   227 
   229 	add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
   228 	add_filter(session, "stanzas/out", outgoing_stanza_filter, -999);
   230 
   229 
   231 	return session;
   230 	return session;