Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove non portable use of pthread_t #218

Merged
merged 4 commits into from
Jan 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions include/aws/io/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
* Invokes the destroy() fn for the event loop implementation.
* If the event loop is still in a running state, this function will block waiting on the event loop to shutdown.
* If you do not want this function to block, call aws_event_loop_stop() manually first.
* If the event loop is shared by multiple threads then destroy must be called by exactly one thread. All other threads
* must ensure their API calls to the event loop happen-before the call to destroy.
*/
AWS_IO_API
void aws_event_loop_destroy(struct aws_event_loop *event_loop);
Expand Down
38 changes: 23 additions & 15 deletions source/bsd/kqueue_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,15 @@ enum pipe_fd_index {
};

struct kqueue_loop {
struct aws_thread thread;
struct aws_atomic_var thread_id;
/* thread_created_on is the handle to the event loop thread. */
struct aws_thread thread_created_on;
/* thread_joined_to is used by the thread destroying the event loop. */
aws_thread_id_t thread_joined_to;
/* running_thread_id is NULL if the event loop thread is stopped or points-to the thread_id of the thread running
* the event loop (either thread_created_on or thread_joined_to). Atomic because of concurrent writes (e.g.,
* run/stop) and reads (e.g., is_event_loop_thread).
* An aws_thread_id_t variable itself cannot be atomic because it is an opaque type that is platform-dependent. */
struct aws_atomic_var running_thread_id;
int kq_fd; /* kqueue file descriptor */

/* Pipe for signaling to event-thread that cross_thread_data has changed. */
Expand Down Expand Up @@ -162,11 +169,11 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
if (!impl) {
goto clean_up;
}
/* intialize thread id to 0. It will be set when the event loop thread starts. */
aws_atomic_init_int(&impl->thread_id, (size_t)0);
/* intialize thread id to NULL. It will be set when the event loop thread starts. */
aws_atomic_init_ptr(&impl->running_thread_id, NULL);
clean_up_impl_mem = true;

err = aws_thread_init(&impl->thread, alloc);
err = aws_thread_init(&impl->thread_created_on, alloc);
if (err) {
goto clean_up;
}
Expand Down Expand Up @@ -267,7 +274,7 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
close(impl->kq_fd);
}
if (clean_up_thread) {
aws_thread_clean_up(&impl->thread);
aws_thread_clean_up(&impl->thread_created_on);
}
if (clean_up_impl_mem) {
aws_mem_release(alloc, impl);
Expand Down Expand Up @@ -297,7 +304,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
return;
}
/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
impl->thread_joined_to = aws_thread_current_thread_id();
aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);

