util/datamanager.lua
changeset 13138 638f627e707f
parent 12979 d10957394a3c
child 13141 b417a49cc31b
--- a/util/datamanager.lua	Tue May 11 02:04:59 2021 +0200
+++ b/util/datamanager.lua	Tue May 11 02:09:56 2021 +0200
@@ -7,6 +7,7 @@
 --
 
 
+local string = string;
 local format = string.format;
 local setmetatable = setmetatable;
 local ipairs = ipairs;
@@ -17,11 +18,13 @@
 local os_remove = os.remove;
 local os_rename = os.rename;
 local tonumber = tonumber;
+local floor = math.floor;
 local next = next;
 local type = type;
 local t_insert = table.insert;
 local t_concat = table.concat;
 local envloadfile = require"prosody.util.envload".envloadfile;
+local envload = require"prosody.util.envload".envload;
 local serialize = require "prosody.util.serialization".serialize;
 local lfs = require "lfs";
 -- Extract directory separator from package.config (an undocumented string that comes with lua)
@@ -255,6 +258,13 @@
 	return true, pos;
 end
 
+local index_fmt, index_item_size, index_magic;
+if string.packsize then
+	index_fmt = "TT"; -- struct { size_t start, size_t length }
+	index_item_size = string.packsize(index_fmt);
+	index_magic = string.pack(index_fmt, 7767639, 1); -- Magic string: T9 for "prosody", version number
+end
+
 local function list_append(username, host, datastore, data)
 	if not data then return; end
 	if callback(username, host, datastore) == false then return true; end
@@ -267,6 +277,22 @@
 			datastore, msg, where, username or "nil", host or "nil");
 		return ok, msg;
 	end
