From 38e075a84af0090b398669a1176bb32cfb001edb Mon Sep 17 00:00:00 2001 From: zmstone Date: Mon, 3 Mar 2025 15:54:43 +0100 Subject: [PATCH] feat: add api to close and purge dir --- .github/workflows/hex_publish.yml | 2 +- .github/workflows/main.yml | 19 +++++++----- src/replayq.app.src | 2 +- src/replayq.appup.src | 2 +- src/replayq.erl | 20 +++++++++++- test/prop_tests.erl | 4 +-- test/replayq_tests.erl | 51 +++++++++++-------------------- 7 files changed, 51 insertions(+), 49 deletions(-) diff --git a/.github/workflows/hex_publish.yml b/.github/workflows/hex_publish.yml index 02ac9d1..4c8c2c6 100644 --- a/.github/workflows/hex_publish.yml +++ b/.github/workflows/hex_publish.yml @@ -8,7 +8,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Publish to Hex.pm uses: erlangpack/github-action@v1 diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index c44bc0c..1311a1d 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -1,25 +1,28 @@ name: CI -on: [push, pull_request] +on: [pull_request] jobs: build: - runs-on: ubuntu-latest + runs-on: ubuntu-22.04 strategy: matrix: - otp: [23.2, 24.0.2] + otp: [['23', '3.20.0'], ['24', '3.21.0'], ['25', '3.23.0']] steps: - name: Checkout - uses: actions/checkout@v2 + uses: actions/checkout@v4.2.1 with: submodules: recursive - - uses: gleam-lang/setup-erlang@v1.1.2 + - name: Setup Erlang/OTP + uses: erlef/setup-beam@v1.17.5 with: - otp-version: ${{ matrix.otp }} + version-type: strict + otp-version: ${{ matrix.otp[0] }} + rebar3-version: ${{ matrix.otp[1] }} - run: | make - name: Archive CT Logs - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v4.4.3 with: - name: ct-logs + name: ct-logs-${{ matrix.otp[0] }} path: _build/test/ retention-days: 1 diff --git a/src/replayq.app.src b/src/replayq.app.src index 11ab5a9..0b341b9 100644 --- a/src/replayq.app.src +++ b/src/replayq.app.src @@ -1,6 +1,6 @@ {application, replayq, [{description, "A Disk Queue for Log Replay in Erlang"}, - {vsn, "0.3.5.1"}, + {vsn, "0.3.5.2"}, {registered, []}, {applications, [kernel, diff --git a/src/replayq.appup.src b/src/replayq.appup.src index 43c17eb..960d2ea 100644 --- a/src/replayq.appup.src +++ b/src/replayq.appup.src @@ -1,5 +1,5 @@ %% -*-: erlang -*- -{"0.3.5.1", +{"0.3.5.2", [ {<<".*">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ], [ {<<".*">>, [ {load_module, replayq, brutal_purge, soft_purge, []} ]} ] }. diff --git a/src/replayq.erl b/src/replayq.erl index 8c921aa..220a350 100644 --- a/src/replayq.erl +++ b/src/replayq.erl @@ -1,6 +1,6 @@ -module(replayq). --export([open/1, close/1]). +-export([open/1, close/1, close_and_purge/1]). -export([append/2, pop/2, ack/2, ack_sync/2, peek/1, overflow/1]). -export([count/1, bytes/1, is_empty/1, is_mem_only/1]). %% exported for troubleshooting @@ -130,6 +130,24 @@ close(#{w_cur := W_Cur, committer := Pid} = Q) -> do_close(#{fd := ?NO_FD}) -> ok; do_close(#{fd := Fd}) -> file:close(Fd). +%% @doc Close the queue and purge all the files on disk. +close_and_purge(#{config := mem_only}) -> + ok; +close_and_purge(#{config := #{dir := Dir}} = Q) -> + close(Q), + del_dir_r(Dir). + +-if(?OTP_RELEASE >= 24). +del_dir_r(Dir) -> + ok = file:del_dir_r(Dir). +-else. +del_dir_r(Dir) -> + Files = list_segments(Dir), + ok = lists:foreach(fun(F) -> ok = file:delete(filename:join(Dir, F)) end, Files), + _ = file:delete(filename:join(Dir, "COMMIT")), + ok = file:del_dir(Dir). +-end. + %% In case of offload mode, dump the unacked (and un-popped) on disk %% before close. this serves as a best-effort data loss protection maybe_dump_back_to_disk(#{config := Config} = Q) -> diff --git a/test/prop_tests.erl b/test/prop_tests.erl index 9889f7a..3b885e1 100644 --- a/test/prop_tests.erl +++ b/test/prop_tests.erl @@ -74,9 +74,7 @@ prop_op_list(IsOffload) -> proper_types:list(proper_types:oneof(Union)). delete_dir(Dir) -> - lists:foreach(fun(F) -> ok = file:delete(filename:join([Dir, F])) end, - filelib:wildcard("*", Dir)), - ok = file:del_dir(Dir). + ok = file:del_dir_r(Dir). compare_stats(MQ, DQ) -> ?assertEqual(replayq:count(MQ), replayq:count(DQ)), diff --git a/test/replayq_tests.erl b/test/replayq_tests.erl index 65cbe46..fd32041 100644 --- a/test/replayq_tests.erl +++ b/test/replayq_tests.erl @@ -16,8 +16,7 @@ init_test() -> Q2 = replayq:open(Config), ?assertEqual(0, replayq:count(Q2)), ?assertEqual(0, replayq:bytes(Q2)), - ok = replayq:close(Q2), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q2). reopen_test() -> Dir = ?DIR, @@ -28,7 +27,7 @@ reopen_test() -> Q2 = replayq:open(Config), ?assertEqual(2, replayq:count(Q2)), ?assertEqual(10, replayq:bytes(Q2)), - ok = cleanup(Dir). + replayq:close_and_purge(Q2). %% when popping from in-mem segment, the segment size stats may overflow %% but not consuming as much memory @@ -44,8 +43,7 @@ offload_in_mem_seg_overflow_test() -> Q3 = replayq:append(Q2, [<<"item3">>]), %% still in mem ?assertMatch([], list_segments(Dir)), % not offloading to disk yet ?assertMatch(#{w_cur := #{fd := no_fd}}, Q3), - ok = replayq:close(Q3), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q3). offload_file_test() -> Dir = ?DIR, @@ -71,8 +69,7 @@ offload_file_test() -> {Q7, AckRef3, Items3} = replayq:pop(Q6, #{}), ?assertEqual([<<"item5">>], Items3), ok = replayq:ack_sync(Q7, AckRef3), - ok = replayq:close(Q7), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q7). offload_reopen_test() -> Dir = ?DIR, @@ -95,7 +92,7 @@ offload_reopen_test() -> Q5 = replayq:open(Config), ?assertEqual(0, replayq:count(Q5)), ?assertEqual(0, replayq:bytes(Q5)), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q5). reopen_v0_test() -> Dir = ?DIR, @@ -110,8 +107,7 @@ reopen_v0_test() -> %% do not expect item3 because it was appended to a corrupted tail ?assertEqual([<<"item1">>, <<"item2">>], Items), ?assert(replayq:is_empty(Q4)), - ok = replayq:close(Q4), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q4). append_pop_disk_default_marshaller_test() -> Dir = ?DIR, @@ -152,8 +148,7 @@ test_append_pop_disk(#{dir := Dir} = Config) -> ?assertEqual(empty, replayq:peek(Q6)), ?assertEqual({Q6, nothing_to_ack, []}, replayq:pop(Q6, #{})), ok = replayq:ack(Q6, nothing_to_ack), - ok = replayq:close(Q6), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q6). append_pop_mem_default_marshaller_test_test() -> Config = #{mem_only => true}, @@ -208,8 +203,7 @@ append_max_total_bytes_disk_test() -> end, max_total_bytes => 10 }, - test_append_max_total_bytes(Config), - ok = cleanup(Dir). + test_append_max_total_bytes(Config). test_append_max_total_bytes(Config) -> Q0 = replayq:open(Config), @@ -218,13 +212,12 @@ test_append_max_total_bytes(Config) -> ?assertEqual(10, replayq:overflow(Q1)), {Q2, _AckRef, _Items} = replayq:pop(Q1, #{count_limit => 2}), ?assertEqual(0, replayq:overflow(Q2)), - ok = replayq:close(Q2). + ok = replayq:close_and_purge(Q2). pop_limit_disk_test() -> Dir = ?DIR, Config = #{dir => Dir, seg_bytes => 1}, - ok = test_pop_limit(Config), - ok = cleanup(Dir). + ok = test_pop_limit(Config). pop_limit_mem_test() -> Config = #{mem_only => true}, @@ -240,7 +233,7 @@ test_pop_limit(Config) -> {Q4, _AckRef2, Items2} = replayq:pop(Q3, #{count_limit => 10, bytes_limit => 1}), ?assertEqual([<<"item2">>], Items2), - ok = replayq:close(Q4). + ok = replayq:close_and_purge(Q4). commit_in_the_middle_test() -> Dir = ?DIR, @@ -261,8 +254,7 @@ commit_in_the_middle_test() -> ?assertEqual([<<"item2">>], Items2), ?assertEqual(1, replayq:count(Q5)), ?assertEqual(5, replayq:bytes(Q5)), - ok = replayq:close(Q5), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q5). first_segment_corrupted_test() -> Dir = ?DIR, @@ -282,8 +274,7 @@ first_segment_corrupted_test() -> {Q4, _AckRef, Items} = replayq:pop(Q3, #{count_limit => 3}), ?assertEqual([Item], Items), ?assert(replayq:is_empty(Q4)), - ok = replayq:close(Q4), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q4). second_segment_corrupted_test() -> Dir = ?DIR, @@ -304,8 +295,7 @@ second_segment_corrupted_test() -> {Q5, _AckRef, Items} = replayq:pop(Q4, #{count_limit => 3}), ?assertEqual([Item, Item], Items), ?assert(replayq:is_empty(Q5)), - ok = replayq:close(Q5), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q5). last_segment_corrupted_test() -> Dir = ?DIR, @@ -334,7 +324,7 @@ last_segment_corrupted_test() -> Q8 = replayq:open(Config), ?assert(replayq:is_empty(Q8)), replayq:ack(Q7, AckRef), - ok = cleanup(Dir). + replayq:close_and_purge(Q7). corrupted_segment_test_() -> [{"ramdom", fun() -> test_corrupted_segment(<<"foo">>) end}, @@ -360,7 +350,7 @@ test_corrupted_segment(BadBytes) -> ?assertEqual([<<"item1">>, Item2], Items), ?assert(replayq:is_empty(Q4)), ok = replayq:close(Q4), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q4). comitter_crash_test() -> Dir = ?DIR, @@ -407,17 +397,10 @@ corrupted_commit_test() -> ok = file:write_file(CommitFile, <<"bad-erlang-term">>), %% assert no crash Q4 = replayq:open(Config), - ok = replayq:close(Q4), - ok = cleanup(Dir). + ok = replayq:close_and_purge(Q4). %% helpers =========================================================== -cleanup(Dir) -> - Files = list_segments(Dir), - ok = lists:foreach(fun(F) -> ok = file:delete(filename:join(Dir, F)) end, Files), - _ = file:delete(filename:join(Dir, "COMMIT")), - ok = file:del_dir(Dir). - list_segments(Dir) -> filelib:wildcard("*."?SUFFIX, Dir). data_dir() -> "./test-data".