/* Clean up task-related stuff first. It's possible the a cancelled task adds further tasks to this event_loop.
* Tasks added in this way will be in cross_thread_data.tasks_to_schedule, so we clean that up last */
Expand Down Expand Up @@ -337,7 +345,7 @@ static void s_destroy(struct aws_event_loop *event_loop) {
close(impl->cross_thread_signal_pipe[READ_FD]);
close(impl->cross_thread_signal_pipe[WRITE_FD]);
close(impl->kq_fd);
aws_thread_clean_up(&impl->thread);
aws_thread_clean_up(&impl->thread_created_on);
aws_mem_release(event_loop->alloc, impl);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);
Expand All @@ -355,7 +363,7 @@ static int s_run(struct aws_event_loop *event_loop) {
* and it's ok to touch cross_thread_data without locking the mutex */
impl->cross_thread_data.state = EVENT_THREAD_STATE_RUNNING;

int err = aws_thread_launch(&impl->thread, s_event_thread_main, (void *)event_loop, NULL);
int err = aws_thread_launch(&impl->thread_created_on, s_event_thread_main, (void *)event_loop, NULL);
if (err) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
goto clean_up;
Expand Down Expand Up @@ -415,7 +423,7 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
aws_mutex_unlock(&impl->cross_thread_data.mutex);
#endif

int err = aws_thread_join(&impl->thread);
int err = aws_thread_join(&impl->thread_created_on);
if (err) {
return AWS_OP_ERR;
}
Expand Down Expand Up @@ -711,8 +719,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
static bool s_is_event_thread(struct aws_event_loop *event_loop) {
struct kqueue_loop *impl = event_loop->impl_data;

uint64_t thread_id = aws_atomic_load_int(&impl->thread_id);
return aws_thread_current_thread_id() == thread_id;
aws_thread_id_t *thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
nchong-at-aws marked this conversation as resolved.
Show resolved Hide resolved
}

/* Called from thread.
Expand Down Expand Up @@ -798,7 +806,7 @@ static void s_event_thread_main(void *user_data) {
struct kqueue_loop *impl = event_loop->impl_data;

/* set thread id to the event-loop's thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
Expand Down Expand Up @@ -957,6 +965,6 @@ static void s_event_thread_main(void *user_data) {
}

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
/* reset to 0. This should be updated again during destroy before tasks are canceled. */
aws_atomic_store_int(&impl->thread_id, 0);
/* reset to NULL. This should be updated again during destroy before tasks are canceled. */
aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}
32 changes: 17 additions & 15 deletions source/linux/epoll_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ static struct aws_event_loop_vtable s_vtable = {

struct epoll_loop {
struct aws_task_scheduler scheduler;
struct aws_thread thread;
struct aws_atomic_var thread_id;
struct aws_thread thread_created_on;
aws_thread_id_t thread_joined_to;
struct aws_atomic_var running_thread_id;
struct aws_io_handle read_task_handle;
struct aws_io_handle write_task_handle;
struct aws_mutex task_pre_queue_mutex;
Expand Down Expand Up @@ -135,8 +136,8 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
goto cleanup_base_loop;
}

/* initialize thread id to 0, it should be updated when the event loop thread starts. */
aws_atomic_init_int(&epoll_loop->thread_id, 0);
/* initialize thread id to NULL, it should be updated when the event loop thread starts. */
aws_atomic_init_ptr(&epoll_loop->running_thread_id, NULL);

aws_linked_list_init(&epoll_loop->task_pre_queue);
epoll_loop->task_pre_queue_mutex = (struct aws_mutex)AWS_MUTEX_INIT;
Expand All @@ -149,7 +150,7 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
goto clean_up_epoll;
}

