diff --git a/CMakeLists.txt b/CMakeLists.txt index 56292f0e..6f0e943e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -112,6 +112,8 @@ set(SOURCES c_src/quicer_config.h c_src/quicer_queue.c c_src/quicer_queue.h + c_src/quicer_owner_queue.c + c_src/quicer_owner_queue.h c_src/quicer_ctx.c c_src/quicer_ctx.h c_src/quicer_listener.c diff --git a/c_src/quicer_ctx.c b/c_src/quicer_ctx.c index 7cc1410f..20e9f6bb 100644 --- a/c_src/quicer_ctx.c +++ b/c_src/quicer_ctx.c @@ -270,6 +270,7 @@ init_s_ctx() s_ctx->is_recv_pending = FALSE; s_ctx->is_closed = TRUE; // init s_ctx->event_mask = 0; + s_ctx->sig_queue = NULL; return s_ctx; } diff --git a/c_src/quicer_ctx.h b/c_src/quicer_ctx.h index b1f454c1..5d5aa32d 100644 --- a/c_src/quicer_ctx.h +++ b/c_src/quicer_ctx.h @@ -18,6 +18,7 @@ limitations under the License. #define __QUICER_CTX_H_ #include "quicer_nif.h" +#include "quicer_owner_queue.h" #include "quicer_queue.h" #include #include @@ -137,6 +138,8 @@ typedef struct QuicerStreamCTX // Track lifetime of Stream handle CXPLAT_REF_COUNT ref_count; uint32_t event_mask; + // for ownership handoff + OWNER_SIGNAL_QUEUE *sig_queue; void *reserved1; void *reserved2; void *reserved3; diff --git a/c_src/quicer_nif.c b/c_src/quicer_nif.c index 91eec7a2..0331ec5e 100644 --- a/c_src/quicer_nif.c +++ b/c_src/quicer_nif.c @@ -1486,10 +1486,11 @@ stream_controlling_process(ErlNifEnv *env, { // rollback, must success enif_self(env, &s_ctx->owner->Pid); + flush_sig_buffer(env, s_ctx); enif_monitor_process(env, s_ctx, caller, &s_ctx->owner_mon); return ERROR_TUPLE_2(ATOM_OWNER_DEAD); } - + flush_sig_buffer(env, s_ctx); TP_NIF_3(exit, (uintptr_t)s_ctx->Stream, (uintptr_t)&s_ctx->owner->Pid); return ATOM_OK; } @@ -1575,6 +1576,8 @@ static ErlNifFunc nif_funcs[] = { { "setopt", 4, setopt4, 0}, { "controlling_process", 2, controlling_process, 0}, { "peercert", 1, peercert1, 0}, + { "enable_sig_buffer", 1, enable_sig_buffer, 0}, + { "flush_stream_buffered_sigs", 1, flush_stream_buffered_sigs, 0}, /* for DEBUG */ { "get_conn_rid", 1, get_conn_rid1, 1}, { "get_stream_rid", 1, get_stream_rid1, 1}, @@ -1584,7 +1587,9 @@ static ErlNifFunc nif_funcs[] = { { "get_connections", 1, get_connectionsX, 0}, { "get_conn_owner", 1, get_conn_owner1, 0}, { "get_stream_owner", 1, get_stream_owner1, 0}, - { "get_listener_owner", 1, get_listener_owner1, 0} + { "get_listener_owner", 1, get_listener_owner1, 0}, + /* for testing */ + { "mock_buffer_sig", 3, mock_buffer_sig, 0} // clang-format on }; diff --git a/c_src/quicer_owner_queue.c b/c_src/quicer_owner_queue.c new file mode 100644 index 00000000..c0116a70 --- /dev/null +++ b/c_src/quicer_owner_queue.c @@ -0,0 +1,75 @@ +/*-------------------------------------------------------------------- +Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +-------------------------------------------------------------------*/ +#include "quicer_owner_queue.h" + +OWNER_SIGNAL_QUEUE * +OwnerSignalQueueNew() +{ + OWNER_SIGNAL_QUEUE *sig + = CxPlatAlloc(sizeof(OWNER_SIGNAL_QUEUE), QUICER_OWNER_SIGNAL); + return sig; +} + +void +OwnerSignalQueueInit(OWNER_SIGNAL_QUEUE *queue) +{ + queue->env = enif_alloc_env(); + CxPlatListInitializeHead(&queue->List); +} + +void +OwnerSignalQueueDestroy(OWNER_SIGNAL_QUEUE *queue) +{ + CXPLAT_DBG_ASSERT(CxPlatListIsEmpty(&queue->List)); + enif_free_env(queue->env); + CxPlatFree(queue, QUICER_OWNER_SIGNAL); +} + +OWNER_SIGNAL * +OwnerSignalAlloc() +{ + OWNER_SIGNAL *sig = CxPlatAlloc(sizeof(OWNER_SIGNAL), QUICER_OWNER_SIGNAL); + return sig; +} + +void +OwnerSignalFree(OWNER_SIGNAL *sig) +{ + CXPLAT_FREE(sig, QUICER_OWNER_SIGNAL); +} + +void +OwnerSignalEnqueue(_In_ OWNER_SIGNAL_QUEUE *queue, _In_ OWNER_SIGNAL *sig) +{ + CxPlatListInsertTail(&queue->List, &sig->Link); +} + +OWNER_SIGNAL * +OwnerSignalDequeue(_In_ OWNER_SIGNAL_QUEUE *queue) +{ + OWNER_SIGNAL *sig; + if (CxPlatListIsEmpty(&queue->List)) + { + sig = NULL; + } + else + { + sig = CXPLAT_CONTAINING_RECORD( + CxPlatListRemoveHead(&queue->List), OWNER_SIGNAL, Link); + } + + return sig; +} diff --git a/c_src/quicer_owner_queue.h b/c_src/quicer_owner_queue.h new file mode 100644 index 00000000..4bd3024c --- /dev/null +++ b/c_src/quicer_owner_queue.h @@ -0,0 +1,52 @@ +/*-------------------------------------------------------------------- +Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +-------------------------------------------------------------------*/ +#ifndef QUICER_OWNER_QUEUE_H_ +#define QUICER_OWNER_QUEUE_H_ + +#include + +// clang-format off +#include +#include +#include +// clang-format on + +#define QUICER_OWNER_SIGNAL 'E0rQ' // 'Er0d' QUICER_OWNER_SIGNAL + +// Owner Signal Queue +typedef struct OWNER_SIGNAL_QUEUE +{ + ErlNifEnv *env; + CXPLAT_LIST_ENTRY List; +} OWNER_SIGNAL_QUEUE; + +typedef struct OWNER_SIGNAL +{ + CXPLAT_LIST_ENTRY Link; + ERL_NIF_TERM msg; // resides in `env` of OWNER_SIGNAL_QUEUE + ERL_NIF_TERM orig_owner; // owner when msg is generated +} OWNER_SIGNAL; + +OWNER_SIGNAL_QUEUE *OwnerSignalQueueNew(); +OWNER_SIGNAL *OwnerSignalAlloc(); +void OwnerSignalQueueInit(OWNER_SIGNAL_QUEUE *queue); +void OwnerSignalQueueDestroy(OWNER_SIGNAL_QUEUE *queue); +void OwnerSignalFree(OWNER_SIGNAL *sig); +void OwnerSignalEnqueue(_In_ OWNER_SIGNAL_QUEUE *queue, + _In_ OWNER_SIGNAL *sig); +OWNER_SIGNAL *OwnerSignalDequeue(_In_ OWNER_SIGNAL_QUEUE *queue); + +#endif // QUICER_OWNER_QUEUE_H_ diff --git a/c_src/quicer_stream.c b/c_src/quicer_stream.c index 0acbadcf..9576f5fa 100644 --- a/c_src/quicer_stream.c +++ b/c_src/quicer_stream.c @@ -59,6 +59,9 @@ handle_stream_event_send_shutdown_complete(QuicerStreamCTX *s_ctx, static void reset_stream_recv(QuicerStreamCTX *s_ctx); +static int +signal_or_buffer(QuicerStreamCTX *s_ctx, ErlNifPid *owner, ERL_NIF_TERM sig); + QUIC_STATUS ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event) { @@ -129,6 +132,7 @@ ServerStreamCallback(HQUIC Stream, void *Context, QUIC_STREAM_EVENT *Event) if (is_destroy) { + flush_sig_buffer(NULL, s_ctx); s_ctx->is_closed = TRUE; } @@ -229,6 +233,7 @@ _IRQL_requires_max_(DISPATCH_LEVEL) if (is_destroy) { s_ctx->is_closed = TRUE; + flush_sig_buffer(NULL, s_ctx); MsQuic->SetCallbackHandler(Stream, NULL, NULL); } @@ -1054,7 +1059,7 @@ handle_stream_event_start_complete(QuicerStreamCTX *s_ctx, props_name, props_value, 3); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); } return QUIC_STATUS_SUCCESS; } @@ -1073,7 +1078,7 @@ handle_stream_event_peer_send_shutdown( enif_make_copy(env, s_ctx->eHandle), ATOM_UNDEFINED); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1094,7 +1099,7 @@ handle_stream_event_peer_send_aborted(QuicerStreamCTX *s_ctx, enif_make_copy(env, s_ctx->eHandle), enif_make_uint64(env, Event->PEER_SEND_ABORTED.ErrorCode)); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1114,7 +1119,7 @@ handle_stream_event_peer_receive_aborted(QuicerStreamCTX *s_ctx, ATOM_PEER_RECEIVE_ABORTED, enif_make_copy(env, s_ctx->eHandle), enif_make_uint64(env, Event->PEER_RECEIVE_ABORTED.ErrorCode)); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1147,7 +1152,7 @@ handle_stream_event_shutdown_complete(QuicerStreamCTX *s_ctx, props_name, props_value, 6); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1164,7 +1169,7 @@ handle_stream_event_peer_accepted(QuicerStreamCTX *s_ctx, ATOM_PEER_ACCEPTED, enif_make_copy(env, s_ctx->eHandle), ATOM_UNDEFINED); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1212,7 +1217,8 @@ handle_stream_event_send_shutdown_complete(QuicerStreamCTX *s_ctx, ATOM_SEND_SHUTDOWN_COMPLETE, enif_make_copy(env, s_ctx->eHandle), ATOM_BOOLEAN(is_graceful)); - enif_send(NULL, &(s_ctx->owner->Pid), NULL, report); + + signal_or_buffer(s_ctx, &(s_ctx->owner->Pid), report); return QUIC_STATUS_SUCCESS; } @@ -1268,6 +1274,135 @@ get_stream_owner1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) return res; } +// s_ctx MUST be locked +int +signal_or_buffer(QuicerStreamCTX *s_ctx, + ErlNifPid *owner_pid, + ERL_NIF_TERM msg) +{ + if (s_ctx && s_ctx->sig_queue != NULL) + { + ErlNifEnv *q_env = s_ctx->sig_queue->env; + OWNER_SIGNAL *sig = OwnerSignalAlloc(); + sig->msg = enif_make_copy(q_env, msg); + sig->orig_owner = enif_make_pid(q_env, owner_pid); + OwnerSignalEnqueue(s_ctx->sig_queue, sig); + return TRUE; + } + else + { + return enif_send(NULL, owner_pid, NULL, msg); + } +} + +// s_ctx MUST be locked +BOOLEAN +flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx) +{ + OWNER_SIGNAL *sig = NULL; + if (!s_ctx->sig_queue) + { + return FALSE; + } + + while ((sig = OwnerSignalDequeue(s_ctx->sig_queue))) + { + // if send failed, msg will be cleared in `OwnerSignalQueueDestroy` + enif_send(env, &(s_ctx->owner->Pid), NULL, sig->msg); + + OwnerSignalFree(sig); + } + OwnerSignalQueueDestroy(s_ctx->sig_queue); + s_ctx->sig_queue = NULL; + return TRUE; +} + +ERL_NIF_TERM +mock_buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx; + ErlNifPid orig_pid; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 3); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + + if (!enif_get_local_pid(env, argv[1], &orig_pid)) + { + return ERROR_TUPLE_2(ATOM_BAD_PID); + } + + enif_mutex_lock(s_ctx->lock); + if (!s_ctx->sig_queue) + { + res = ERROR_TUPLE_2(ATOM_NONE); + goto Exit; + } + + if (!signal_or_buffer(s_ctx, &orig_pid, argv[2])) + { + res = ERROR_TUPLE_2(ATOM_FALSE); + } +Exit: + enif_mutex_unlock(s_ctx->lock); + return res; +} + +ERL_NIF_TERM +flush_stream_buffered_sigs(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx = NULL; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 1); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + enif_mutex_lock(s_ctx->lock); + if (!flush_sig_buffer(env, s_ctx)) + { + res = ERROR_TUPLE_2(ATOM_NONE); + } + enif_mutex_unlock(s_ctx->lock); + return res; +} + +/* +** Enable signal buffering. +** Signals are buffered instead of being sent to the owner. +** call `flush_stream_buffered_sigs` to flush the buffer. +*/ +ERL_NIF_TERM +enable_sig_buffer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + QuicerStreamCTX *s_ctx = NULL; + ERL_NIF_TERM res = ATOM_OK; + + CXPLAT_FRE_ASSERT(argc == 1); + + if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx)) + { + return ERROR_TUPLE_2(ATOM_BADARG); + } + + enif_mutex_lock(s_ctx->lock); + if (!s_ctx->sig_queue) + { + s_ctx->owner->active = ACCEPTOR_RECV_MODE_PASSIVE; + s_ctx->sig_queue = OwnerSignalQueueNew(); + OwnerSignalQueueInit(s_ctx->sig_queue); + } + enif_mutex_unlock(s_ctx->lock); + + return res; +} + ///_* Emacs ///==================================================================== /// Local Variables: diff --git a/c_src/quicer_stream.h b/c_src/quicer_stream.h index 4037d68a..eda43d10 100644 --- a/c_src/quicer_stream.h +++ b/c_src/quicer_stream.h @@ -18,11 +18,14 @@ limitations under the License. #define __QUICER_STREAM_H_ #include "quicer_config.h" +#include "quicer_ctx.h" #include "quicer_internal.h" #include "quicer_nif.h" #define UNSET_STREAMID 0xFFFFFFFFFFFFFFF +struct QuicerStreamCTX; + typedef enum QUICER_SEND_FLAGS { QUICER_SEND_FLAGS_SYNC = 0x1000 @@ -66,3 +69,18 @@ get_stream_rid1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]); ERL_NIF_TERM get_stream_owner1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +mock_buffer_sig(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +flush_stream_buffered_sigs(ErlNifEnv *env, + int argc, + const ERL_NIF_TERM argv[]); + +ERL_NIF_TERM +enable_sig_buffer(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); + +typedef struct QuicerStreamCTX QuicerStreamCTX; +BOOLEAN +flush_sig_buffer(ErlNifEnv *env, QuicerStreamCTX *s_ctx); diff --git a/src/quicer.erl b/src/quicer.erl index e3966feb..2e4fed89 100644 --- a/src/quicer.erl +++ b/src/quicer.erl @@ -1101,7 +1101,9 @@ get_listener_owner(Listener) -> quicer_nif:get_listener_owner(Listener). %% @doc set controlling process for Connection/Stream. +%% For Stream, also flush the sig buffer to old owner if failed or new owner if succeeded. %% mimic {@link ssl:controlling_process/2} +%% @see handoff_stream/2 %% @see wait_for_handoff/2 %% @end -spec controlling_process(connection_handle() | stream_handle(), pid()) -> @@ -1129,15 +1131,17 @@ handoff_stream(Stream, NewOwner) -> handoff_stream(Stream, NewOwner, undefined). %% @doc Used by Old stream owner to handoff to the new stream owner. -%% 1. The Stream will be put into passive mode. -%% 2. Stream messages in the current owners process messages queue will +%% 1. The Stream will be put into passive mode so the data is paused. +%% 2. The Stream signal buffer will be enabled, so the signal is paused. +%% 3. Stream messages (for both data and sig )in the current owners process messages queue will %% be forwarded to the New Owner's mailbox in the same recv order. -%% 3. Set the control process of the stream to the new owner. -%% 4. A signal msg `{handoff_done, Stream, PostHandoff}' will be sent to the new owner. +%% 4. Set the control process of the stream to the new owner, signal buffer will be flushed to new owner if succeed, otherwise to the old owner +%% 5. A signal msg `{handoff_done, Stream, PostHandoff}' will be sent to the new owner. %% The new owner should block for this message before handle any stream data to %% ensure the ordering. -%% 5. Revert stream active mode whatever handoff fail or success. +%% 6. Revert stream active mode whatever handoff fail or success. %% also @see wait_for_handoff/2 +%% also @see controlling_process/2 %% @end -spec handoff_stream(stream_handle(), pid(), term()) -> ok | {error, any()}. handoff_stream(Stream, NewOwner, HandoffData) when NewOwner == self() -> @@ -1150,6 +1154,7 @@ handoff_stream(Stream, NewOwner, HandoffData) -> case quicer:getopt(Stream, active) of {ok, ActiveN} -> ActiveN =/= false andalso quicer:setopt(Stream, active, false), + ok = quicer_nif:enable_sig_buffer(Stream), Res = case forward_stream_msgs(Stream, NewOwner) of ok -> @@ -1157,6 +1162,7 @@ handoff_stream(Stream, NewOwner, HandoffData) -> NewOwner ! {handoff_done, Stream, HandoffData}, ok; {error, _} = Other -> + _ = quicer_nif:flush_stream_buffered_sigs(Stream), Other end, ActiveN =/= false andalso quicer:setopt(Stream, active, ActiveN), diff --git a/src/quicer_nif.erl b/src/quicer_nif.erl index 4bb0ca03..edc89784 100644 --- a/src/quicer_nif.erl +++ b/src/quicer_nif.erl @@ -44,7 +44,9 @@ getopt/3, setopt/4, controlling_process/2, - peercert/1 + peercert/1, + enable_sig_buffer/1, + flush_stream_buffered_sigs/1 ]). -export([ @@ -62,7 +64,8 @@ get_connections/1, get_conn_owner/1, get_stream_owner/1, - get_listener_owner/1 + get_listener_owner/1, + mock_buffer_sig/3 ]). -export([abi_version/0]). @@ -373,6 +376,28 @@ get_connections() -> get_connections(_RegHandle) -> erlang:nif_error(nif_library_not_loaded). +%% @doc enable signal buffering, used in stream handoff. +%% * not exposed API. +-spec enable_sig_buffer(stream_handle()) -> ok. +enable_sig_buffer(_H) -> + erlang:nif_error(nif_library_not_loaded). + +%% @doc flush buffered stream signals to the current owner +%% * not exposed API. +%% also @see quicer:controlling_process/2 +%% @end +-spec flush_stream_buffered_sigs(stream_handle()) -> ok | {error, badarg | none}. +flush_stream_buffered_sigs(_H) -> + erlang:nif_error(nif_library_not_loaded). + +%% @doc mock buffer a signal in sig_buffer. +%% for testing sig_buffer +%% @end +-spec mock_buffer_sig(stream_handle(), OrigOwner :: pid(), term()) -> + ok | {error, false | none | bad_pid | bad_arg}. +mock_buffer_sig(_StreamHandle, _OrigOwner, _Msg) -> + erlang:nif_error(nif_library_not_loaded). + %% Internals -spec locate_lib(file:name(), file:name()) -> {ok, file:filename()} | {error, not_found}. diff --git a/test/example_client_connection.erl b/test/example_client_connection.erl index 8791bc2b..7f34d3fd 100644 --- a/test/example_client_connection.erl +++ b/test/example_client_connection.erl @@ -108,8 +108,13 @@ new_stream( %% Spawn new stream case quicer_remote_stream:start_link(example_server_stream, Stream, Conn, SOpts, Flags) of {ok, StreamOwner} -> - quicer:handoff_stream(Stream, StreamOwner), - {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}}; + case quicer:handoff_stream(Stream, StreamOwner) of + ok -> + {ok, CBState#{streams := [{StreamOwner, Stream} | Streams]}}; + {error, E} -> + %% record bad stream + {ok, CBState#{streams := [{E, Stream} | Streams]}} + end; Other -> Other end. @@ -152,5 +157,12 @@ handle_info({quic, Sig, Stream, _} = Msg, #{streams := Streams} = S) when Sig == peer_send_shutdown orelse Sig == stream_closed -> {OwnerPid, Stream} = lists:keyfind(Stream, 2, Streams), - OwnerPid ! Msg, + NewS = + case OwnerPid == owner_down orelse OwnerPid == closed of + true -> + quicer:async_shutdown_stream(Stream), + S#{streams := lists:keydelete(Stream, 2, Streams)}; + false -> + error(fixme) + end, {ok, S}. diff --git a/test/prop_stream_sig_queue.erl b/test/prop_stream_sig_queue.erl new file mode 100644 index 00000000..b8bd67e8 --- /dev/null +++ b/test/prop_stream_sig_queue.erl @@ -0,0 +1,112 @@ +-module(prop_stream_sig_queue). +-include_lib("proper/include/proper.hrl"). +-include_lib("quicer/include/quicer_types.hrl"). +-include("prop_quic_types.hrl"). + +%%%%%%%%%%%%%%%%%% +%%% Properties %%% +%%%%%%%%%%%%%%%%%% +prop_buffer_sig_err_none() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, Term}, + {valid_stream(), pid(), term()}, + begin + Res = quicer_nif:mock_buffer_sig(S, Pid, Term), + Destructor(), + Res == {error, none} + end + ). + +prop_enable_sig_queue() -> + ?FORALL( + #prop_handle{type = stream, handle = S, destructor = Destructor}, + valid_stream(), + begin + ok = quicer:setopt(S, active, 100), + ok = quicer_nif:enable_sig_buffer(S), + Res = quicer:getopt(S, active), + Destructor(), + Res == {ok, false} + end + ). + +prop_buffer_sig_success() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, Term}, + {valid_stream(), pid(), term()}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Res = quicer_nif:mock_buffer_sig(S, Pid, Term), + Destructor(), + Res == ok + end + ). + +prop_flush_buffered_sig_no_owner_change() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, TermList}, + {valid_stream(), pid(), list(term())}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Ref = erlang:make_ref(), + lists:foreach( + fun(Term) -> + quicer_nif:mock_buffer_sig(S, Pid, {Ref, Term}) + end, + TermList + ), + ok = quicer_nif:flush_stream_buffered_sigs(S), + Destructor(), + Rcvd = receive_n(length(TermList), Ref), + Rcvd == TermList + end + ). + +prop_flush_buffered_sig_success() -> + ?FORALL( + {#prop_handle{handle = S, destructor = Destructor}, Pid, TermList}, + {valid_stream(), pid(), list(integer())}, + begin + ok = quicer_nif:enable_sig_buffer(S), + Ref = erlang:make_ref(), + lists:foreach( + fun(Term) -> + ok = quicer_nif:mock_buffer_sig(S, Pid, {Ref, Term}) + end, + TermList + ), + ok = quicer:controlling_process(S, self()), + {ok, NewOwner} = quicer:get_stream_owner(S), + NewOwner = self(), + %% assert already flushed by quicer:controlling_process/2 + {error, none} = quicer_nif:flush_stream_buffered_sigs(S), + Res = receive_n(length(TermList), Ref), + Destructor(), + Res == TermList + end + ). + +%%%%%%%%%%%%%%% +%%% Helpers %%% +%%%%%%%%%%%%%%% +receive_n(N, Ref) -> + receive_n(N, Ref, []). +receive_n(0, _Ref, Acc) -> + lists:reverse(Acc); +receive_n(N, Ref, Acc) -> + receive + {Ref, X} -> + receive_n(N - 1, Ref, [X | Acc]); + {quic, _, _, _} = _Drop -> + receive_n(N, Ref, Acc) + after 500 -> + {timeout, N} + end. + +%%%%%%%%%%%%%%%%%% +%%% Generators %%% +%%%%%%%%%%%%%%%%%% +valid_stream() -> quicer_prop_gen:valid_stream_handle(). + +pid() -> + quicer_prop_gen:pid().