plugins/mod_smacks.lua
changeset 12129 649268c9f603
parent 12118 e32f90c81519
child 12139 fa804c2db747
--- a/plugins/mod_smacks.lua	Mon Dec 27 16:05:12 2021 +0100
+++ b/plugins/mod_smacks.lua	Mon Dec 27 16:16:53 2021 +0100
@@ -15,6 +15,35 @@
 local tostring = tostring;
 local os_time = os.time;
 
+-- These metrics together allow to calculate an instantaneous
+-- "unacked stanzas" metric in the graphing frontend, without us having to
+-- iterate over all the queues.
+local tx_queued_stanzas = module:measure("tx_queued_stanzas", "counter");
+local tx_dropped_stanzas =  module:metric(
+	"histogram",
+	"tx_dropped_stanzas", "", "number of stanzas in a queue which got dropped",
+	{},
+	{buckets = {0, 1, 2, 4, 8, 16, 32}}
+):with_labels();
+local tx_acked_stanzas = module:metric(
+	"histogram",
+	"tx_acked_stanzas", "", "number of items acked per ack received",
+	{},
+	{buckets = {0, 1, 2, 4, 8, 16, 32}}
+):with_labels();
+
+-- number of session resumptions attempts where the session had expired
+local resumption_expired = module:measure("session_resumption_expired", "counter");
+local resumption_age = module:metric(
+	"histogram",
+	"resumption_age", "seconds", "time the session had been hibernating at the time of a resumption",
+	{},
+	{buckets = { 0, 1, 2, 5, 10, 20, 50, 100, 200, 500 }}
+):with_labels();
+local sessions_expired = module:measure("sessions_expired", "counter");
+local sessions_started = module:measure("sessions_started", "counter");
+
+
 local datetime = require "util.datetime";
 local add_filter = require "util.filters".add_filter;
 local jid = require "util.jid";
@@ -168,6 +197,7 @@
 		end
 
 		queue:push(cached_stanza);
+		tx_queued_stanzas(1);
 
 		if session.hibernating then
 			session.log("debug", "hibernating since %s, stanza queued", datetime.datetime(session.hibernating));
@@ -229,6 +259,7 @@
 
 local function wrap_session_in(session, resume)
 	if not resume then
+		sessions_started(1);
 		session.handled_stanza_count = 0;
 	end
 	add_filter(session, "stanzas/in", count_incoming_stanzas, 999);
@@ -349,8 +380,9 @@
 		origin:close(err);
 		return;
 	end
+	tx_acked_stanzas:sample(handled_stanza_count);
 
-	origin.log("debug", "#queue = %d", queue:count_unacked());
+	origin.log("debug", "#queue = %d (acked: %d)", queue:count_unacked(), handled_stanza_count);
 	request_ack_now_if_needed(origin, false, "handle_a", nil)
 	return true;
 end
@@ -359,7 +391,9 @@
 
 local function handle_unacked_stanzas(session)
 	local queue = session.outgoing_stanza_queue;
-	if queue:count_unacked() > 0 then
+	local unacked = queue:count_unacked()
+	if unacked > 0 then
+		tx_dropped_stanzas:sample(unacked);
 		session.smacks = false; -- Disable queueing
 		session.outgoing_stanza_queue = nil;
 		for stanza in queue._queue:consume() do
@@ -437,6 +471,7 @@
 		session.resumption_token = nil;
 		session.resending_unacked = true; -- stop outgoing_stanza_filter from re-queueing anything anymore
 		sessionmanager.destroy_session(session, "Hibernating too long");
+		sessions_expired(1);
 	end);
 	if session.conn then
 		local conn = session.conn;
@@ -490,6 +525,7 @@
 				:tag("item-not-found", { xmlns = xmlns_errors })
 			);
 			old_session_registry:set(session.username, id, nil);
+			resumption_expired(1);
 		else
 			session.log("debug", "Tried to resume non-existent session with id %s", id);
 			session.send(st.stanza("failed", { xmlns = xmlns_sm })
@@ -504,6 +540,12 @@
 		elseif session.hibernating then
 			original_session.log("error", "Hibernating session has no watchdog!")
 		end
+		-- zero age = was not hibernating yet
+		local age = 0;
+		if original_session.hibernating then
+			local now = os_time();
+			age = now - original_session.hibernating;
+		end
 		session.log("debug", "mod_smacks resuming existing session %s...", get_session_id(original_session));
 		original_session.log("debug", "mod_smacks session resumed from %s...", get_session_id(session));
 		-- TODO: All this should move to sessionmanager (e.g. session:replace(new_session))
@@ -581,6 +623,7 @@
 		module:fire_event("smacks-hibernation-end", {origin = session, resumed = original_session, queue = queue:table()});
 		original_session.awaiting_ack = nil; -- Don't wait for acks from before the resumption
 		request_ack_now_if_needed(original_session, true, "handle_resume", nil);
+		resumption_age:sample(age);
 	end
 	return true;
 end