Skip to content

Commit

Permalink
Merge pull request #27 from emqx/250304-add-close-and-purge
Browse files Browse the repository at this point in the history
250304 add `replayq:close_and_purge/1`
  • Loading branch information
zmstone authored Mar 4, 2025
2 parents d0de03e + 36b4bb3 commit 05d0f00
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 76 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/hex_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Publish to Hex.pm
uses: erlangpack/github-action@v1
Expand Down
15 changes: 6 additions & 9 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
name: CI
on: [push, pull_request]
on: [pull_request]

jobs:
build:
runs-on: ubuntu-22.04
strategy:
matrix:
otp: ['26.2.5', '27.1']
rebar3: ['3.22.1']
otp: [['26.2.5', '3.22.1'], ['27.1', '3.22.1']]
steps:
- name: Checkout
uses: actions/checkout@v4.2.1
with:
submodules: recursive
- name: Setup Erlang/OTP
uses: erlef/setup-beam@v1.17.5
uses: erlef/setup-beam@v1
with:
otp-version: ${{ matrix.otp }}
rebar3-version: ${{ matrix.rebar3 }}
otp-version: ${{ matrix.otp[0] }}
rebar3-version: ${{ matrix.otp[1] }}
- run: |
make fmt-check
make
- name: Archive CT Logs
uses: actions/upload-artifact@v4.4.3
with:
name: ct-logs-${{ matrix.otp }}
name: ct-logs-${{ matrix.otp[0] }}
path: _build/test/
retention-days: 1
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
- 0.4.1
- Port `replayq:close_and_purge/1` from `dev-0.3.5` to master