if (aws_thread_init(&epoll_loop->thread, alloc)) {
if (aws_thread_init(&epoll_loop->thread_created_on, alloc)) {
goto clean_up_epoll;
}

Expand Down Expand Up @@ -207,7 +208,7 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
#endif

clean_up_thread:
aws_thread_clean_up(&epoll_loop->thread);
aws_thread_clean_up(&epoll_loop->thread_created_on);

clean_up_epoll:
if (epoll_loop->epoll_fd >= 0) {
Expand Down Expand Up @@ -236,7 +237,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
s_wait_for_stop_completion(event_loop);

/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
epoll_loop->thread_joined_to = aws_thread_current_thread_id();
aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_joined_to);
aws_task_scheduler_clean_up(&epoll_loop->scheduler);

while (!aws_linked_list_empty(&epoll_loop->task_pre_queue)) {
Expand All @@ -245,7 +247,7 @@ static void s_destroy(struct aws_event_loop *event_loop) {
task->fn(task, task->arg, AWS_TASK_STATUS_CANCELED);
}

aws_thread_clean_up(&epoll_loop->thread);
aws_thread_clean_up(&epoll_loop->thread_created_on);
#if USE_EFD
close(epoll_loop->write_task_handle.data.fd);
epoll_loop->write_task_handle.data.fd = -1;
Expand All @@ -267,7 +269,7 @@ static int s_run(struct aws_event_loop *event_loop) {
AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);

epoll_loop->should_continue = true;
if (aws_thread_launch(&epoll_loop->thread, &s_main_loop, event_loop, NULL)) {
if (aws_thread_launch(&epoll_loop->thread_created_on, &s_main_loop, event_loop, NULL)) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
epoll_loop->should_continue = false;
return AWS_OP_ERR;
Expand Down Expand Up @@ -311,7 +313,7 @@ static int s_stop(struct aws_event_loop *event_loop) {

static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
struct epoll_loop *epoll_loop = event_loop->impl_data;
return aws_thread_join(&epoll_loop->thread);
return aws_thread_join(&epoll_loop->thread_created_on);
}

static void s_schedule_task_common(struct aws_event_loop *event_loop, struct aws_task *task, uint64_t run_at_nanos) {
Expand Down Expand Up @@ -474,8 +476,8 @@ static int s_unsubscribe_from_io_events(struct aws_event_loop *event_loop, struc
static bool s_is_on_callers_thread(struct aws_event_loop *event_loop) {
struct epoll_loop *epoll_loop = event_loop->impl_data;

uint64_t thread_id = aws_atomic_load_int(&epoll_loop->thread_id);
return aws_thread_current_thread_id() == thread_id;
aws_thread_id_t *thread_id = aws_atomic_load_ptr(&epoll_loop->running_thread_id);
return thread_id && aws_thread_thread_id_equal(*thread_id, aws_thread_current_thread_id());
}

/* We treat the pipe fd with a subscription to io events just like any other managed file descriptor.
Expand Down Expand Up @@ -545,7 +547,7 @@ static void s_main_loop(void *args) {
struct epoll_loop *epoll_loop = event_loop->impl_data;

/* set thread id to the thread of the event loop */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)aws_thread_current_thread_id());
aws_atomic_store_ptr(&epoll_loop->running_thread_id, &epoll_loop->thread_created_on.thread_id);

int err = s_subscribe_to_io_events(
event_loop, &epoll_loop->read_task_handle, AWS_IO_EVENT_TYPE_READABLE, s_on_tasks_to_schedule, NULL);
Expand Down Expand Up @@ -658,6 +660,6 @@ static void s_main_loop(void *args) {

AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
s_unsubscribe_from_io_events(event_loop, &epoll_loop->read_task_handle);
/* set thread id back to 0. This should be updated again in destroy, before tasks are canceled. */
aws_atomic_store_int(&epoll_loop->thread_id, (size_t)0);
/* set thread id back to NULL. This should be updated again in destroy, before tasks are canceled. */
aws_atomic_store_ptr(&epoll_loop->running_thread_id, NULL);
}
32 changes: 17 additions & 15 deletions source/windows/iocp/iocp_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ typedef enum event_thread_state {

struct iocp_loop {
HANDLE iocp_handle;
struct aws_thread thread;
struct aws_atomic_var thread_id;
struct aws_thread thread_created_on;
aws_thread_id_t thread_joined_to;
struct aws_atomic_var running_thread_id;

/* synced_data holds things that must be communicated across threads.
* When the event-thread is running, the mutex must be locked while anyone touches anything in synced_data.
Expand Down Expand Up @@ -192,8 +193,8 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
goto clean_up;
}

/* initialize thread id to 0. This will be updated once the event loop thread starts. */
aws_atomic_init_int(&impl->thread_id, 0);
/* initialize thread id to NULL. This will be updated once the event loop thread starts. */
aws_atomic_init_ptr(&impl->running_thread_id, NULL);

impl->iocp_handle = CreateIoCompletionPort(
INVALID_HANDLE_VALUE, /* FileHandle: passing invalid handle creates a new IOCP */
Expand All @@ -211,7 +212,7 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
}
clean_up_iocp_handle = true;

err = aws_thread_init(&impl->thread, alloc);
err = aws_thread_init(&impl->thread_created_on, alloc);
if (err) {
goto clean_up;
}
Expand Down Expand Up @@ -248,7 +249,7 @@ struct aws_event_loop *aws_event_loop_new_default(struct aws_allocator *alloc, a
}

if (clean_up_thread) {
aws_thread_clean_up(&impl->thread);
aws_thread_clean_up(&impl->thread_created_on);
}

if (clean_up_iocp_handle) {
Expand Down Expand Up @@ -290,7 +291,8 @@ static void s_destroy(struct aws_event_loop *event_loop) {
}

/* setting this so that canceled tasks don't blow up when asking if they're on the event-loop thread. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
impl->thread_joined_to = aws_thread_current_thread_id();
aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_joined_to);

/* Clean up task-related stuff first.
* It's possible the a cancelled task adds further tasks to this event_loop, these new tasks would end up in
Expand All @@ -310,7 +312,7 @@ static void s_destroy(struct aws_event_loop *event_loop) {
(void)close_iocp_success;

aws_mutex_clean_up(&impl->synced_data.mutex);
aws_thread_clean_up(&impl->thread);
aws_thread_clean_up(&impl->thread_created_on);
aws_mem_release(event_loop->alloc, impl);
aws_event_loop_clean_up_base(event_loop);
aws_mem_release(event_loop->alloc, event_loop);
Expand Down Expand Up @@ -347,7 +349,7 @@ static int s_run(struct aws_event_loop *event_loop) {
impl->synced_data.state = EVENT_THREAD_STATE_RUNNING;

AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: Starting event-loop thread.", (void *)event_loop);
int err = aws_thread_launch(&impl->thread, s_event_thread_main, event_loop, NULL);
int err = aws_thread_launch(&impl->thread_created_on, s_event_thread_main, event_loop, NULL);
if (err) {
AWS_LOGF_FATAL(AWS_LS_IO_EVENT_LOOP, "id=%p: thread creation failed.", (void *)event_loop);
goto clean_up;
Expand Down Expand Up @@ -396,7 +398,7 @@ static int s_wait_for_stop_completion(struct aws_event_loop *event_loop) {
aws_mutex_unlock(&impl->synced_data.mutex);
#endif

int err = aws_thread_join(&impl->thread);
int err = aws_thread_join(&impl->thread_created_on);
if (err) {
return AWS_OP_ERR;
}
Expand Down Expand Up @@ -483,8 +485,8 @@ static bool s_is_event_thread(struct aws_event_loop *event_loop) {
struct iocp_loop *impl = event_loop->impl_data;
AWS_ASSERT(impl);

uint64_t el_thread_id = aws_atomic_load_int(&impl->thread_id);
return el_thread_id == aws_thread_current_thread_id();
aws_thread_id_t *el_thread_id = aws_atomic_load_ptr(&impl->running_thread_id);
return el_thread_id && aws_thread_thread_id_equal(*el_thread_id, aws_thread_current_thread_id());
}

/* Called from any thread */
Expand Down Expand Up @@ -617,7 +619,7 @@ static void s_event_thread_main(void *user_data) {
struct iocp_loop *impl = event_loop->impl_data;

/* Set thread id to event loop thread id. */
aws_atomic_store_int(&impl->thread_id, (size_t)aws_thread_current_thread_id());
aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;
Expand Down Expand Up @@ -721,6 +723,6 @@ static void s_event_thread_main(void *user_data) {
}
}
AWS_LOGF_DEBUG(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
/* set back to 0. This should be updated again in destroy, right before task cancelation happens. */
aws_atomic_store_int(&impl->thread_id, (size_t)0);
/* set back to NULL. This should be updated again in destroy, right before task cancelation happens. */
aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}
8 changes: 4 additions & 4 deletions tests/event_loop_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
struct task_args {
bool invoked;
bool was_in_thread;
uint64_t thread_id;
aws_thread_id_t thread_id;
struct aws_event_loop *loop;
struct aws_event_loop_group *el_group;
enum aws_task_status status;
Expand Down Expand Up @@ -85,7 +85,7 @@ static int s_test_event_loop_xthread_scheduled_tasks_execute(struct aws_allocato
ASSERT_TRUE(task_args.invoked);
aws_mutex_unlock(&task_args.mutex);

ASSERT_FALSE(task_args.thread_id == aws_thread_current_thread_id());
ASSERT_FALSE(aws_thread_thread_id_equal(task_args.thread_id, aws_thread_current_thread_id()));

/* Test "now" tasks */
task_args.invoked = false;
Expand Down Expand Up @@ -152,7 +152,7 @@ static int s_test_event_loop_canceled_tasks_run_in_el_thread(struct aws_allocato
&task1_args.condition_variable, &task1_args.mutex, s_task_ran_predicate, &task1_args));
ASSERT_TRUE(task1_args.invoked);
ASSERT_TRUE(task1_args.was_in_thread);
ASSERT_FALSE(task1_args.thread_id == aws_thread_current_thread_id());
ASSERT_FALSE(aws_thread_thread_id_equal(task1_args.thread_id, aws_thread_current_thread_id()));
ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_args.status);
aws_mutex_unlock(&task1_args.mutex);

Expand All @@ -166,7 +166,7 @@ static int s_test_event_loop_canceled_tasks_run_in_el_thread(struct aws_allocato
aws_mutex_unlock(&task2_args.mutex);

ASSERT_TRUE(task2_args.was_in_thread);
ASSERT_TRUE(task2_args.thread_id == aws_thread_current_thread_id());
ASSERT_TRUE(aws_thread_thread_id_equal(task2_args.thread_id, aws_thread_current_thread_id()));
ASSERT_INT_EQUALS(AWS_TASK_STATUS_CANCELED, task2_args.status);

return AWS_OP_SUCCESS;
Expand Down