mod_csi_simple: Count buffered items and flush when it reaches configured limit
authorKim Alvefur <zash@zash.se>
Sun, 24 Mar 2019 18:33:38 +0100
changeset 9916 601f9781a605
parent 9915 ed011935c22d
child 9917 7d78b24d8449
mod_csi_simple: Count buffered items and flush when it reaches configured limit In this mode, stanzas have been serialized to strings in the internal net.server buffer, so it is difficult to count them after the fact.
plugins/mod_csi_simple.lua
--- a/plugins/mod_csi_simple.lua	Sun Mar 24 18:32:50 2019 +0100
+++ b/plugins/mod_csi_simple.lua	Sun Mar 24 18:33:38 2019 +0100
@@ -10,6 +10,7 @@
 local st = require "util.stanza";
 local dt = require "util.datetime";
 local new_queue = require "util.queue".new;
+local filters = require "util.filters";
 
 local function new_pump(output, ...)
 	-- luacheck: ignore 212/self
@@ -92,10 +93,22 @@
 	return stanza;
 end
 
+local function manage_buffer(stanza, session)
+	local ctr = session.csi_counter or 0;
+	if ctr >= queue_size or module:fire_event("csi-is-stanza-important", { stanza = stanza, session = session }) then
+		session.conn:resume_writes();
+	else
+		stanza = with_timestamp(stanza, jid.join(session.username, session.host))
+	end
+	session.csi_counter = ctr + 1;
+	return stanza;
+end
+
 module:hook("csi-client-inactive", function (event)
 	local session = event.origin;
 	if session.conn and session.conn and session.conn.pause_writes then
 		session.conn:pause_writes();
+		filters.add_filter(session, "stanzas/out", manage_buffer);
 	elseif session.pump then
 		session.pump:pause();
 	else
@@ -122,7 +135,16 @@
 	if session.pump then
 		session.pump:resume();
 	elseif session.conn and session.conn and session.conn.resume_writes then
+		filters.remove_filter(session, "stanzas/out", manage_buffer);
 		session.conn:resume_writes();
 	end
 end);
 
+
+module:hook("c2s-ondrain", function (event)
+	local session = event.session;
+	if session.state == "inactive" and session.conn and session.conn and session.conn.pause_writes then
+		session.csi_counter = 0;
+		session.conn:pause_writes();
+	end
+end);