- 0.4.0
- Add ETS based in-memory queue/segment implementation.
8 changes: 7 additions & 1 deletion rebar.config
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@
{cover_export_enabled, true}.
{coveralls_coverdata, "_build/test/cover/eunit.coverdata"}.
{coveralls_service_name, "travis-ci"}.
{profiles, [{test, [{deps, [proper]}]}]}.
{profiles, [
{test, [
{deps, [
{proper, {git, "https://github.com/zmstone/proper", {tag, "1.4.1"}}}
]}
]}
]}.
{project_plugins, [
rebar3_hex,
coveralls,
Expand Down
5 changes: 0 additions & 5 deletions src/replayq.appup.src

This file was deleted.

33 changes: 30 additions & 3 deletions src/replayq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
%%--------------------------------------------------------------------
-module(replayq).

-export([open/1, close/1]).
-export([open/1, close/1, close_and_purge/1]).
-export([append/2, pop/2, ack/2, ack_sync/2, peek/1, overflow/1]).
-export([count/1, bytes/1, is_empty/1, is_mem_only/1]).
%% exported for troubleshooting
Expand Down Expand Up @@ -179,8 +179,35 @@ close(#{w_cur := W_Cur, committer := Pid, in_mem := InMem, mem_queue_module := M
ok = replayq_registry:deregister_committer(self()),
Res.

do_close(#{fd := ?NO_FD}) -> ok;
do_close(#{fd := Fd}) -> file:close(Fd).
do_close(#{fd := ?NO_FD}) ->
ok;
do_close(#{fd := Fd}) ->
case file:close(Fd) of
ok ->
ok;
{error, einval} ->
ok;
{error, Reason} ->
{error, Reason}
end.

%% @doc Close the queue and purge all the files on disk.
close_and_purge(#{config := mem_only} = Q) ->
close(Q);
close_and_purge(#{config := #{dir := Dir}} = Q) ->
close(Q),
del_dir_r(Dir).

-if(?OTP_RELEASE >= 24).
del_dir_r(Dir) ->
ok = file:del_dir_r(Dir).
-else.
del_dir_r(Dir) ->
Files = list_segments(Dir),
ok = lists:foreach(fun(F) -> ok = file:delete(filename:join(Dir, F)) end, Files),
_ = file:delete(filename:join(Dir, "COMMIT")),
ok = file:del_dir(Dir).
-endif.

%% In case of offload mode, dump the unacked (and un-popped) on disk
%% before close. this serves as a best-effort data loss protection
Expand Down
8 changes: 7 additions & 1 deletion src/replayq_mem_ets_exclusive.erl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ new(_) ->
%% @doc Peek all items from the queue.
-spec peek_all(queue()) -> [term()].
peek_all(#{ets_tab := Tab}) ->
lists:map(fun({_Id, Item}) -> Item end, ets:tab2list(Tab)).
try ets:tab2list(Tab) of
L ->
lists:map(fun({_Id, Item}) -> Item end, L)
catch
error:badarg ->
[]
end.

%% @doc Peek the front item of the queue.
-spec peek(queue()) -> empty | {value, term()}.
Expand Down
4 changes: 2 additions & 2 deletions src/replayq_registry.erl
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ handle_call(#register_committer{dir = Dir, pid = Pid}, _From, State0) ->
#{Dir := SomePid} ->
case is_process_alive(SomePid) of
true ->
{reply, {error, already_registered}, State0};
{reply, {error, {already_registered, Dir}}, State0};
false ->
{_, State1} = pop_committer(State0, SomePid),
State = do_register_committer(State1, Dir, Pid),
Expand All @@ -107,7 +107,7 @@ handle_call(#register_slot_owner{pid = Pid}, _From, #{slot_owners := Owners0} =
Ref = erlang:monitor(process, Pid),
{reply, ok, State0#{slot_owners := Owners0#{Pid => Ref}}};
true ->
{reply, {error, already_registered}, State0}
{reply, {error, {already_registered, Pid}}, State0}
end;
handle_call(#deregister_slot_owner{pid = Pid}, _From, #{slot_owners := Owners0} = State0) ->
case maps:get(Pid, Owners0, undefined) of
Expand Down
71 changes: 26 additions & 45 deletions test/replayq_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@ t_init(CtConfig) ->
Q2 = open(CtConfig, Config),
?assertEqual(0, replayq:count(Q2)),
?assertEqual(0, replayq:bytes(Q2)),
ok = replayq:close(Q2),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q2).

t_reopen(CtConfig) ->
Dir = ?DIR,
Expand All @@ -82,7 +81,9 @@ t_reopen(CtConfig) ->
Q2 = open(CtConfig, Config),
?assertEqual(2, replayq:count(Q2)),
?assertEqual(10, replayq:bytes(Q2)),
ok = cleanup(Dir).
ok = replayq:close(Q2),
%% ok to close after closed already
ok = replayq:close_and_purge(Q2).

t_volatile(CtConfig) ->
Dir = ?DIR,
Expand All @@ -95,8 +96,7 @@ t_volatile(CtConfig) ->
?assertEqual(0, replayq:bytes(Q2)),
{Q3, _QAckRef, Items} = replayq:pop(Q2, #{count_limit => 10}),
?assertEqual([], Items),
ok = replayq:close(Q3),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q3).

%% when popping from in-mem segment, the segment size stats may overflow
%% but not consuming as much memory
Expand All @@ -117,7 +117,7 @@ t_offload_in_mem_seg_overflow(CtConfig) ->
?assertMatch([], list_segments(Dir)),
?assertMatch(#{w_cur := #{fd := no_fd}}, Q3),
ok = replayq:close(Q3),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q3).

t_offload_file(CtConfig) ->
Dir = ?DIR,
Expand Down Expand Up @@ -149,7 +149,7 @@ t_offload_file(CtConfig) ->
?assertEqual([<<"item5">>], Items3),
ok = replayq:ack_sync(Q7, AckRef3),
ok = replayq:close(Q7),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q7).

t_offload_reopen(CtConfig) ->
Dir = ?DIR,
Expand All @@ -173,7 +173,7 @@ t_offload_reopen(CtConfig) ->
Q5 = open(CtConfig, Config),
?assertEqual(0, replayq:count(Q5)),
?assertEqual(0, replayq:bytes(Q5)),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

t_reopen_v0(CtConfig) ->
Dir = ?DIR,
Expand All @@ -191,8 +191,7 @@ t_reopen_v0(CtConfig) ->
%% do not expect item3 because it was appended to a corrupted tail
?assertEqual([<<"item1">>, <<"item2">>], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

t_append_pop_disk_default_marshaller(CtConfig) ->
Dir = ?DIR,
Expand All @@ -212,7 +211,7 @@ t_append_pop_disk_my_marshaller(CtConfig) ->
},
test_append_pop_disk(CtConfig, Config).

test_append_pop_disk(CtConfig, #{dir := Dir} = Config) ->
test_append_pop_disk(CtConfig, Config) ->
Q0 = open(CtConfig, Config),
Q1 = replayq:append(Q0, [<<"item1">>, <<"item2">>]),
Q2 = replayq:append(Q1, [<<"item3">>]),
Expand All @@ -239,8 +238,7 @@ test_append_pop_disk(CtConfig, #{dir := Dir} = Config) ->
?assertEqual(empty, replayq:peek(Q6)),
?assertEqual({Q6, nothing_to_ack, []}, replayq:pop(Q6, #{})),
ok = replayq:ack(Q6, nothing_to_ack),
ok = replayq:close(Q6),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q6).

t_append_pop_mem_default_marshaller(CtConfig) ->
Config = #{mem_only => true},
Expand Down Expand Up @@ -288,8 +286,7 @@ t_append_max_total_bytes_mem(CtConfig) ->
end,
max_total_bytes => 10
},
test_append_max_total_bytes(CtConfig, Config),
ok.
test_append_max_total_bytes(CtConfig, Config).

t_append_max_total_bytes_disk(CtConfig) ->
Dir = ?DIR,
Expand All @@ -303,8 +300,7 @@ t_append_max_total_bytes_disk(CtConfig) ->
end,
max_total_bytes => 10
},
test_append_max_total_bytes(CtConfig, Config),
ok = cleanup(Dir).
test_append_max_total_bytes(CtConfig, Config).

test_append_max_total_bytes(CtConfig, Config) ->
Q0 = open(CtConfig, Config),
Expand All @@ -313,13 +309,12 @@ test_append_max_total_bytes(CtConfig, Config) ->
?assertEqual(10, replayq:overflow(Q1)),
{Q2, _AckRef, _Items} = replayq:pop(Q1, #{count_limit => 2}),
?assertEqual(0, replayq:overflow(Q2)),
ok = replayq:close(Q2).
ok = replayq:close_and_purge(Q2).

t_pop_limit_disk(CtConfig) ->
Dir = ?DIR,
Config = #{dir => Dir, seg_bytes => 1},
ok = test_pop_limit(CtConfig, Config),
ok = cleanup(Dir).
ok = test_pop_limit(CtConfig, Config).

t_pop_limit_mem(CtConfig) ->
Config = #{mem_only => true},
Expand All @@ -339,7 +334,7 @@ test_pop_limit(CtConfig, Config) ->
bytes_limit => 1
}),
?assertEqual([<<"item2">>], Items2),
ok = replayq:close(Q4).
ok = replayq:close_and_purge(Q4).

t_commit_in_the_middle(CtConfig) ->
Dir = ?DIR,
Expand All @@ -360,8 +355,7 @@ t_commit_in_the_middle(CtConfig) ->
?assertEqual([<<"item2">>], Items2),
?assertEqual(1, replayq:count(Q5)),
?assertEqual(5, replayq:bytes(Q5)),
ok = replayq:close(Q5),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

t_first_segment_corrupted(CtConfig) ->
Dir = ?DIR,
Expand All @@ -383,8 +377,7 @@ t_first_segment_corrupted(CtConfig) ->
{Q4, _AckRef, Items} = replayq:pop(Q3, #{count_limit => 3}),
?assertEqual([Item], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

t_second_segment_corrupted(CtConfig) ->
Dir = ?DIR,
Expand All @@ -408,8 +401,7 @@ t_second_segment_corrupted(CtConfig) ->
{Q5, _AckRef, Items} = replayq:pop(Q4, #{count_limit => 3}),
?assertEqual([Item, Item], Items),
?assert(replayq:is_empty(Q5)),
ok = replayq:close(Q5),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

t_last_segment_corrupted(CtConfig) ->
Dir = ?DIR,
Expand Down Expand Up @@ -441,7 +433,7 @@ t_last_segment_corrupted(CtConfig) ->
Q8 = open(CtConfig, Config),
?assert(replayq:is_empty(Q8)),
replayq:ack(Q7, AckRef),
ok = cleanup(Dir).
replayq:close_and_purge(Q7).

t_corrupted_segment(CtConfig) ->
?assert(test_corrupted_segment(CtConfig, <<"foo">>)),
Expand All @@ -467,8 +459,7 @@ test_corrupted_segment(CtConfig, BadBytes) ->
%% do not expect item3 because it was appended to a corrupted tail
?assertEqual([<<"item1">>, Item2], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir),
ok = replayq:close_and_purge(Q4),
true.

t_comitter_crash(CtConfig) ->
Expand Down Expand Up @@ -501,7 +492,8 @@ t_same_directory_committer_clash(CtConfig) ->
try open(CtConfig, Config) of
Q2 -> error({"should not allow opening a second replayq", Q2})
catch
error:{badmatch, {error, already_registered}} ->
error:{badmatch, {error, {already_registered, Dir1}}} ->
?assertEqual(iolist_to_binary(Dir), Dir1),
ok
end,
replayq:close(Q1),
Expand Down Expand Up @@ -623,8 +615,7 @@ t_corrupted_commit(CtConfig) ->
ok = file:write_file(CommitFile, <<"bad-erlang-term">>),
%% assert no crash
Q4 = open(CtConfig, Config),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

t_pop_bytes_mem(CtConfig) ->
Config = #{
Expand All @@ -645,11 +636,8 @@ t_pop_bytes_disk(CtConfig) ->
sizer => fun(Item) -> size(Item) end
},
ok = test_pop_bytes(CtConfig, Config, default),
ok = cleanup(Dir),
ok = test_pop_bytes(CtConfig, Config, at_most),
ok = cleanup(Dir),
ok = test_pop_bytes(CtConfig, Config, at_least),
ok = cleanup(Dir).
ok = test_pop_bytes(CtConfig, Config, at_least).

test_pop_bytes(CtConfig, Config, BytesMode) ->
Q0 = open(CtConfig, Config),
Expand All @@ -675,8 +663,7 @@ test_pop_bytes(CtConfig, Config, BytesMode) ->
spop(Q1, #{count_limit => 10, bytes_limit => {at_least, ItemSize + 1}})
)
end,
ok = replayq:close(Q0),
ok.
ok = replayq:close_and_purge(Q0).

owner_down_cause_purge(CtConfig) ->
{Owner, Ref} = spawn_monitor(fun() ->
Expand Down Expand Up @@ -711,12 +698,6 @@ spop(Q, Opts) ->
{_Q1, _AckRef, Items} = replayq:pop(Q, Opts),
Items.

cleanup(Dir) ->
Files = list_segments(Dir),
ok = lists:foreach(fun(F) -> ok = file:delete(filename:join(Dir, F)) end, Files),
_ = file:delete(filename:join(Dir, "COMMIT")),
ok = file:del_dir(Dir).

list_segments(Dir) -> filelib:wildcard("*." ?SUFFIX, Dir).

data_dir() -> "./test-data".
Expand Down
Loading

0 comments on commit 05d0f00

Please sign in to comment.