|
1 local queue = require("util.queue"); |
|
2 |
|
-- Method table for smqueue objects; also exposed to callers as lib.smqueue.
local smqueue = {};

-- Module table; its `smqueue` field is the very same table as the local above.
local lib = { smqueue = smqueue };
|
6 |
|
--- Append an item and advance the head counter.
-- The head counter is incremented first; the item is then handed to the
-- backing queue, and an error is raised if that push reports failure.
-- @param v item to enqueue
function smqueue:push(v)
	local backing = self._queue;
	self._head = self._head + 1;
	-- assert() turns a falsy result from the backing queue into an error.
	assert(backing:push(v));
end
|
12 |
|
--- Acknowledge everything up to counter value `h`.
-- Moves the tail counter to `h`, then pops items from the backing queue
-- until it holds no more than the number of still-unacknowledged items.
-- @param h counter value being acknowledged
-- @return a list of the items popped, in pop order; or
-- @return nil, "tail" when `h` is lower than the current tail counter
-- @return nil, "head" when `h` is higher than the current head counter
-- @return nil, "pop" when the backing queue yields a falsy value mid-drain
function smqueue:ack(h)
	if h < self._tail then
		return nil, "tail"
	end
	if h > self._head then
		return nil, "head"
	end

	-- The tail counter is updated before draining, so it stays at `h` even
	-- if the drain below bails out with "pop".
	self._tail = h;

	local backing = self._queue;
	local outstanding = self._head - h;
	local released = {};
	while backing:count() > outstanding do
		local item = backing:pop();
		if not item then
			return nil, "pop"
		end
		released[#released + 1] = item;
	end
	return released
end
|
30 |
|
--- Number of items pushed but not yet acknowledged.
-- @return the head counter minus the tail counter
function smqueue:count_unacked()
	local pushed, acked = self._head, self._tail;
	return pushed - acked
end
|
32 |
|
--- Current value of the acknowledgement (tail) counter.
-- @return the tail counter
function smqueue:count_acked()
	local acked = self._tail;
	return acked
end
|
34 |
|
--- Check whether the backing queue still holds every unacknowledged item.
-- @return true when the backing queue's item count is at least
--   (head counter - tail counter), false otherwise
function smqueue:resumable()
	local stored = self._queue:count();
	local unacked = self._head - self._tail;
	return stored >= unacked
end
|
36 |
|
--- Forward to the backing queue's items() method.
-- @return whatever the backing queue's items() returns (all values forwarded)
function smqueue:resume()
	local backing = self._queue;
	return backing:items()
end
|
38 |
|
--- Forward to the backing queue's consume() method.
-- @return whatever the backing queue's consume() returns (all values forwarded)
function smqueue:consume()
	local backing = self._queue;
	return backing:consume()
end
|
40 |
|
-- Metatable for the array-like compatibility view created by smqueue:table().
-- A view is a table whose only raw field is `_queue`, the wrapped smqueue.
local compat_mt = {}

--- Look up an item of the wrapped queue by numeric index.
-- Non-numeric keys yield nil rather than raising a comparison error, so
-- probing the view with an arbitrary field name (e.g. `view.foo`) is safe.
-- @param i index to look up
-- @return nil when `i` is not a number or is below the tail counter;
--   otherwise the backing store slot at (i + tail) % size
function compat_mt:__index(i)
	if type(i) ~= "number" then return nil end
	local sq = self._queue;
	if i < sq._tail then return nil end
	-- NOTE(review): reaches into the backing queue's `_items` and `size`
	-- fields directly; their layout is not visible here - confirm against
	-- util.queue.
	local backing = sq._queue;
	return backing._items[(i + sq._tail) % backing.size]
end

--- Length of the view: the wrapped queue's count of unacknowledged items.
function compat_mt:__len() return self._queue:count_unacked() end
|
49 |
|
--- Create an array-like compatibility view of this queue.
-- @return a new table holding this queue in its `_queue` field, with
--   compat_mt as its metatable
function smqueue:table()
	local view = { _queue = self };
	return setmetatable(view, compat_mt)
end
|
51 |
|
--- Build a plain snapshot of a queue's two counters.
-- @param q table with `_head` and `_tail` fields
-- @return a new table { head = q._head, tail = q._tail }
local function freeze(q)
	local snapshot = {};
	snapshot.head = q._head;
	snapshot.tail = q._tail;
	return snapshot
end
|
53 |
|
-- Metatable shared by all smqueue instances.
local queue_mt = {
	-- Type name for the metatable.
	__name = "smqueue";
	-- Method lookup falls through to the smqueue method table.
	__index = smqueue;
	-- Length of an instance is its count of unacknowledged items.
	__len = smqueue.count_unacked;
	-- Snapshot hook producing { head, tail }.
	__freeze = freeze;
}
|
55 |
|
--- Create a new smqueue.
-- Raises an error unless `size > 0`.
-- @param size capacity passed on to queue.new
-- @return a table with both counters at zero, a fresh backing queue, and
--   queue_mt as its metatable
function lib.new(size)
	assert(size > 0);
	local instance = {
		_head = 0;
		_tail = 0;
		-- NOTE(review): the meaning of the `true` flag is defined by
		-- util.queue and is not visible here - confirm before changing.
		_queue = queue.new(size, true);
	};
	return setmetatable(instance, queue_mt)
end
|
60 |
|
-- Export the module table: the `new` constructor plus the `smqueue` method table.
return lib