-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtest_gsoc_taskqueue.c
143 lines (119 loc) · 3.97 KB
/
test_gsoc_taskqueue.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
#define TEST_USE_TASK_ID
#define _GNU_SOURCE
#include "gsoc_taskqueue.h"
#include "gsoc_time.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <assert.h>
#include <utmpx.h>
#include <unistd.h>
#ifndef TESTVAL_EXTENDS /* It may be defined by Makefile */
#define TESTVAL_EXTENDS 1
#endif
/* Per-thread descriptor handed to parallel_push_pop_take(). */
typedef struct _worker worker;

struct _worker {
    int id;                   /* index of this worker; compared against victim / logged_worker */
    cpu_set_t cpu;            /* affinity mask passed to pthread_setaffinity_np() */
    gsoc_taskqueue *my_taskq; /* queue this worker pushes to and pops from */
    gsoc_task *tasks;         /* task array the worker pushes entries of */
    worker *workers;          /* array of all worker descriptors, indexed when choosing a victim */
    long num_workers;         /* number of entries in `workers` */
    int logged_worker;        /* id of the worker whose queue activity is printed */
};
/*
 * Thread body that emulates a worker of a work-stealing scheduler.
 *
 * `s` points to this thread's `worker` descriptor.
 *
 * Phase 1: for GSOC_TASKQUEUE_INIT_SIZE * TESTVAL_EXTENDS / 2 iterations,
 *          push two tasks onto the worker's own queue and pop one.
 * Phase 2: keep popping from the own queue; when it is empty, try to take
 *          one task from a randomly chosen other worker.  The thread ends
 *          when such a steal attempt yields nothing (or when there is no
 *          other worker to steal from).
 *
 * Only activity on the queue of `logged_worker` is printed; the printed
 * lines are evaluated by the `make test' script.
 *
 * Always returns NULL.
 */
void* parallel_push_pop_take(void* s)
{
    worker* data = (worker*)s;
    gsoc_task* task;
    long victim;
    int i;

    /* Failure to pin the thread is not fatal for the test, so the return
       value is not checked. */
    pthread_setaffinity_np(pthread_self(), sizeof(data->cpu), &data->cpu);

    /* Think about fib where two tasks are created by one task */
    for (i = 0; i < GSOC_TASKQUEUE_INIT_SIZE * TESTVAL_EXTENDS / 2; ++i) {
        gsoc_taskqueue_push(data->my_taskq, &data->tasks[2*i]);
        gsoc_taskqueue_push(data->my_taskq, &data->tasks[2*i + 1]);
        task = gsoc_taskqueue_pop(data->my_taskq);
        if (task && data->id == data->logged_worker)
            printf("%lld is popped by CPU%d\n", task->test_id, sched_getcpu()); /* These values are evaluated by `make test' script */
    }

    /* All tasks are created, now just consume them (with other worker queues).
       If no task is in the own queue, steal from others. */
    while (1) {
        task = gsoc_taskqueue_pop(data->my_taskq);
        if (task) {
            if (data->id == data->logged_worker)
                printf("%lld is popped by CPU%d\n", task->test_id, sched_getcpu()); /* These values are evaluated by `make test' script */
            continue;
        }

        /* With fewer than two workers there is nobody to steal from; the
           victim search below would never find an id different from ours. */
        if (data->num_workers < 2)
            return NULL;

        do {
            victim = random() % data->num_workers;
        } while (victim == data->id);

        task = gsoc_taskqueue_take(data->workers[victim].my_taskq);
        /* Stop only when the steal attempt found nothing; a successful steal
           from any victim (logged or not) keeps this worker running. */
        if (!task)
            return NULL;
        if (victim == data->logged_worker)
            printf("%lld is taken by CPU%d from CPU%d\n", task->test_id, sched_getcpu(), (int)victim); /* These values are evaluated by `make test' script */
    }
}
/*
 * Test driver for gsoc_taskqueue.
 *
 * Sequential part: checks that pop returns tasks in reverse push order,
 * that take returns them in push order, and that both return NULL on an
 * empty deque.
 *
 * Parallel part: starts one parallel_push_pop_take() thread per configured
 * CPU, each with its own queue, and reports the elapsed time on stderr.
 *
 * Returns 0 on success, EXIT_FAILURE if the CPU count cannot be obtained,
 * memory cannot be allocated, or a worker thread cannot be created.
 */
