From 41869dd9565148c12a6ade634b95fdea648c5d4a Mon Sep 17 00:00:00 2001 From: "Jonathan M. Henson" Date: Thu, 24 Aug 2023 13:44:01 -0700 Subject: [PATCH] Added deferrment API for use in preventing priority inversions inside event-loops. --- include/aws/common/task_scheduler.h | 18 +++++++++ source/task_scheduler.c | 62 ++++++++++++++++++++++++----- tests/CMakeLists.txt | 1 + tests/task_scheduler_test.c | 44 ++++++++++++++++++++ 4 files changed, 116 insertions(+), 9 deletions(-) diff --git a/include/aws/common/task_scheduler.h b/include/aws/common/task_scheduler.h index a51d6e41e..3c7cbad31 100644 --- a/include/aws/common/task_scheduler.h +++ b/include/aws/common/task_scheduler.h @@ -48,6 +48,8 @@ struct aws_task_scheduler { struct aws_priority_queue timed_queue; /* Tasks scheduled to run at specific times */ struct aws_linked_list timed_list; /* If timed_queue runs out of memory, further timed tests are stored here */ struct aws_linked_list asap_list; /* Tasks scheduled to run as soon as possible */ + struct aws_linked_list deferment_list; + bool in_deferment_boundary; }; AWS_EXTERN_C_BEGIN @@ -105,6 +107,22 @@ void aws_task_scheduler_schedule_future( struct aws_task *task, uint64_t time_to_run); +/** + * Upon entering this boundary, all tasks scheduled will not be executed until the boundary is exited via. + * aws_task_scheduler_exit_deferment_boundary(). This is done in scenarios when you want users to be + * able to schedule, but want to ensure that their tasks are not executed before you intend. + * This is for preventing priority inversions in event loops. + */ +AWS_COMMON_API +void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler); + +/** + * Upon exiting this boundary, all tasks scheduled while inside the boundary will be allowed to execute the next time + * the scheduler runs its tasks. + */ +AWS_COMMON_API +void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler); + /** * Removes task from the scheduler and invokes the task with the AWS_TASK_STATUS_CANCELED status. */ diff --git a/source/task_scheduler.c b/source/task_scheduler.c index 4467b1249..47fd4e14b 100644 --- a/source/task_scheduler.c +++ b/source/task_scheduler.c @@ -65,6 +65,7 @@ int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_all scheduler->alloc = alloc; aws_linked_list_init(&scheduler->timed_list); aws_linked_list_init(&scheduler->asap_list); + aws_linked_list_init(&scheduler->deferment_list); AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler)); return AWS_OP_SUCCESS; @@ -129,16 +130,26 @@ void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struc AWS_ASSERT(task); AWS_ASSERT(task->fn); + task->priority_queue_node.current_index = SIZE_MAX; + aws_linked_list_node_reset(&task->node); + task->timestamp = 0; + + if (scheduler->in_deferment_boundary) { + AWS_LOGF_DEBUG( + AWS_LS_COMMON_TASK_SCHEDULER, + "id=%p: Deferring scheduling %s task for deferred execution", + (void *)task, + task->type_tag); + aws_linked_list_push_back(&scheduler->deferment_list, &task->node); + return; + } + AWS_LOGF_DEBUG( AWS_LS_COMMON_TASK_SCHEDULER, "id=%p: Scheduling %s task for immediate execution", (void *)task, task->type_tag); - task->priority_queue_node.current_index = SIZE_MAX; - aws_linked_list_node_reset(&task->node); - task->timestamp = 0; - aws_linked_list_push_back(&scheduler->asap_list, &task->node); task->abi_extension.scheduled = true; } @@ -152,6 +163,21 @@ void aws_task_scheduler_schedule_future( AWS_ASSERT(task); AWS_ASSERT(task->fn); + task->timestamp = time_to_run; + + task->priority_queue_node.current_index = SIZE_MAX; + aws_linked_list_node_reset(&task->node); + + if (scheduler->in_deferment_boundary) { + AWS_LOGF_DEBUG( + AWS_LS_COMMON_TASK_SCHEDULER, + "id=%p: Deferring scheduling %s task for deferred execution", + (void *)task, + task->type_tag); + aws_linked_list_push_back(&scheduler->deferment_list, &task->node); + return; + } + AWS_LOGF_DEBUG( AWS_LS_COMMON_TASK_SCHEDULER, "id=%p: Scheduling %s task for future execution at time %" PRIu64, @@ -159,10 +185,7 @@ void aws_task_scheduler_schedule_future( task->type_tag, time_to_run); - task->timestamp = time_to_run; - - task->priority_queue_node.current_index = SIZE_MAX; - aws_linked_list_node_reset(&task->node); + task->abi_extension.scheduled = true; int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node); if (AWS_UNLIKELY(err)) { /* In the (very unlikely) case that we can't push into the timed_queue, @@ -179,7 +202,28 @@ void aws_task_scheduler_schedule_future( } aws_linked_list_insert_before(node_i, &task->node); } - task->abi_extension.scheduled = true; +} + +void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler) { + scheduler->in_deferment_boundary = true; +} + +void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler) { + scheduler->in_deferment_boundary = false; + + while (!aws_linked_list_empty(&scheduler->deferment_list)) { + struct aws_linked_list_node *deferred_list_node = aws_linked_list_begin(&scheduler->deferment_list); + struct aws_task *deferred_list_task = AWS_CONTAINER_OF(deferred_list_node, struct aws_task, node); + + aws_linked_list_remove(deferred_list_node); + aws_linked_list_node_reset(deferred_list_node); + + if (deferred_list_task->timestamp) { + aws_task_scheduler_schedule_future(scheduler, deferred_list_task, deferred_list_task->timestamp); + } else { + aws_task_scheduler_schedule_now(scheduler, deferred_list_task); + } + } } void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index f8d9a8d44..f09985a58 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -162,6 +162,7 @@ add_test_case(scheduler_cleanup_reentrants) add_test_case(scheduler_schedule_cancellation) add_test_case(scheduler_cleanup_idempotent) add_test_case(scheduler_task_delete_on_run) +add_test_case(scheduler_task_deferrment) add_test_case(test_hash_table_create_find) add_test_case(test_hash_table_string_create_find) diff --git a/tests/task_scheduler_test.c b/tests/task_scheduler_test.c index 49705f48a..9b8437c46 100644 --- a/tests/task_scheduler_test.c +++ b/tests/task_scheduler_test.c @@ -416,6 +416,49 @@ static int s_test_scheduler_task_delete_on_run(struct aws_allocator *allocator, return 0; } +static int s_test_scheduler_deferrment(struct aws_allocator *allocator, void *ctx) { + (void)ctx; + s_executed_tasks_n = 0; + + struct aws_task_scheduler scheduler; + aws_task_scheduler_init(&scheduler, allocator); + + aws_task_scheduler_enter_deferment_boundary(&scheduler); + + struct aws_task task1; + aws_task_init(&task1, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary1"); + aws_task_scheduler_schedule_future(&scheduler, &task1, 5); + + struct aws_task task2; + aws_task_init(&task2, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary2"); + aws_task_scheduler_schedule_now(&scheduler, &task2); + + /* Run scheduler after task is supposed to execute, check that it didn't execute */ + aws_task_scheduler_run_all(&scheduler, 10); + + ASSERT_UINT_EQUALS(0, s_executed_tasks_n); + + aws_task_scheduler_exit_deferment_boundary(&scheduler); + + /* Run scheduler after task is supposed to execute but outside the boundary, and check that it did execute */ + aws_task_scheduler_run_all(&scheduler, 10); + ASSERT_UINT_EQUALS(2, s_executed_tasks_n); + + /* in a single run, asap tasks run first, followed by the timed tasks, so the order should be reversed here. */ + struct executed_task_data *task1_data = &s_executed_tasks[0]; + ASSERT_PTR_EQUALS(&task2, task1_data->task); + ASSERT_PTR_EQUALS(task2.arg, task1_data->arg); + ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_data->status); + + struct executed_task_data *task2_data = &s_executed_tasks[1]; + ASSERT_PTR_EQUALS(&task1, task2_data->task); + ASSERT_PTR_EQUALS(task1.arg, task2_data->arg); + ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task2_data->status); + + aws_task_scheduler_clean_up(&scheduler); + return 0; +} + AWS_TEST_CASE(scheduler_pops_task_late_test, s_test_scheduler_pops_task_fashionably_late); AWS_TEST_CASE(scheduler_ordering_test, s_test_scheduler_ordering); AWS_TEST_CASE(scheduler_has_tasks_test, s_test_scheduler_has_tasks); @@ -425,3 +468,4 @@ AWS_TEST_CASE(scheduler_cleanup_reentrants, s_test_scheduler_cleanup_reentrants) AWS_TEST_CASE(scheduler_schedule_cancellation, s_test_scheduler_schedule_cancellation); AWS_TEST_CASE(scheduler_cleanup_idempotent, s_test_scheduler_cleanup_idempotent); AWS_TEST_CASE(scheduler_task_delete_on_run, s_test_scheduler_task_delete_on_run); +AWS_TEST_CASE(scheduler_task_deferrment, s_test_scheduler_deferrment)