mod_posix: Move everything to util.startup
This allows greater control over the order of events.
Notably, the internal ordering between daemonization, initialization of
libunbound and setup of signal handling is sensitive.
libunbound starts a separate thread for processing DNS requests.
If this thread is started before signal handling has been set up, it
will not inherit the signal handlers and instead behave as it would have
before signal handlers were set up, i.e. cause the whole process to
immediately exit.
libunbound is usually initialized on the first DNS request, usually
triggered by an outgoing s2s connection attempt.
If daemonization happens before signals have been set up, signals may
not be processed at all.
-- Because we deal with pre-authed sessions and streams we can't be host-specific
module:set_global();
local filters = require "prosody.util.filters";
local throttle = require "prosody.util.throttle";
local timer = require "prosody.util.timer";
local ceil = math.ceil;
local limits_cfg = module:get_option("limits", {});
local limits_resolution = module:get_option_period("limits_resolution", 1);
local default_bytes_per_second = 3000;
local default_burst = 2;
local rate_units = { b = 1, k = 3, m = 6, g = 9, t = 12 } -- Plan for the future.
local function parse_rate(rate, sess_type)
local quantity, unit, exp;
if rate then
quantity, unit = rate:match("^(%d+) ?([^/]+)/s$");
exp = quantity and rate_units[unit:sub(1,1):lower()];
end
if not exp then
module:log("error", "Error parsing rate for %s: %q, using default rate (%d bytes/s)", sess_type, rate, default_bytes_per_second);
return default_bytes_per_second;
end
return quantity*(10^exp);
end
local function parse_burst(burst, sess_type)
if type(burst) == "string" then
burst = burst:match("^(%d+) ?s$");
end
local n_burst = tonumber(burst);
if burst and not n_burst then
module:log("error", "Unable to parse burst for %s: %q, using default burst interval (%ds)", sess_type, burst, default_burst);
end
return n_burst or default_burst;
end
-- Process config option into limits table:
-- limits = { c2s = { bytes_per_second = X, burst_seconds = Y } }
local limits = {
c2s = {
bytes_per_second = 10 * 1024;
burst_seconds = 2;
};
s2sin = {
bytes_per_second = 30 * 1024;
burst_seconds = 2;
};
};
for sess_type, sess_limits in pairs(limits_cfg) do
limits[sess_type] = {
bytes_per_second = parse_rate(sess_limits.rate, sess_type);
burst_seconds = parse_burst(sess_limits.burst, sess_type);
};
end
local default_filter_set = {};
function default_filter_set.bytes_in(bytes, session)
local sess_throttle = session.throttle;
if sess_throttle then
local ok, _, outstanding = sess_throttle:poll(#bytes, true);
if not ok then
session.log("debug", "Session over rate limit (%d) with %d (by %d), pausing", sess_throttle.max, #bytes, outstanding);
outstanding = ceil(outstanding);
session.conn:pause(); -- Read no more data from the connection until there is no outstanding data
local outstanding_data = bytes:sub(-outstanding);
bytes = bytes:sub(1, #bytes-outstanding);
timer.add_task(limits_resolution, function ()
if not session.conn then return; end
if sess_throttle:peek(#outstanding_data) then
session.log("debug", "Resuming paused session");
session.conn:resume();
end
-- Handle what we can of the outstanding data
session.data(outstanding_data);
end);
end
end
return bytes;
end
local type_filters = {
c2s = default_filter_set;
s2sin = default_filter_set;
s2sout = default_filter_set;
};
local function filter_hook(session)
local session_type = session.type:match("^[^_]+");
local filter_set, opts = type_filters[session_type], limits[session_type];
if opts then
if session.conn and session.conn.setlimit then
session.conn:setlimit(opts.bytes_per_second);
-- Currently no burst support
else
session.throttle = throttle.create(opts.bytes_per_second * opts.burst_seconds, opts.burst_seconds);
filters.add_filter(session, "bytes/in", filter_set.bytes_in, 1000);
end
end
end
function module.load()
filters.add_filter_hook(filter_hook);
end
function module.unload()
filters.remove_filter_hook(filter_hook);
end
function unlimited(session)
local session_type = session.type:match("^[^_]+");
if session.conn and session.conn.setlimit then
session.conn:setlimit(0);
-- Currently no burst support
else
local filter_set = type_filters[session_type];
filters.remove_filter(session, "bytes/in", filter_set.bytes_in);
session.throttle = nil;
end
end
function module.add_host(module)
local unlimited_jids = module:get_option_inherited_set("unlimited_jids", {});
if not unlimited_jids:empty() then
module:hook("authentication-success", function (event)
local session = event.session;
local jid = session.username .. "@" .. session.host;
if unlimited_jids:contains(jid) then
unlimited(session);
end
end);
module:hook("s2sout-established", function (event)
local session = event.session;
if unlimited_jids:contains(session.to_host) then
unlimited(session);
end
end);
module:hook("s2sin-established", function (event)
local session = event.session;
if session.from_host and unlimited_jids:contains(session.from_host) then
unlimited(session);
end
end);
end
end