Skip to content

Commit

Permalink
feat: add api to close and purge dir
Browse files Browse the repository at this point in the history
  • Loading branch information
zmstone committed Mar 3, 2025
1 parent 5f3c9bf commit 9024d7e
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 54 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/hex_publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Check out
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Publish to Hex.pm
uses: erlangpack/github-action@v1
Expand Down
24 changes: 11 additions & 13 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
name: CI
on: [push, pull_request]
on: [pull_request]

jobs:
build:
runs-on: ubuntu-latest
strategy:
matrix:
otp: [23.2, 24.0.2]
otp:
- '24'
- '25'
steps:
- name: Checkout
uses: actions/checkout@v2
uses: actions/checkout@v4
# Install Erlang
- name: Install Erlang/OTP
uses: erlef/setup-beam@v1
with:
submodules: recursive
- uses: gleam-lang/setup-erlang@v1.1.2
with:
otp-version: ${{ matrix.otp }}
version-type: strict
otp-version: ${{matrix.otp}}
rebar3-version: '3.17.0'
- run: |
make
- name: Archive CT Logs
uses: actions/upload-artifact@v2
with:
name: ct-logs
path: _build/test/
retention-days: 1
2 changes: 1 addition & 1 deletion src/replayq.app.src
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{application, replayq,
[{description, "A Disk Queue for Log Replay in Erlang"},
{vsn, "0.3.5.1"},
{vsn, "0.3.5.2"},
{registered, []},
{applications,
[kernel,
Expand Down
2 changes: 1 addition & 1 deletion src/replayq.appup.src
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
%% -*-: erlang -*-
{"0.3.5.1",
{"0.3.5.2",
[ {<<".*">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ],
[ {<<".*">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ]
}.
9 changes: 8 additions & 1 deletion src/replayq.erl
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-module(replayq).

-export([open/1, close/1]).
-export([open/1, close/1, close_and_purge/1]).
-export([append/2, pop/2, ack/2, ack_sync/2, peek/1, overflow/1]).
-export([count/1, bytes/1, is_empty/1, is_mem_only/1]).
%% exported for troubleshooting
Expand Down Expand Up @@ -130,6 +130,13 @@ close(#{w_cur := W_Cur, committer := Pid} = Q) ->
do_close(#{fd := ?NO_FD}) -> ok;
do_close(#{fd := Fd}) -> file:close(Fd).

%% @doc Close the queue and purge all the files on disk.
close_and_purge(#{config := mem_only}) ->
ok;
close_and_purge(#{config := #{dir := Dir}} = Q) ->
close(Q),
file:del_dir_r(Dir).

%% In case of offload mode, dump the unacked (and un-popped) on disk
%% before close. this serves as a best-effort data loss protection
maybe_dump_back_to_disk(#{config := Config} = Q) ->
Expand Down
4 changes: 1 addition & 3 deletions test/prop_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ prop_op_list(IsOffload) ->
proper_types:list(proper_types:oneof(Union)).

delete_dir(Dir) ->
lists:foreach(fun(F) -> ok = file:delete(filename:join([Dir, F])) end,
filelib:wildcard("*", Dir)),
ok = file:del_dir(Dir).
ok = file:del_dir_r(Dir).

compare_stats(MQ, DQ) ->
?assertEqual(replayq:count(MQ), replayq:count(DQ)),
Expand Down
51 changes: 17 additions & 34 deletions test/replayq_tests.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ init_test() ->
Q2 = replayq:open(Config),
?assertEqual(0, replayq:count(Q2)),
?assertEqual(0, replayq:bytes(Q2)),
ok = replayq:close(Q2),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q2).

reopen_test() ->
Dir = ?DIR,
Expand All @@ -28,7 +27,7 @@ reopen_test() ->
Q2 = replayq:open(Config),
?assertEqual(2, replayq:count(Q2)),
?assertEqual(10, replayq:bytes(Q2)),
ok = cleanup(Dir).
replayq:close_and_purge(Q2).

%% when popping from in-mem segment, the segment size stats may overflow
%% but not consuming as much memory
Expand All @@ -44,8 +43,7 @@ offload_in_mem_seg_overflow_test() ->
Q3 = replayq:append(Q2, [<<"item3">>]), %% still in mem
?assertMatch([], list_segments(Dir)), % not offloading to disk yet
?assertMatch(#{w_cur := #{fd := no_fd}}, Q3),
ok = replayq:close(Q3),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q3).

offload_file_test() ->
Dir = ?DIR,
Expand All @@ -71,8 +69,7 @@ offload_file_test() ->
{Q7, AckRef3, Items3} = replayq:pop(Q6, #{}),
?assertEqual([<<"item5">>], Items3),
ok = replayq:ack_sync(Q7, AckRef3),
ok = replayq:close(Q7),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q7).

offload_reopen_test() ->
Dir = ?DIR,
Expand All @@ -95,7 +92,7 @@ offload_reopen_test() ->
Q5 = replayq:open(Config),
?assertEqual(0, replayq:count(Q5)),
?assertEqual(0, replayq:bytes(Q5)),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

reopen_v0_test() ->
Dir = ?DIR,
Expand All @@ -110,8 +107,7 @@ reopen_v0_test() ->
%% do not expect item3 because it was appended to a corrupted tail
?assertEqual([<<"item1">>, <<"item2">>], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

append_pop_disk_default_marshaller_test() ->
Dir = ?DIR,
Expand Down Expand Up @@ -152,8 +148,7 @@ test_append_pop_disk(#{dir := Dir} = Config) ->
?assertEqual(empty, replayq:peek(Q6)),
?assertEqual({Q6, nothing_to_ack, []}, replayq:pop(Q6, #{})),
ok = replayq:ack(Q6, nothing_to_ack),
ok = replayq:close(Q6),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q6).

append_pop_mem_default_marshaller_test_test() ->
Config = #{mem_only => true},
Expand Down Expand Up @@ -208,8 +203,7 @@ append_max_total_bytes_disk_test() ->
end,
max_total_bytes => 10
},
test_append_max_total_bytes(Config),
ok = cleanup(Dir).
test_append_max_total_bytes(Config).

test_append_max_total_bytes(Config) ->
Q0 = replayq:open(Config),
Expand All @@ -218,13 +212,12 @@ test_append_max_total_bytes(Config) ->
?assertEqual(10, replayq:overflow(Q1)),
{Q2, _AckRef, _Items} = replayq:pop(Q1, #{count_limit => 2}),
?assertEqual(0, replayq:overflow(Q2)),
ok = replayq:close(Q2).
ok = replayq:close_and_purge(Q2).

pop_limit_disk_test() ->
Dir = ?DIR,
Config = #{dir => Dir, seg_bytes => 1},
ok = test_pop_limit(Config),
ok = cleanup(Dir).
ok = test_pop_limit(Config).

pop_limit_mem_test() ->
Config = #{mem_only => true},
Expand All @@ -240,7 +233,7 @@ test_pop_limit(Config) ->
{Q4, _AckRef2, Items2} = replayq:pop(Q3, #{count_limit => 10,
bytes_limit => 1}),
?assertEqual([<<"item2">>], Items2),
ok = replayq:close(Q4).
ok = replayq:close_and_purge(Q4).

commit_in_the_middle_test() ->
Dir = ?DIR,
Expand All @@ -261,8 +254,7 @@ commit_in_the_middle_test() ->
?assertEqual([<<"item2">>], Items2),
?assertEqual(1, replayq:count(Q5)),
?assertEqual(5, replayq:bytes(Q5)),
ok = replayq:close(Q5),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

first_segment_corrupted_test() ->
Dir = ?DIR,
Expand All @@ -282,8 +274,7 @@ first_segment_corrupted_test() ->
{Q4, _AckRef, Items} = replayq:pop(Q3, #{count_limit => 3}),
?assertEqual([Item], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

second_segment_corrupted_test() ->
Dir = ?DIR,
Expand All @@ -304,8 +295,7 @@ second_segment_corrupted_test() ->
{Q5, _AckRef, Items} = replayq:pop(Q4, #{count_limit => 3}),
?assertEqual([Item, Item], Items),
?assert(replayq:is_empty(Q5)),
ok = replayq:close(Q5),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q5).

last_segment_corrupted_test() ->
Dir = ?DIR,
Expand Down Expand Up @@ -334,7 +324,7 @@ last_segment_corrupted_test() ->
Q8 = replayq:open(Config),
?assert(replayq:is_empty(Q8)),
replayq:ack(Q7, AckRef),
ok = cleanup(Dir).
replayq:close_and_purge(Q7).

corrupted_segment_test_() ->
[{"ramdom", fun() -> test_corrupted_segment(<<"foo">>) end},
Expand All @@ -360,7 +350,7 @@ test_corrupted_segment(BadBytes) ->
?assertEqual([<<"item1">>, Item2], Items),
?assert(replayq:is_empty(Q4)),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

comitter_crash_test() ->
Dir = ?DIR,
Expand Down Expand Up @@ -407,17 +397,10 @@ corrupted_commit_test() ->
ok = file:write_file(CommitFile, <<"bad-erlang-term">>),
%% assert no crash
Q4 = replayq:open(Config),
ok = replayq:close(Q4),
ok = cleanup(Dir).
ok = replayq:close_and_purge(Q4).

%% helpers ===========================================================

cleanup(Dir) ->
Files = list_segments(Dir),
ok = lists:foreach(fun(F) -> ok = file:delete(filename:join(Dir, F)) end, Files),
_ = file:delete(filename:join(Dir, "COMMIT")),
ok = file:del_dir(Dir).

list_segments(Dir) -> filelib:wildcard("*."?SUFFIX, Dir).

data_dir() -> "./test-data".
Expand Down

0 comments on commit 9024d7e

Please sign in to comment.