Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distribution: Add support for external pid as group leader #1480

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions src/libAtomVM/context.c
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,17 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
return true;
}

bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal)
{
    // How much heap room the copied leader term will require.
    size_t required = memory_estimate_usage(signal->signal_term);
    // Clear the leader before a possible GC: group_leader is a GC root,
    // so the collector must not see the soon-to-be-replaced term as live.
    ctx->group_leader = UNDEFINED_ATOM;
    if (LIKELY(memory_ensure_free_opt(ctx, required, MEMORY_CAN_SHRINK) == MEMORY_GC_OK)) {
        // Copy the leader out of the signal's storage into this context's heap.
        ctx->group_leader = memory_copy_term_tree(&ctx->heap, signal->signal_term);
        return true;
    }
    // Out of memory: leader stays UNDEFINED_ATOM, caller reports the failure.
    return false;
}

void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info)
{
context_update_flags(ctx, ~Trap, NoFlags);
Expand Down
9 changes: 9 additions & 0 deletions src/libAtomVM/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,15 @@ bool context_process_signal_trap_answer(Context *ctx, struct TermSignal *signal)
*/
void context_process_flush_monitor_signal(Context *ctx, uint64_t ref_ticks, bool info);

/**
* @brief Process a set-group-leader signal, copying the new leader term onto the context's heap
*
* @param ctx the context being executed
* @param signal the message with the group leader term
* @return \c true if successful, \c false in case of memory error
*/
bool context_process_signal_set_group_leader(Context *ctx, struct TermSignal *signal);

/**
* @brief Get process information.
*
Expand Down
15 changes: 9 additions & 6 deletions src/libAtomVM/dist_nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -496,10 +496,10 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
}
term roots[4];
roots[0] = argv[0];
roots[1] = argv[1];
roots[1] = argv[1]; // dist handle, ensure it's not garbage collected until we return
roots[2] = control;
roots[3] = externalterm_to_term_with_roots(data + 1 + bytes_read, binary_len - 1 - bytes_read, ctx, ExternalTermCopy, &bytes_read, 3, roots);
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(4)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
if (UNLIKELY(memory_ensure_free_with_roots(ctx, LIST_SIZE(1, TUPLE_SIZE(2) + TUPLE_SIZE(5)), 4, roots, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
control = roots[2];
Expand All @@ -516,15 +516,18 @@ static term nif_erlang_dist_ctrl_put_data(Context *ctx, int argc, term argv[])
if (UNLIKELY(!term_is_pid(from))) {
RAISE_ERROR(BADARG_ATOM);
}
// term groupleader = term_get_tuple_element(control, 3);
// TODO: handle groupleader which is an externalpid
term groupleader = term_get_tuple_element(control, 3);
if (UNLIKELY(!term_is_pid(groupleader))) {
RAISE_ERROR(BADARG_ATOM);
}
term options = term_get_tuple_element(control, 5);

term request_tuple = term_alloc_tuple(4, &ctx->heap);
term request_tuple = term_alloc_tuple(5, &ctx->heap);
term_put_tuple_element(request_tuple, 0, roots[0]);
term_put_tuple_element(request_tuple, 1, reqid);
term_put_tuple_element(request_tuple, 2, from);
term_put_tuple_element(request_tuple, 3, options);
term_put_tuple_element(request_tuple, 3, groupleader);
term_put_tuple_element(request_tuple, 4, options);
term request_opt = term_alloc_tuple(2, &ctx->heap);
term_put_tuple_element(request_opt, 0, REQUEST_ATOM);
term_put_tuple_element(request_opt, 1, request_tuple);
Expand Down
3 changes: 2 additions & 1 deletion src/libAtomVM/mailbox.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ void mailbox_message_dispose(MailboxMessage *m, Heap *heap)
break;
}
case KillSignal:
case TrapAnswerSignal: {
case TrapAnswerSignal:
case SetGroupLeaderSignal: {
struct TermSignal *term_signal = CONTAINER_OF(m, struct TermSignal, base);
term mso_list = term_signal->storage[STORAGE_MSO_LIST_INDEX];
HeapFragment *fragment = mailbox_message_to_heap_fragment(term_signal, term_signal->heap_end);
Expand Down
1 change: 1 addition & 0 deletions src/libAtomVM/mailbox.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ enum MessageType
TrapExceptionSignal,
FlushMonitorSignal,
FlushInfoMonitorSignal,
SetGroupLeaderSignal,
};

struct MailboxMessage
Expand Down
5 changes: 5 additions & 0 deletions src/libAtomVM/memory.c
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ static enum MemoryGCResult memory_gc(Context *ctx, size_t new_size, size_t num_r
TRACE("- Running copy GC on exit reason\n");
ctx->exit_reason = memory_shallow_copy_term(old_root_fragment, ctx->exit_reason, &ctx->heap.heap_ptr, true);

TRACE("- Running copy GC on group leader\n");
ctx->group_leader = memory_shallow_copy_term(old_root_fragment, ctx->group_leader, &ctx->heap.heap_ptr, true);

TRACE("- Running copy GC on provided roots\n");
for (size_t i = 0; i < num_roots; i++) {
roots[i] = memory_shallow_copy_term(old_root_fragment, roots[i], &ctx->heap.heap_ptr, 1);
Expand Down Expand Up @@ -373,6 +376,8 @@ static enum MemoryGCResult memory_shrink(Context *ctx, size_t new_size, size_t n
}
// ...exit_reason
memory_scan_and_rewrite(1, &ctx->exit_reason, old_heap_root, old_end, delta, true);
// ...group_leader
memory_scan_and_rewrite(1, &ctx->group_leader, old_heap_root, old_end, delta, true);
// ...and MSO list.
term *mso_ptr = &ctx->heap.root->mso_list;
while (!term_is_nil(*mso_ptr)) {
Expand Down
61 changes: 36 additions & 25 deletions src/libAtomVM/nifs.c
Original file line number Diff line number Diff line change
Expand Up @@ -1211,14 +1211,21 @@ static NativeHandlerResult process_console_mailbox(Context *ctx)

// Common handling of spawn/1, spawn/3, spawn_opt/2, spawn_opt/4
// opts_term is [] for spawn/1,3
static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
static term do_spawn(Context *ctx, Context *new_ctx, size_t arity, size_t n_freeze, term opts_term)
{
term min_heap_size_term = interop_proplist_get_value(opts_term, MIN_HEAP_SIZE_ATOM);
term max_heap_size_term = interop_proplist_get_value(opts_term, MAX_HEAP_SIZE_ATOM);
term link_term = interop_proplist_get_value(opts_term, LINK_ATOM);
term monitor_term = interop_proplist_get_value(opts_term, MONITOR_ATOM);
term heap_growth_strategy = interop_proplist_get_value_default(opts_term, ATOMVM_HEAP_GROWTH_ATOM, BOUNDED_FREE_ATOM);
term request_term = interop_proplist_get_value(opts_term, REQUEST_ATOM);
term group_leader;

if (UNLIKELY(request_term != term_nil())) {
group_leader = term_get_tuple_element(request_term, 3);
} else {
group_leader = ctx->group_leader;
}

if (min_heap_size_term != term_nil()) {
if (UNLIKELY(!term_is_integer(min_heap_size_term))) {
Expand All @@ -1245,6 +1252,21 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
}
}

int size = 0;
for (uint32_t i = 0; i < n_freeze; i++) {
size += memory_estimate_usage(new_ctx->x[i + arity - n_freeze]);
}
size += memory_estimate_usage(group_leader);
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
//TODO: new process should be terminated, however a new pid is returned anyway
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
AVM_ABORT();
}
new_ctx->group_leader = memory_copy_term_tree(&new_ctx->heap, group_leader);
for (uint32_t i = 0; i < arity; i++) {
new_ctx->x[i] = memory_copy_term_tree(&new_ctx->heap, new_ctx->x[i]);
}

switch (heap_growth_strategy) {
case BOUNDED_FREE_ATOM:
new_ctx->heap_growth_strategy = BoundedFreeHeapGrowth;
Expand Down Expand Up @@ -1308,7 +1330,7 @@ static term do_spawn(Context *ctx, Context *new_ctx, term opts_term)
term dhandle = term_get_tuple_element(request_term, 0);
term request_ref = term_get_tuple_element(request_term, 1);
term request_from = term_get_tuple_element(request_term, 2);
term request_opts = term_get_tuple_element(request_term, 3);
term request_opts = term_get_tuple_element(request_term, 4);
monitor_term = interop_proplist_get_value(request_opts, MONITOR_ATOM);
// TODO handle link with external nodes
// link_term = interop_proplist_get_value(request_opts, LINK_ATOM);
Expand Down Expand Up @@ -1344,7 +1366,6 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])
VALIDATE_VALUE(opts_term, term_is_list);

Context *new_ctx = context_new(ctx->global);
new_ctx->group_leader = ctx->group_leader;

const term *boxed_value = term_to_const_term_ptr(fun_term);

Expand All @@ -1365,24 +1386,15 @@ static term nif_erlang_spawn_fun_opt(Context *ctx, int argc, term argv[])

// TODO: new process should fail with badarity if arity != 0

int size = 0;
for (uint32_t i = 0; i < n_freeze; i++) {
size += memory_estimate_usage(boxed_value[i + 3]);
}
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
//TODO: new process should be terminated, however a new pid is returned anyway
fprintf(stderr, "Unable to allocate sufficient memory to spawn process.\n");
AVM_ABORT();
}
for (uint32_t i = 0; i < n_freeze; i++) {
new_ctx->x[i + arity - n_freeze] = memory_copy_term_tree(&new_ctx->heap, boxed_value[i + 3]);
new_ctx->x[i + arity - n_freeze] = boxed_value[i + 3];
}

new_ctx->saved_module = fun_module;
new_ctx->saved_ip = fun_module->labels[label];
new_ctx->cp = module_address(fun_module->module_index, fun_module->end_instruction_ii);

return do_spawn(ctx, new_ctx, opts_term);
return do_spawn(ctx, new_ctx, arity, n_freeze, opts_term);
}

term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
Expand All @@ -1399,7 +1411,6 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
VALIDATE_VALUE(opts_term, term_is_list);

Context *new_ctx = context_new(ctx->global);
new_ctx->group_leader = ctx->group_leader;

AtomString module_string = globalcontext_atomstring_from_term(ctx->global, argv[0]);
AtomString function_string = globalcontext_atomstring_from_term(ctx->global, argv[1]);
Expand Down Expand Up @@ -1439,14 +1450,8 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
new_ctx->min_heap_size = min_heap_size;
}

avm_int_t size = memory_estimate_usage(args_term);
if (UNLIKELY(memory_ensure_free_opt(new_ctx, size, MEMORY_CAN_SHRINK) != MEMORY_GC_OK)) {
// Context was not scheduled yet, we can destroy it.
context_destroy(new_ctx);
RAISE_ERROR(OUT_OF_MEMORY_ATOM);
}
while (term_is_nonempty_list(args_term)) {
new_ctx->x[reg_index] = memory_copy_term_tree(&new_ctx->heap, term_get_list_head(args_term));
new_ctx->x[reg_index] = term_get_list_head(args_term);
reg_index++;

args_term = term_get_list_tail(args_term);
Expand All @@ -1456,7 +1461,7 @@ term nif_erlang_spawn_opt(Context *ctx, int argc, term argv[])
}
}

return do_spawn(ctx, new_ctx, opts_term);
return do_spawn(ctx, new_ctx, reg_index, reg_index, opts_term);
}

static term nif_erlang_send_2(Context *ctx, int argc, term argv[])
Expand Down Expand Up @@ -4003,15 +4008,21 @@ static term nif_erlang_group_leader(Context *ctx, int argc, term argv[])
term leader = argv[0];
term pid = argv[1];
VALIDATE_VALUE(pid, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_local_pid);
VALIDATE_VALUE(leader, term_is_pid);

int local_process_id = term_to_local_process_id(pid);
Context *target = globalcontext_get_process_lock(ctx->global, local_process_id);
if (IS_NULL_PTR(target)) {
RAISE_ERROR(BADARG_ATOM);
}

target->group_leader = leader;
if (term_is_local_pid(leader)) {
// We cannot put leader term on the heap
mailbox_send_term_signal(target, SetGroupLeaderSignal, leader);
} else {
target->group_leader = leader;
}

globalcontext_get_process_unlock(ctx->global, target);
return TRUE_ATOM;
}
Expand Down
9 changes: 9 additions & 0 deletions src/libAtomVM/opcodesswitch.h
Original file line number Diff line number Diff line change
Expand Up @@ -1049,6 +1049,15 @@ static void destroy_extended_registers(Context *ctx, unsigned int live)
context_process_flush_monitor_signal(ctx, flush_signal->ref_ticks, info); \
break; \
} \
case SetGroupLeaderSignal: { \
struct TermSignal *group_leader \
= CONTAINER_OF(signal_message, struct TermSignal, base); \
if (UNLIKELY(!context_process_signal_set_group_leader(ctx, group_leader))) { \
SET_ERROR(OUT_OF_MEMORY_ATOM); \
next_label = &&handle_error; \
} \
break; \
} \
case NormalMessage: { \
UNREACHABLE(); \
} \
Expand Down
107 changes: 92 additions & 15 deletions tests/libs/estdlib/test_net_kernel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ test() ->
ok = test_rpc_loop_from_beam(Platform),
ok = test_autoconnect_fail(Platform),
ok = test_autoconnect_to_beam(Platform),
ok = test_groupleader(Platform),
ok;
false ->
io:format("~s: skipped\n", [?MODULE]),
Expand Down Expand Up @@ -156,21 +157,24 @@ test_autoconnect_to_beam(Platform) ->
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
Node = node(),
erlang:set_cookie(Node, 'AtomVM'),
spawn_link(fun() ->
execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"register(beam, self()),"
"F = fun(G) ->"
" receive"
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> timeout"
" end "
"end, "
"F(F).\" -s init stop -noshell"
)
end),
{Pid, MonitorRef} = spawn_opt(
fun() ->
[] = execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"register(beam, self()),"
"F = fun(G) ->"
" receive"
" {Caller, ping} -> Caller ! {self(), pong}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> exit(timeout)"
" end "
"end, "
"F(F).\" -s init stop -noshell"
)
end,
[link, monitor]
),
% Wait sufficiently for beam to be up, without connecting to it since
% that's part of the test
timer:sleep(1000),
Expand Down Expand Up @@ -200,6 +204,79 @@ test_autoconnect_to_beam(Platform) ->
{OTPPid, quit} -> ok
after 5000 -> timeout
end,
normal =
receive
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
after 5000 -> timeout
end,
net_kernel:stop(),
ok.

%% Verify group-leader propagation across distribution: a spawned BEAM node
%% performs an rpc:call of io:format/1 on this (AtomVM) node, and the output
%% must travel back to the BEAM caller's group leader, where it is captured
%% by execute_command/2 and finally asserted below.
test_groupleader(Platform) ->
{ok, _NetKernelPid} = net_kernel_start(Platform, atomvm),
Node = node(),
erlang:set_cookie(Node, 'AtomVM'),
%% Register under a well-known name so the remote BEAM shell can reach us
%% with {atomvm, Node} ! {beam, self()}.
register(atomvm, self()),
Parent = self(),
%% Launch the external BEAM node. Its shell process sends us its pid, then
%% serves {Caller, apply, M, F, A} requests until told to quit (or times
%% out after 5 s). The command's captured stdout is reported back to the
%% parent as {io_result, Result}.
{Pid, MonitorRef} = spawn_opt(
fun() ->
Result = execute_command(
Platform,
"erl -sname otp -setcookie AtomVM -eval \""
"{atomvm, '" ++ atom_to_list(Node) ++
"'} ! {beam, self()}, "
"F = fun(G) ->"
" receive"
" {Caller, apply, M, F, A} -> Result = apply(M, F, A), Caller ! {self(), Result}, G(G);"
" {Caller, quit} -> Caller ! {self(), quit}"
" after 5000 -> exit(timeout)"
" end "
"end, "
"F(F).\" -s init stop -noshell"
),
Parent ! {io_result, Result}
end,
[link, monitor]
),
%% Wait for the remote shell pid. Receiving {io_result, _} at this point
%% means the command already exited, i.e. the handshake failed: dump the
%% output for diagnosis and abort.
BeamMainPid =
receive
{beam, BeamMainPid0} ->
BeamMainPid0;
{io_result, Result0} ->
io:format("~s\n", [Result0]),
exit(timeout)
after 5000 -> exit(timeout)
end,
%% Ask BEAM to rpc:call io:format/1 on this node; the io request should be
%% redirected to the BEAM-side group leader rather than printed locally.
BeamMainPid ! {self(), apply, rpc, call, [Node, io, format, ["hello group leader"]]},
%% io:format/1 returns ok on success; an early {io_result, _} again means
%% the remote node died prematurely.
ok =
receive
{BeamMainPid, Result} ->
Result;
{io_result, Result1} ->
io:format("~s\n", [Result1]),
exit(timeout)
after 5000 -> exit(timeout)
end,
%% Shut the remote serving loop down and wait for its acknowledgement.
BeamMainPid ! {self(), quit},
ok =
receive
{BeamMainPid, quit} ->
ok;
{io_result, Result2} ->
io:format("~s\n", [Result2]),
exit(timeout)
after 5000 -> timeout
end,
%% The captured stdout of the BEAM command must be exactly the text that
%% io:format emitted on this node — proving the group-leader redirection.
"hello group leader" =
receive
{io_result, IOResult} -> IOResult
after 5000 -> timeout
end,
%% The spawned helper was monitored; it must have exited cleanly.
normal =
receive
{'DOWN', MonitorRef, process, Pid, Reason} -> Reason
after 5000 -> timeout
end,
net_kernel:stop(),
ok.

Expand Down
Loading