util.datamanager: Add O(1) list indexing with on-disk index
authorKim Alvefur <zash@zash.se>
Tue, 11 May 2021 02:09:56 +0200
changeset 13138 638f627e707f
parent 13137 3692265becb7
child 13139 3fd24e1945b0
util.datamanager: Add O(1) list indexing with on-disk index Index file contains offsets and lengths of each item() which allows seeking directly to each item and reading it without parsing the entire file. Also allows tricks like binary search, assuming items have some defined order. We take advantage of the 1-based indexing in tables to store a magic header in the 0 position, so that table index 1 ends up at file index 1.
util/datamanager.lua
--- a/util/datamanager.lua	Tue May 11 02:04:59 2021 +0200
+++ b/util/datamanager.lua	Tue May 11 02:09:56 2021 +0200
@@ -7,6 +7,7 @@
 --
 
 
+local string = string;
 local format = string.format;
 local setmetatable = setmetatable;
 local ipairs = ipairs;
@@ -17,11 +18,13 @@
 local os_remove = os.remove;
 local os_rename = os.rename;
 local tonumber = tonumber;
+local floor = math.floor;
 local next = next;
 local type = type;
 local t_insert = table.insert;
 local t_concat = table.concat;
 local envloadfile = require"prosody.util.envload".envloadfile;
+local envload = require"prosody.util.envload".envload;
 local serialize = require "prosody.util.serialization".serialize;
 local lfs = require "lfs";
 -- Extract directory separator from package.config (an undocumented string that comes with lua)
@@ -255,6 +258,13 @@
 	return true, pos;
 end
 
+local index_fmt, index_item_size, index_magic;
+if string.packsize then
+	index_fmt = "TT"; -- struct { size_t start, size_t length }
+	index_item_size = string.packsize(index_fmt);
+	index_magic = string.pack(index_fmt, 7767639, 1); -- Magic string: T9 for "prosody", version number
+end
+
 local function list_append(username, host, datastore, data)
 	if not data then return; end
 	if callback(username, host, datastore) == false then return true; end
@@ -267,6 +277,22 @@
 			datastore, msg, where, username or "nil", host or "nil");
 		return ok, msg;
 	end
+	if string.packsize then
+		local offset = type(msg) == "number" and msg or 0;
+		local index_entry = string.pack(index_fmt, offset, #data);
+		if offset == 0 then
+			index_entry = index_magic .. index_entry;
+		end
+		local ok, off = append(username, host, datastore, "lidx", index_entry);
+		off = off or 0;
+		-- If this was the first item, then both the data and index offsets should
+		-- be zero, otherwise there's some kind of mismatch and we should drop the
+		-- index and recreate it from scratch
+		-- TODO Actually rebuild the index in this case?
+		if not ok or (off == 0 and offset ~= 0) or (off ~= 0 and offset == 0) then
+			os_remove(getpath(username, host, datastore, "lidx"));
+		end
+	end
 	return true;
 end
 
@@ -280,6 +306,7 @@
 	for i, item in ipairs(data) do
 		d[i] = "item(" .. serialize(item) .. ");\n";
 	end
+	os_remove(getpath(username, host, datastore, "lidx"));
 	local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d));
 	if not ok then
 		log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil");
@@ -294,6 +321,146 @@
 	return true;
 end
 
