Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE YET: Added deferrment API for use in preventing priority inversions inside… #1052

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions include/aws/common/task_scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ struct aws_task_scheduler {
struct aws_priority_queue timed_queue; /* Tasks scheduled to run at specific times */
struct aws_linked_list timed_list; /* If timed_queue runs out of memory, further timed tests are stored here */
struct aws_linked_list asap_list; /* Tasks scheduled to run as soon as possible */
struct aws_linked_list deferment_list;
bool in_deferment_boundary;
};

AWS_EXTERN_C_BEGIN
Expand Down Expand Up @@ -105,6 +107,22 @@ void aws_task_scheduler_schedule_future(
struct aws_task *task,
uint64_t time_to_run);

/**
* Upon entering this boundary, all tasks scheduled will not be executed until the boundary is exited via.
* aws_task_scheduler_exit_deferment_boundary(). This is done in scenarios when you want users to be
* able to schedule, but want to ensure that their tasks are not executed before you intend.
* This is for preventing priority inversions in event loops.
*/
AWS_COMMON_API
void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler);

/**
* Upon exiting this boundary, all tasks scheduled while inside the boundary will be allowed to execute the next time
* the scheduler runs its tasks.
*/
AWS_COMMON_API
void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler);

/**
* Removes task from the scheduler and invokes the task with the AWS_TASK_STATUS_CANCELED status.
*/
Expand Down
62 changes: 53 additions & 9 deletions source/task_scheduler.c
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ int aws_task_scheduler_init(struct aws_task_scheduler *scheduler, struct aws_all
scheduler->alloc = alloc;
aws_linked_list_init(&scheduler->timed_list);
aws_linked_list_init(&scheduler->asap_list);
aws_linked_list_init(&scheduler->deferment_list);

AWS_POSTCONDITION(aws_task_scheduler_is_valid(scheduler));
return AWS_OP_SUCCESS;
Expand Down Expand Up @@ -129,16 +130,26 @@ void aws_task_scheduler_schedule_now(struct aws_task_scheduler *scheduler, struc
AWS_ASSERT(task);
AWS_ASSERT(task->fn);

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->timestamp = 0;

if (scheduler->in_deferment_boundary) {
AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Deferring scheduling %s task for deferred execution",
(void *)task,
task->type_tag);
aws_linked_list_push_back(&scheduler->deferment_list, &task->node);
return;
}

AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Scheduling %s task for immediate execution",
(void *)task,
task->type_tag);

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->timestamp = 0;

aws_linked_list_push_back(&scheduler->asap_list, &task->node);
task->abi_extension.scheduled = true;
}
Expand All @@ -152,17 +163,29 @@ void aws_task_scheduler_schedule_future(
AWS_ASSERT(task);
AWS_ASSERT(task->fn);

task->timestamp = time_to_run;

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);

if (scheduler->in_deferment_boundary) {
AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Deferring scheduling %s task for deferred execution",
(void *)task,
task->type_tag);
aws_linked_list_push_back(&scheduler->deferment_list, &task->node);
return;
}

AWS_LOGF_DEBUG(
AWS_LS_COMMON_TASK_SCHEDULER,
"id=%p: Scheduling %s task for future execution at time %" PRIu64,
(void *)task,
task->type_tag,
time_to_run);

task->timestamp = time_to_run;