int main(void)
{
    int i;
    int rc = 0;
    int started = 0; /* number of threads actually created (and to be joined) */
    /* for sequential use */
    gsoc_taskqueue* q;
    gsoc_task* tasks;
    gsoc_task* got;
    double t1, t2;
    /* for parallel use */
    long num_cpu = sysconf(_SC_NPROCESSORS_CONF);

    /* sysconf() returns -1 on failure; the arrays below are sized by it. */
    if (num_cpu < 1) {
        fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed\n");
        return EXIT_FAILURE;
    }

    gsoc_taskqueue* taskqs[num_cpu];
    cpu_set_t cpuset;
    pthread_t tids[num_cpu];
    worker workers[num_cpu];
    int logged_worker = random() % num_cpu;

    /* initializations for test */
    tasks = malloc(sizeof *tasks * GSOC_TASKQUEUE_INIT_SIZE * 100);
    if (tasks == NULL) {
        fprintf(stderr, "failed to allocate the task array\n");
        return EXIT_FAILURE;
    }
    for (i = 0; i < GSOC_TASKQUEUE_INIT_SIZE * 100; ++i)
        tasks[i].test_id = i;
    q = gsoc_taskqueue_new();
    for (i = 0; i < num_cpu; ++i)
        taskqs[i] = gsoc_taskqueue_new();

    /* The queue operations are kept outside assert() so that they still run
       (and the deque is still drained) when compiled with NDEBUG. */

    /* push+pop */
    for (i = 0; i < GSOC_TASKQUEUE_INIT_SIZE * 100; ++i)
        gsoc_taskqueue_push(q, &tasks[i]);
    for (i = GSOC_TASKQUEUE_INIT_SIZE * 100 - 1; i >= 0; --i) {
        got = gsoc_taskqueue_pop(q);
        assert(got != NULL);
        assert(got->test_id == i);
    }

    /* pop for empty deque */
    got = gsoc_taskqueue_pop(q);
    assert(got == NULL);

    /* push+take */
    for (i = 0; i < GSOC_TASKQUEUE_INIT_SIZE * 100; ++i)
        gsoc_taskqueue_push(q, &tasks[i]);
    for (i = 0; i < GSOC_TASKQUEUE_INIT_SIZE * 100; ++i) {
        got = gsoc_taskqueue_take(q);
        assert(got != NULL);
        assert(got->test_id == i);
    }

    /* take for empty deque */
    got = gsoc_taskqueue_take(q);
    assert(got == NULL);
    (void)got; /* silences "set but not used" when asserts are compiled out */

    /* Emulate workers */
    fprintf(stderr, "==========\nWith %d CPUs\n===========\n", (int)num_cpu);
    t1 = gettimeofday_sec();

    /* Fill in every descriptor before any thread starts: a running worker
       reads other workers' descriptors when it looks for a victim. */
    for (i = 0; i < num_cpu; ++i) {
        CPU_ZERO(&cpuset);
        CPU_SET(i, &cpuset);
        workers[i].cpu = cpuset;
        workers[i].id = i;
        workers[i].my_taskq = taskqs[i];
        workers[i].tasks = tasks;
        workers[i].workers = workers;
        workers[i].num_workers = num_cpu;
        workers[i].logged_worker = logged_worker;
    }
    for (i = 0; i < num_cpu; ++i) {
        int err = pthread_create(&tids[i], NULL, parallel_push_pop_take, &workers[i]);
        if (err != 0) {
            fprintf(stderr, "pthread_create failed for worker %d (error %d)\n", i, err);
            rc = EXIT_FAILURE;
            break;
        }
        ++started;
    }
    /* Join only the threads that were really created. */
    for (i = 0; i < started; ++i)
        pthread_join(tids[i], NULL);
    t2 = gettimeofday_sec();
    fprintf(stderr, "%f sec elaplsed for parallel_push_pop_take()\n", t2-t1);

    /* finalization for test */
    for (i = 0; i < num_cpu; ++i)
        gsoc_taskqueue_delete(taskqs[i]);
    gsoc_taskqueue_delete(q);
    free(tasks);
    return rc;
}