+local function build_list_index(username, host, datastore, items)
+	log("debug", "Building index for (%s@%s/%s)", username, host, datastore);
+	local filename = getpath(username, host, datastore, "list");
+	local fh, err, errno = io_open(filename);
+	if not fh then
+		return fh, err, errno;
+	end
+	local prev_pos = 0; -- position before reading
+	local last_item_start = 0;
+
+	if items and items[1] then
+		local last_item = items[#items];
+		last_item_start = fh:seek("set", last_item.start + last_item.length);
+	else
+		items = {};
+	end
+
+	for line in fh:lines() do
+		if line:sub(1, 4) == "item" then
+			if prev_pos ~= 0 then
+				t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
+			end
+			last_item_start = prev_pos
+		end
+		-- seek position is at the start of the next line within each loop iteration
+		-- so we need to collect the "current" position at the end of the previous
+		prev_pos = fh:seek()
+	end
+	if prev_pos ~= 0 then
+		t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
+	end
+	return items;
+end
+
+local function store_list_index(username, host, datastore, index)
+	local data = { index_magic };
+	for i, v in ipairs(index) do
+		data[i + 1] = string.pack(index_fmt, v.start, v.length);
+	end
+	local filename = getpath(username, host, datastore, "lidx");
+	return atomic_store(filename, t_concat(data));
+end
+
+local index_mt = {
+	__index = function(t, i)
+		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
+			return
+		end
+		if i < 0 then
+			return
+		end
+		local fh = t.file;
+		local pos = i * index_item_size;
+		if fh:seek("set", pos) ~= pos then
+			return nil
+		end
+		local data = fh:read(index_item_size);
+		if not data then
+			return nil
+		end
+		local start, length = string.unpack(index_fmt, data);
+		local v = { start = start; length = length };
+		t[i] = v;
+		return v;
+	end;
+	__len = function(t)
+		-- Account for both the header and the fence post error
+		return floor(t.file:seek("end") / index_item_size) - 1;
+	end;
+}
+
+local function get_list_index(username, host, datastore)
+	log("debug", "Loading index for (%s%s/%s)", username, host, datastore);
+	local index_filename = getpath(username, host, datastore, "lidx");
+	local ih = io_open(index_filename);
+	if ih then
+		local magic = ih:read(#index_magic);
+		if magic ~= index_magic then
+			log("warn", "Index %q has wrong version number (got %q, expected %q)", index_filename, magic, index_magic);
+			-- wrong version or something
+			ih:close();
+			ih = nil;
+		end
+	end
+
+	if ih then
+		return setmetatable({ file = ih }, index_mt);
+	end
+
+	local index, err = build_list_index(username, host, datastore);
+	if not index then
+		return index, err
+	end
+
+	-- TODO How to handle failure to store the index?
+	local dontcare = store_list_index(username, host, datastore, index); -- luacheck: ignore 211/dontcare
+	return index;
+end
+
+local function list_load_one(fh, start, length)
+	if fh:seek("set", start) ~= start then
+		return nil
+	end
+	local raw_data = fh:read(length)
+	if not raw_data or #raw_data ~= length then
+		return
+	end
+	local item;
+	local data, err, errno = envload(raw_data, "@list", {
+		item = function(i)
+			item = i;
+		end;
+	});
+	if not data then
+		return data, err, errno
+	end
+	local success, ret = pcall(data);
+	if not success then
+		return success, ret;
+	end
+	return item;
+end
+
+local indexed_list_mt = {
+	__index = function(t, i)
+		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
+			return
+		end
+		local ix = t.index[i];
+		if not ix then
+			return
+		end
+		local item = list_load_one(t.file, ix.start, ix.length);
+		return item;
+	end;
+	__len = function(t)
+		return #t.index;
+	end;
+}
+
 local function list_load(username, host, datastore)
 	local items = {};
 	local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end});
@@ -314,6 +481,27 @@
 	return items;
 end
 
+local function list_open(username, host, datastore)
+	if not index_magic then
+		log("warn", "Falling back from lazy loading to to loading full list for %s storage for user: %s@%s", datastore, username or "nil", host or "nil");
+		return list_load(username, host, datastore);
+	end
+	local filename = getpath(username, host, datastore, "list");
+	local file, err, errno = io_open(filename);
+	if not file then
+		if errno == ENOENT then
+			return nil;
+		end
+		return file, err, errno;
+	end
+	local index, err = get_list_index(username, host, datastore);
+	if not index then
+		file:close()
+		return index, err;
+	end
+	return setmetatable({ file = file; index = index }, indexed_list_mt);
+end
+
 local type_map = {
 	keyval = "dat";
 	list = "list";
@@ -414,4 +602,7 @@
 	purge = purge;
 	path_decode = decode;
 	path_encode = encode;
+
+	build_list_index = build_list_index;
+	list_open = list_open;
 };