--- a/mod_storage_s3/mod_storage_s3.lua Sat Oct 14 21:41:01 2023 +0200
+++ b/mod_storage_s3/mod_storage_s3.lua Sat Oct 14 21:44:14 2023 +0200
@@ -1,8 +1,10 @@
local http = require "prosody.net.http";
local array = require "prosody.util.array";
local async = require "prosody.util.async";
+local dt = require "prosody.util.datetime";
local hashes = require "prosody.util.hashes";
local httputil = require "prosody.util.http";
+local uuid = require "prosody.util.uuid";
local it = require "prosody.util.iterators";
local jid = require "prosody.util.jid";
local json = require "prosody.util.json";
@@ -10,6 +12,7 @@
local xml = require "prosody.util.xml";
local url = require "socket.url";
+local new_uuid = uuid.v7 or uuid.generate;
local hmac_sha256 = hashes.hmac_sha256;
local sha256 = hashes.sha256;
@@ -114,6 +117,9 @@
-- coerce result back into Prosody data type
local function on_result(response)
+ if response.code >= 400 then
+ error(response.body);
+ end
local content_type = response.headers["content-type"];
if content_type == "application/json" then
return json.decode(response.body);
@@ -172,4 +178,117 @@
end
end
+local archive = {};
+driver.archive = { __index = archive };
+
+archive.caps = {
+};
+
+function archive:_path(username, date, when, with, key)
+ return url.build_path({
+ is_absolute = true;
+ bucket;
+ jid.escape(module.host);
+ jid.escape(self.store);
+ jid.escape(username);
+ jid.escape(jid.prep(with));
+ date or dt.date(when);
+ key;
+ })
+end
+
+
+-- PUT .../with/when/id
+function archive:append(username, key, value, when, with)
+ local wrapper = st.stanza("wrapper");
+ -- Minio had trouble with timestamps, probably the ':' characters, in paths.
+ wrapper:tag("delay", { xmlns = "urn:xmpp:delay"; stamp = dt.datetime(when) }):up();
+ wrapper:add_direct_child(value);
+ key = key or new_uuid();
+ return async.wait_for(new_request("PUT", self:_path(username, nil, when, with, key), nil, wrapper):next(function(r)
+ if r.code == 200 then
+ return key;
+ else
+ error(r.body);
+ end
+ end));
+end
+
+function archive:find(username, query)
+ local bucket_path = url.build_path({ is_absolute = true; bucket; is_directory = true });
+ local prefix = { jid.escape(module.host); jid.escape(self.store); is_directory = true };
+ table.insert(prefix, jid.escape(username or "@"));
+ if query["with"] then
+ table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
+ if query["start"] and query["end"] and dt.date(query["start"]) == dt.date(query["end"]) then
+ table.insert(prefix, sha256(jid.prep(query["with"]), true):sub(1,24));
+ end
+ end
+
+ prefix = url.build_path(prefix);
+ local list_result, err = async.wait_for(new_request("GET", bucket_path, {
+ prefix = prefix;
+ ["max-keys"] = query["max"] and tostring(query["max"]);
+ }));
+ if err or list_result.code ~= 200 then
+ return nil, err;
+ end
+ local list_bucket_result = xml.parse(list_result.body);
+ if list_bucket_result:get_child_text("IsTruncated") == "true" then
+ local max_keys = list_bucket_result:get_child_text("MaxKeys");
+ module:log("warn", "Paging truncated results not implemented, max %s %s returned", max_keys, self.store);
+ end
+ local keys = array();
+ local iterwrap = function(...)
+ return ...;
+ end
+ if query["reverse"] then
+ query["before"], query["after"] = query["after"], query["before"];
+ iterwrap = it.reverse;
+ end
+ local found = not query["after"];
+ for content in iterwrap(list_bucket_result:childtags("Contents")) do
+ local key = url.parse_path(content:get_child_text("Key"));
+ if found and query["before"] == key[6] then
+ break
+ end
+ if (not query["with"] or query["with"] == jid.unescape(key[5]))
+ and (not query["start"] or dt.date(query["start"]) >= key[6])
+ and (not query["end"] or dt.date(query["end"]) <= key[6])
+ and found then
+ keys:push({ key = key[6]; date = key[5]; with = jid.unescape(key[4]) });
+ end
+ if not found and key[6] == query["after"] then
+ found = not found
+ end
+ end
+ local i = 0;
+ return function()
+ i = i + 1;
+ local item = keys[i];
+ if item == nil then
+ return nil;
+ end
+ -- luacheck: ignore 431/err
+ local value, err = async.wait_for(new_request("GET", self:_path(username or "@", item.date, nil, item.with, item.key)):next(on_result));
+ if not value then
+ module:log("error", "%s", err);
+ return nil;
+ end
+ local delay = value:get_child("delay", "urn:xmpp:delay");
+
+ return item.key, value.tags[2], dt.parse(delay.attr.stamp), item.with;
+ end
+end
+
+function archive:users()
+ return it.unique(keyval.users(self));
+end
+
+--[[ TODO
+function archive:delete(username, query)
+ return nil, "not-implemented";
+end
+--]]
+
module:provides("storage", driver);