task->priority_queue_node.current_index = SIZE_MAX;
aws_linked_list_node_reset(&task->node);
task->abi_extension.scheduled = true;
int err = aws_priority_queue_push_ref(&scheduler->timed_queue, &task, &task->priority_queue_node);
if (AWS_UNLIKELY(err)) {
/* In the (very unlikely) case that we can't push into the timed_queue,
Expand All @@ -179,7 +202,28 @@ void aws_task_scheduler_schedule_future(
}
aws_linked_list_insert_before(node_i, &task->node);
}
task->abi_extension.scheduled = true;
}

void aws_task_scheduler_enter_deferment_boundary(struct aws_task_scheduler *scheduler) {
scheduler->in_deferment_boundary = true;
}

void aws_task_scheduler_exit_deferment_boundary(struct aws_task_scheduler *scheduler) {
scheduler->in_deferment_boundary = false;

while (!aws_linked_list_empty(&scheduler->deferment_list)) {
struct aws_linked_list_node *deferred_list_node = aws_linked_list_begin(&scheduler->deferment_list);
struct aws_task *deferred_list_task = AWS_CONTAINER_OF(deferred_list_node, struct aws_task, node);

aws_linked_list_remove(deferred_list_node);
aws_linked_list_node_reset(deferred_list_node);

if (deferred_list_task->timestamp) {
aws_task_scheduler_schedule_future(scheduler, deferred_list_task, deferred_list_task->timestamp);
} else {
aws_task_scheduler_schedule_now(scheduler, deferred_list_task);
}
}
}

void aws_task_scheduler_run_all(struct aws_task_scheduler *scheduler, uint64_t current_time) {
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ add_test_case(scheduler_cleanup_reentrants)
add_test_case(scheduler_schedule_cancellation)
add_test_case(scheduler_cleanup_idempotent)
add_test_case(scheduler_task_delete_on_run)
add_test_case(scheduler_task_deferrment)

add_test_case(test_hash_table_create_find)
add_test_case(test_hash_table_string_create_find)
Expand Down
44 changes: 44 additions & 0 deletions tests/task_scheduler_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,49 @@ static int s_test_scheduler_task_delete_on_run(struct aws_allocator *allocator,
return 0;
}

static int s_test_scheduler_deferrment(struct aws_allocator *allocator, void *ctx) {
(void)ctx;
s_executed_tasks_n = 0;

struct aws_task_scheduler scheduler;
aws_task_scheduler_init(&scheduler, allocator);

aws_task_scheduler_enter_deferment_boundary(&scheduler);

struct aws_task task1;
aws_task_init(&task1, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary1");
aws_task_scheduler_schedule_future(&scheduler, &task1, 5);

struct aws_task task2;
aws_task_init(&task2, s_task_n_fn, (void *)0, "scheduler_deferrment_boundary2");
aws_task_scheduler_schedule_now(&scheduler, &task2);

/* Run scheduler after task is supposed to execute, check that it didn't execute */
aws_task_scheduler_run_all(&scheduler, 10);

ASSERT_UINT_EQUALS(0, s_executed_tasks_n);

aws_task_scheduler_exit_deferment_boundary(&scheduler);

/* Run scheduler after task is supposed to execute but outside the boundary, and check that it did execute */
aws_task_scheduler_run_all(&scheduler, 10);
ASSERT_UINT_EQUALS(2, s_executed_tasks_n);

/* in a single run, asap tasks run first, followed by the timed tasks, so the order should be reversed here. */
struct executed_task_data *task1_data = &s_executed_tasks[0];
ASSERT_PTR_EQUALS(&task2, task1_data->task);
ASSERT_PTR_EQUALS(task2.arg, task1_data->arg);
ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task1_data->status);

struct executed_task_data *task2_data = &s_executed_tasks[1];
ASSERT_PTR_EQUALS(&task1, task2_data->task);
ASSERT_PTR_EQUALS(task1.arg, task2_data->arg);
ASSERT_INT_EQUALS(AWS_TASK_STATUS_RUN_READY, task2_data->status);

aws_task_scheduler_clean_up(&scheduler);
return 0;
}

AWS_TEST_CASE(scheduler_pops_task_late_test, s_test_scheduler_pops_task_fashionably_late);
AWS_TEST_CASE(scheduler_ordering_test, s_test_scheduler_ordering);
AWS_TEST_CASE(scheduler_has_tasks_test, s_test_scheduler_has_tasks);
Expand All @@ -425,3 +468,4 @@ AWS_TEST_CASE(scheduler_cleanup_reentrants, s_test_scheduler_cleanup_reentrants)
AWS_TEST_CASE(scheduler_schedule_cancellation, s_test_scheduler_schedule_cancellation);
AWS_TEST_CASE(scheduler_cleanup_idempotent, s_test_scheduler_cleanup_idempotent);
AWS_TEST_CASE(scheduler_task_delete_on_run, s_test_scheduler_task_delete_on_run);
AWS_TEST_CASE(scheduler_task_deferrment, s_test_scheduler_deferrment)