mod_anti_spam/rtbl.lib.lua
changeset 5863 259ffdbf8906
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/mod_anti_spam/rtbl.lib.lua	Tue Mar 05 18:26:29 2024 +0000
@@ -0,0 +1,122 @@
+local array = require "util.array";
+local id = require "util.id";
+local it = require "util.iterators";
+local set = require "util.set";
+local st = require "util.stanza";
+
+module:depends("pubsub_subscription");
+
+local function new_rtbl_subscription(rtbl_service_jid, rtbl_node, handlers)
+	local items = {};
+
+	local function notify(event_type, hash)
+		local handler = handlers[event_type];
+		if not handler then return; end
+		handler(hash);
+	end
+
+	module:add_item("pubsub-subscription", {
+		service = rtbl_service_jid;
+		node = rtbl_node;
+
+		-- Callbacks:
+		on_subscribed = function()
+			module:log("info", "RTBL active: %s:%s", rtbl_service_jid, rtbl_node);
+		end;
+
+		on_error = function(err)
+			module:log(
+				"error",
+				"Failed to subscribe to RTBL: %s:%s %s::%s:  %s",
+				rtbl_service_jid,
+				rtbl_node,
+				err.type,
+				err.condition,
+				err.text
+			);
+		end;
+
+		on_item = function(event)
+			local hash = event.item.attr.id;
+			if not hash then return; end
+			module:log("debug", "Received new hash from %s:%s: %s", rtbl_service_jid, rtbl_node, hash);
+			items[hash] = true;
+			notify("added", hash);
+		end;
+
+		on_retract = function (event)
+			local hash = event.item.attr.id;
+			if not hash then return; end
+			module:log("debug", "Retracted hash from %s:%s: %s", rtbl_service_jid, rtbl_node, hash);
+			items[hash] = nil;
+			notify("removed", hash);
+		end;
+
+		purge = function()
+			module:log("debug", "Purge all hashes from %s:%s", rtbl_service_jid, rtbl_node);
+			for hash in pairs(items) do
+				items[hash] = nil;
+				notify("removed", hash);
+			end
+		end;
+	});
+
+	local request_id = "rtbl-request-"..id.short();
+
+	local function request_list()
+		local items_request = st.iq({ to = rtbl_service_jid, from = module.host, type = "get", id = request_id })
+			:tag("pubsub", { xmlns = "http://jabber.org/protocol/pubsub" })
+				:tag("items", { node = rtbl_node }):up()
+			:up();
+		module:send(items_request);
+	end
+
+	local function update_list(event)
+		local from_jid = event.stanza.attr.from;
+		if from_jid ~= rtbl_service_jid then
+			module:log("debug", "Ignoring RTBL response from unknown sender: %s", from_jid);
+			return;
+		end
+		local items_el = event.stanza:find("{http://jabber.org/protocol/pubsub}pubsub/items");
+		if not items_el then
+			module:log("warn", "Invalid items response from RTBL service %s:%s", rtbl_service_jid, rtbl_node);
+			return;
+		end
+
+		local old_entries = set.new(array.collect(it.keys(items)));
+
+		local n_added, n_removed, n_total = 0, 0, 0;
+		for item in items_el:childtags("item") do
+			local hash = item.attr.id;
+			if hash then
+				n_total = n_total + 1;
+				if not old_entries:contains(hash) then
+					-- New entry
+					n_added = n_added + 1;
+					items[hash] = true;
+					notify("added", hash);
+				else
+					-- Entry already existed
+					old_entries:remove(hash);
+				end
+			end
+		end
+
+		-- Remove old entries that weren't in the received list
+		for hash in old_entries do
+			n_removed = n_removed + 1;
+			items[hash] = nil;
+			notify("removed", hash);
+		end
+
+		module:log("info", "%d RTBL entries received from %s:%s (%d added, %d removed)", n_total, from_jid, rtbl_node, n_added, n_removed);
+		return true;
+	end
+
+	module:hook("iq-result/host/"..request_id, update_list);
+	module:add_timer(0, request_list);
+end
+
+return {
+	new_rtbl_subscription = new_rtbl_subscription;
+}