+	if string.packsize then
+		local offset = type(msg) == "number" and msg or 0;
+		local index_entry = string.pack(index_fmt, offset, #data);
+		if offset == 0 then
+			index_entry = index_magic .. index_entry;
+		end
+		local ok, off = append(username, host, datastore, "lidx", index_entry);
+		off = off or 0;
+		-- If this was the first item, then both the data and index offsets should
+		-- be zero, otherwise there's some kind of mismatch and we should drop the
+		-- index and recreate it from scratch
+		-- TODO Actually rebuild the index in this case?
+		if not ok or (off == 0 and offset ~= 0) or (off ~= 0 and offset == 0) then
+			os_remove(getpath(username, host, datastore, "lidx"));
+		end
+	end
 	return true;
 end
 
@@ -280,6 +306,7 @@
 	for i, item in ipairs(data) do
 		d[i] = "item(" .. serialize(item) .. ");\n";
 	end
+	os_remove(getpath(username, host, datastore, "lidx"));
 	local ok, msg = atomic_store(getpath(username, host, datastore, "list", true), t_concat(d));
 	if not ok then
 		log("error", "Unable to write to %s storage ('%s') for user: %s@%s", datastore, msg, username or "nil", host or "nil");
@@ -294,6 +321,146 @@
 	return true;
 end
 
+local function build_list_index(username, host, datastore, items)
+	log("debug", "Building index for (%s@%s/%s)", username, host, datastore);
+	local filename = getpath(username, host, datastore, "list");
+	local fh, err, errno = io_open(filename);
+	if not fh then
+		return fh, err, errno;
+	end
+	local prev_pos = 0; -- position before reading
+	local last_item_start = 0;
+
+	if items and items[1] then
+		local last_item = items[#items];
+		last_item_start = fh:seek("set", last_item.start + last_item.length);
+	else
+		items = {};
+	end
+
+	for line in fh:lines() do
+		if line:sub(1, 4) == "item" then
+			if prev_pos ~= 0 then
+				t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
+			end
+			last_item_start = prev_pos
+		end
+		-- seek position is at the start of the next line within each loop iteration
+		-- so we need to collect the "current" position at the end of the previous
+		prev_pos = fh:seek()
+	end
+	if prev_pos ~= 0 then
+		t_insert(items, { start = last_item_start; length = prev_pos - last_item_start });
+	end
+	return items;
+end
+
+local function store_list_index(username, host, datastore, index)
+	local data = { index_magic };
+	for i, v in ipairs(index) do
+		data[i + 1] = string.pack(index_fmt, v.start, v.length);
+	end
+	local filename = getpath(username, host, datastore, "lidx");
+	return atomic_store(filename, t_concat(data));
+end
+
+local index_mt = {
+	__index = function(t, i)
+		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
+			return
+		end
+		if i < 0 then
+			return
+		end
+		local fh = t.file;
+		local pos = i * index_item_size;
+		if fh:seek("set", pos) ~= pos then
+			return nil
+		end
+		local data = fh:read(index_item_size);
+		if not data then
+			return nil
+		end
+		local start, length = string.unpack(index_fmt, data);
+		local v = { start = start; length = length };
+		t[i] = v;
+		return v;
+	end;
+	__len = function(t)
+		-- Account for both the header and the fence post error
+		return floor(t.file:seek("end") / index_item_size) - 1;
+	end;
+}
+
+local function get_list_index(username, host, datastore)
+	log("debug", "Loading index for (%s%s/%s)", username, host, datastore);
+	local index_filename = getpath(username, host, datastore, "lidx");
+	local ih = io_open(index_filename);
+	if ih then
+		local magic = ih:read(#index_magic);
+		if magic ~= index_magic then
+			log("warn", "Index %q has wrong version number (got %q, expected %q)", index_filename, magic, index_magic);
+			-- wrong version or something
+			ih:close();
+			ih = nil;
+		end
+	end
+
+	if ih then
+		return setmetatable({ file = ih }, index_mt);
+	end
+
+	local index, err = build_list_index(username, host, datastore);
+	if not index then
+		return index, err
+	end
+
+	-- TODO How to handle failure to store the index?
+	local dontcare = store_list_index(username, host, datastore, index); -- luacheck: ignore 211/dontcare
+	return index;
+end
+
+local function list_load_one(fh, start, length)
+	if fh:seek("set", start) ~= start then
+		return nil
+	end
+	local raw_data = fh:read(length)
+	if not raw_data or #raw_data ~= length then
+		return
+	end
+	local item;
+	local data, err, errno = envload(raw_data, "@list", {
+		item = function(i)
+			item = i;
+		end;
+	});
+	if not data then
+		return data, err, errno
+	end
+	local success, ret = pcall(data);
+	if not success then
+		return success, ret;
+	end
+	return item;
+end
+
+local indexed_list_mt = {
+	__index = function(t, i)
+		if type(i) ~= "number" or i % 1 ~= 0 or i < 1 then
+			return
+		end
+		local ix = t.index[i];
+		if not ix then
+			return
+		end
+		local item = list_load_one(t.file, ix.start, ix.length);
+		return item;
+	end;
+	__len = function(t)
+		return #t.index;
+	end;
+}
+
 local function list_load(username, host, datastore)
 	local items = {};
 	local data, err, errno = envloadfile(getpath(username, host, datastore, "list"), {item = function(i) t_insert(items, i); end});
@@ -314,6 +481,27 @@
 	return items;
 end
 
+local function list_open(username, host, datastore)
+	if not index_magic then
+		log("warn", "Falling back from lazy loading to to loading full list for %s storage for user: %s@%s", datastore, username or "nil", host or "nil");
+		return list_load(username, host, datastore);
+	end
+	local filename = getpath(username, host, datastore, "list");
+	local file, err, errno = io_open(filename);
+	if not file then
+		if errno == ENOENT then
+			return nil;
+		end
+		return file, err, errno;
+	end
+	local index, err = get_list_index(username, host, datastore);
+	if not index then
+		file:close()
+		return index, err;
+	end
+	return setmetatable({ file = file; index = index }, indexed_list_mt);
+end
+
 local type_map = {
 	keyval = "dat";
 	list = "list";
@@ -414,4 +602,7 @@
 	purge = purge;
 	path_decode = decode;
 	path_encode = encode;
+
+	build_list_index = build_list_index;
+	list_open = list_open;
 };