forked from ifilippov/nyc_taxi
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: aggregate.h
156 lines (143 loc) · 6.17 KB
/
aggregate.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#ifndef AGGREGATE_H
#define AGGREGATE_H
#include <arrow/api.h>
#include <group_by.h>
#include <print.h>
#include <util.h>
#include <cassert>
#include <chrono>   // std::chrono::steady_clock used by aggregate() timing
#include <cstdlib>  // std::abort
#include <limits>   // std::numeric_limits: min/max reduction identities
//#if USE_TBB
#include <tbb/tbb.h>
//#endif
// This helper is equivalent to runtime dispatch like switch((E) t) {case Args[0]:...; case Args[1]:...;...}
// except it predefines all the case branches so it is easier to support multiple instances of runtime dispatch
// over the same type & values. Example:
// using aggregate_dispatcher = runtime_dispatcher<aggregate_task_type, count, sum, min, max, average>;
// aggregate_dispatcher::call<template body>(dispatch_value, c, gb, &ptask);
template<typename E, E T, E... Args>
struct runtime_dispatcher {
// Dispatches the runtime value `t` to the matching compile-time instantiation
// F<T>, recursing over the remaining Args values. All branches return the
// result of F<T>()(args...), so every F<V> must have the same return type.
template<template<E> typename F, typename... A>
static auto call(E t, A... args) -> decltype(F<T>()(args...)) {
    if (t == T)
        return F<T>()(args...);
    if constexpr(sizeof...(Args)) {
        return runtime_dispatcher<E, Args...>::template call<F>(t, args...);
    } else {
        // `t` matched none of the listed enumerators: a programming error.
        // The assert reports it in debug builds; std::abort() guarantees we
        // never fall off the end of a value-returning function (which would
        // be undefined behavior in NDEBUG builds where assert is a no-op).
        assert(false && "runtime_dispatcher: unhandled enum value");
        std::abort();
    }
}
};
//++++++++++++++++++++++++++++++
// AGGREGATE
//++++++++++++++++++++++++++++++
// Supported aggregation functions. Values double as dispatch keys for
// aggregate_dispatcher below.
enum aggregate_task_type { count, sum, min, max, average /*TODO median*/ };
// Runtime-to-compile-time dispatcher over every aggregate_task_type value.
using aggregate_dispatcher = runtime_dispatcher<aggregate_task_type, count, sum, min, max, average>;
// One aggregation request: which function to apply to which source column.
struct aggregate_task {
aggregate_task_type type;
int from_column;  // index into the input table's columns
};
// Per-thread accumulation state for one task. `partial` holds the running
// value (sum/min/max/average numerator); `partial1` holds running counts
// (used by count and average). Both are indexed by output group slot.
template <typename T>
struct partial_aggregate_task {
aggregate_task *task;
// mutable buffers?
std::vector<T> partial;
std::vector<T> partial1;
};
template<aggregate_task_type a, typename T>
void aggregate_internal(partial_aggregate_task<T>* p_task, int row_number, int value, int c) {
if constexpr(a == sum) {
p_task->partial[row_number] += value;
}
if constexpr(a == count) {
p_task->partial1[row_number] += c;
}
if constexpr(a == min) {
if (value < p_task->partial[row_number]) {
p_task->partial[row_number] = value;
}
}
if constexpr(a == max) {
if (value > p_task->partial[row_number]) {
p_task->partial[row_number] = value;
}
}
if constexpr(a == average) {
p_task->partial[row_number] += value;
p_task->partial1[row_number] += c;
}
}
template <typename T, typename T2, typename T4>
struct aggregate_types {
template<aggregate_task_type A>
struct function {
void sequential(std::shared_ptr<T2> c, group *gb, partial_aggregate_task<T>* p_task, int chunk_number) {
auto cv = c->raw_values();
if (gb) {
for (int j = 0; j < c->length(); j++) {
aggregate_internal<A>(p_task, gb->redirection[chunk_number][j], cv[j], 1);
}
} else {
for (int j = 0; j < c->length(); j++) {
aggregate_internal<A>(p_task, 0, cv[j], 1);
}
}
}
std::shared_ptr<arrow::Array> finalize(partial_aggregate_task<T> &&p_task) {
if constexpr(A == count) {
return vector_to_array<T, T4>(p_task.partial1);
}
if constexpr(A == average) {
for (int j = 0; j < p_task.partial.size(); j++) {
p_task.partial[j] = p_task.partial[j] / p_task.partial1[j];
}
}
return vector_to_array<T, T4>(p_task.partial);
}
std::shared_ptr<arrow::Array> operator()(std::shared_ptr<arrow::ChunkedArray> column, group *gb, aggregate_task *task) {
int N = 10;
int s = gb != NULL ? gb->get_max_index() : 1;
auto init = [task,s]{return partial_aggregate_task<T>{task, std::vector<T>(s), std::vector<T>(s)};};
tbb::enumerable_thread_specific<partial_aggregate_task<T>> p_tasks(init); // TODO: argument initializer instead
// for all available chunks or sequential for each incoming chunk
tbb::parallel_for(0, column->num_chunks(), [&](auto j) {
sequential(std::static_pointer_cast<T2>(column->chunk(j)), gb, &p_tasks.local(), j);
});
return finalize(p_tasks.combine([](auto l, auto r) { // TODO check reduction times
for (int row_number = 0, sz = l.partial.size(); row_number < sz; row_number++) {
// TODO: get rid of the second vector for functions which don't need it
aggregate_internal<A>(&l, row_number, r.partial[row_number], r.partial1[row_number]);
}
return l;
}));
}
};
};
// Aggregates the requested columns of `table`, either per group (gb != NULL)
// or into a single global row (gb == NULL). When grouping, the group-by key
// columns/fields are moved to the front of the result table. String columns
// are not implemented yet and are skipped. Returns a newly built table.
std::shared_ptr<arrow::Table> aggregate(std::shared_ptr<arrow::Table> table, group *gb, std::vector<aggregate_task*> tasks) {
printf("TASK: aggregating %ld columns %s.\n", tasks.size(), gb != NULL ? "based on group_by" : "to zero column (no group_by)");
auto begin = std::chrono::steady_clock::now();
std::vector<std::shared_ptr<arrow::Column>> clmns;
std::vector<std::shared_ptr<arrow::Field>> flds;
if (gb != NULL) {
// TODO Reserve
clmns = std::move(gb->columns);
flds = std::move(gb->fields);
}
// TBB in parallel for all available tasks TODO
for (auto t : tasks) {
auto column = table->column(t->from_column);
std::shared_ptr<arrow::Array> data;
if (column->type()->id() == arrow::Type::STRING) {
// TODO data = aggregate_PARALLEL<std::string, arrow::StringArray, arrow::StringBuilder>(column->data(), gb, tasks[i]);
} else if (column->type()->id() == arrow::Type::INT64) {
using types = aggregate_types<arrow::Int64Type::c_type, arrow::Int64Array, arrow::Int64Builder>;
data = aggregate_dispatcher::call<types::template function>(t->type, column->data(), gb, t);
} else {
// everything else is treated as double
using types = aggregate_types<arrow::DoubleType::c_type, arrow::DoubleArray, arrow::DoubleBuilder>;
data = aggregate_dispatcher::call<types::template function>(t->type, column->data(), gb, t);
}
if (!data) {
// Unsupported column type (currently STRING): skip it rather than
// emitting a column with null data, which would crash consumers of
// the result table.
continue;
}
auto field = column->field();
clmns.push_back(std::make_shared<arrow::Column>(field->name(), data));
flds.push_back(field);
}
auto new_table = arrow::Table::Make(std::make_shared<arrow::Schema>(flds), clmns);
print_time(begin);
return new_table;
}
#endif