Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/william/atomic stream handoff #282

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions c_src/quicer_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1364,9 +1364,11 @@ controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
ErlNifPid target, caller;
ERL_NIF_TERM new_owner = argv[1];
ERL_NIF_TERM res = ATOM_OK;
if (argc != 2)
BOOLEAN is_locked = FALSE;
if (argc == 3 || IS_SAME_TERM(argv[2], ATOM_TRUE))
{
return ATOM_BADARG;
// give hint to this call that stream mutex is locked.
is_locked = TRUE;
}

// precheck
Expand All @@ -1388,7 +1390,10 @@ controlling_process(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return ERROR_TUPLE_2(ATOM_CLOSED);
}

enif_mutex_lock(s_ctx->lock);
if (!is_locked)
{
enif_mutex_lock(s_ctx->lock);
}
res = stream_controlling_process(env, s_ctx, &caller, &new_owner);
enif_mutex_unlock(s_ctx->lock);
put_stream_handle(s_ctx);
Expand Down Expand Up @@ -1561,6 +1566,7 @@ static ErlNifFunc nif_funcs[] = {
{ "getopt", 3, getopt3, 0},
{ "setopt", 4, setopt4, 0},
{ "controlling_process", 2, controlling_process, 0},
{ "controlling_process", 3, controlling_process, 0},
{ "peercert", 1, peercert1, 0},
/* for DEBUG */
{ "get_conn_rid", 1, get_conn_rid1, 1},
Expand All @@ -1571,7 +1577,9 @@ static ErlNifFunc nif_funcs[] = {
{ "get_connections", 1, get_connectionsX, 0},
{ "get_conn_owner", 1, get_conn_owner1, 0},
{ "get_stream_owner", 1, get_stream_owner1, 0},
{ "get_listener_owner", 1, get_listener_owner1, 0}
{ "get_listener_owner", 1, get_listener_owner1, 0},
{ "lock_stream", 1, lock_stream, 0},
{ "unlock_stream", 1, unlock_stream, 0}
// clang-format on
};

Expand Down
29 changes: 29 additions & 0 deletions c_src/quicer_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -1268,6 +1268,35 @@ get_stream_owner1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[])
return res;
}

ERL_NIF_TERM
lock_stream(ErlNifEnv *env,
__unused_parm__ int args,
const ERL_NIF_TERM argv[])
{
QuicerStreamCTX *s_ctx;
if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}
enif_mutex_lock(s_ctx->lock);
return ATOM_OK;
}

ERL_NIF_TERM
unlock_stream(ErlNifEnv *env,
__unused_parm__ int args,
const ERL_NIF_TERM argv[])
{
QuicerStreamCTX *s_ctx;
if (!enif_get_resource(env, argv[0], ctx_stream_t, (void **)&s_ctx))
{
return ERROR_TUPLE_2(ATOM_BADARG);
}

enif_mutex_unlock(s_ctx->lock);
return ATOM_OK;
}

///_* Emacs
///====================================================================
/// Local Variables:
Expand Down
6 changes: 6 additions & 0 deletions c_src/quicer_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,9 @@ get_stream_rid1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
get_stream_owner1(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
lock_stream(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]);

ERL_NIF_TERM
unlock_stream(ErlNifEnv *env, int args, const ERL_NIF_TERM argv[]);
9 changes: 8 additions & 1 deletion src/quicer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1150,13 +1150,20 @@ handoff_stream(Stream, NewOwner, HandoffData) ->
case quicer:getopt(Stream, active) of
{ok, ActiveN} ->
ActiveN =/= false andalso quicer:setopt(Stream, active, false),
ok = quicer_nif:lock_stream(Stream),
Res =
case forward_stream_msgs(Stream, NewOwner) of
ok ->
_ = quicer:controlling_process(Stream, NewOwner),
case quicer_nif:controlling_process(Stream, NewOwner, true) of
{error, _} ->
quicer_nif:unlock_stream(Stream);
ok ->
ok
end,
NewOwner ! {handoff_done, Stream, HandoffData},
ok;
{error, _} = Other ->
quicer_nif:unlock_stream(Stream),
Other
end,
ActiveN =/= false andalso quicer:setopt(Stream, active, ActiveN),
Expand Down
19 changes: 18 additions & 1 deletion src/quicer_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
getopt/3,
setopt/4,
controlling_process/2,
peercert/1
controlling_process/3,
peercert/1,
lock_stream/1,
unlock_stream/1
]).

-export([
Expand Down Expand Up @@ -337,11 +340,25 @@ get_stream_rid(_Handle) ->
controlling_process(_H, _P) ->
erlang:nif_error(nif_library_not_loaded).

-spec controlling_process(connection_handle() | stream_handle(), pid(), boolean()) ->
ok
| {error, closed | badarg | owner_dead | not_owner}.
controlling_process(_H, _P, IsLocked) ->
erlang:nif_error(nif_library_not_loaded).

-spec peercert(connection_handle() | stream_handle()) ->
{ok, CertDerEncoded :: binary()} | {error, any()}.
peercert(_Handle) ->
erlang:nif_error(nif_library_not_loaded).

-spec lock_stream(stream_handle()) -> ok | {error, badarg}.
lock_stream(_Handle) ->
erlang:nif_error(nif_library_not_loaded).

-spec unlock_stream(stream_handle()) -> ok | {error, badarg}.
unlock_stream(_Handle) ->
erlang:nif_error(nif_library_not_loaded).

-spec get_conn_owner(connection_handle()) -> get_owner().
get_conn_owner(_) ->
erlang:nif_error(nif_library_not_loaded).
Expand Down
Loading