[DRAFT] CUDA Scan implementation #1250
base: main
@@ -0,0 +1,172 @@
/*
 * Copyright (c) 2024 NVIDIA Corporation
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include "../../stdexec/execution.hpp"
#include <type_traits>
#include <ranges>
#include <iostream> // needed by the temporary debug output in set_value_impl below

#include <cuda/std/type_traits>

#include <cub/device/device_scan.cuh>

#include "algorithm_base.cuh"
#include "common.cuh"
#include "../detail/throw_on_cuda_error.cuh"

namespace nvexec {
  namespace STDEXEC_STREAM_DETAIL_NS {
    namespace scan_ {

      template <class SenderId, class ReceiverId, class InitT, class Fun>
      struct receiver_t
        : public __algo_range_init_fun::receiver_t<
            SenderId,
            ReceiverId,
            InitT,
            Fun,
            receiver_t<SenderId, ReceiverId, InitT, Fun>> {
        using base = __algo_range_init_fun::
          receiver_t<SenderId, ReceiverId, InitT, Fun, receiver_t<SenderId, ReceiverId, InitT, Fun>>;

        template <class Range>
        using result_t = typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>;

        template <class Range>
        static void set_value_impl(base::__t&& self, Range&& range) noexcept {
          cudaError_t status{cudaSuccess};
          cudaStream_t stream = self.op_state_.get_stream();

          // `range` is produced asynchronously, so we need to wait for it to be ready
          if (status = STDEXEC_DBG_ERR(cudaStreamSynchronize(stream)); status != cudaSuccess) {
            self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
            return;
          }

          using value_t = result_t<Range>;
          value_t* d_out = static_cast<value_t*>(self.op_state_.temp_storage_);

          void* d_temp_storage{};
          std::size_t temp_storage_size{};

          auto first = begin(range);
          auto last = end(range);

          std::size_t num_items = std::distance(first, last);

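          // CUB's two-phase convention: when `d_temp_storage` is null, the call
          // below does no work and only writes the required scratch size into
          // `temp_storage_size`.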
          if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan(
                d_temp_storage,
                temp_storage_size,
                first,
                d_out,
                self.fun_,
                self.init_,
                num_items,
                stream));
              status != cudaSuccess) {
            self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
            return;
          }

          // Allocate the scratch buffer in stream order.
          if (status = STDEXEC_DBG_ERR( //
                cudaMallocAsync(&d_temp_storage, temp_storage_size, stream));
              status != cudaSuccess) {
            self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
            return;
          }

          // Second call: with scratch allocated, this actually runs the scan.
          if (status = STDEXEC_DBG_ERR(cub::DeviceScan::ExclusiveScan(
                d_temp_storage,
                temp_storage_size,
                first,
                d_out,
                self.fun_,
                self.init_,
                num_items,
                stream));
              status != cudaSuccess) {
            self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
            return;
          }

          status = STDEXEC_DBG_ERR(cudaFreeAsync(d_temp_storage, stream));
          self.op_state_.defer_temp_storage_destruction(d_out);
          if (status == cudaSuccess) {
            // Temporary debug output: copies the first ten results back to the
            // host and prints them. Note this hard-codes `int` and a length of
            // 10 regardless of the actual value_t and num_items.
            int* host_out = new int[10];
            cudaMemcpy(host_out, d_out, 10 * sizeof(int), cudaMemcpyDeviceToHost);
            for (size_t i = 0; i < 10; ++i) {
              std::cout << host_out[i] << std::endl;
            }
            delete[] host_out; // free the debug buffer rather than leaking it
            self.op_state_.propagate_completion_signal(stdexec::set_value, d_out);

Review comment: If you change …

          } else {
            self.op_state_.propagate_completion_signal(stdexec::set_error, std::move(status));
          }
        }
      };

      template <class SenderId, class InitT, class Fun>
      struct sender_t
        : public __algo_range_init_fun::
            sender_t<SenderId, InitT, Fun, sender_t<SenderId, InitT, Fun>> {
        template <class Receiver>
        using receiver_t =
          stdexec::__t<scan_::receiver_t<SenderId, stdexec::__id<Receiver>, InitT, Fun>>;

        template <class Range>
        using _set_value_t = completion_signatures<set_value_t(
          ::std::add_lvalue_reference_t<
            typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;

        // template <class Range>
        // using _set_value_t = completion_signatures<set_value_t(
        //   std::vector<typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;

Review comment: Not sure how to get the completion signatures right. Hoping to get some guidance.
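
        // One possible direction for the question above (a sketch only, not
        // verified against the stream scheduler's completion machinery): since
        // set_value_impl hands the receiver a device pointer to num_items
        // results, the signature could advertise a span over the results
        // instead of an lvalue reference to a single value:
        //
        //   template <class Range>
        //   using _set_value_t = completion_signatures<set_value_t(
        //     ::std::span<typename __algo_range_init_fun::binary_invoke_result_t<Range, InitT, Fun>>)>;
        //
        // (This would also require set_value_impl to send
        // `std::span{d_out, num_items}` rather than the raw pointer.)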

        // // template <class Range, class InitT, class Fun>
        // using _set_value_t_spanc =
        //   completion_signatures<set_value_t(binary_invoke_result_t<Range, InitT, Fun>*)>;
      };
    } // namespace scan_

    struct scan_t {
      template <class Sender, class InitT, class Fun>
      using __sender = stdexec::__t<scan_::sender_t<stdexec::__id<__decay_t<Sender>>, InitT, Fun>>;

      template <sender Sender, __movable_value InitT, __movable_value Fun = cub::Sum>
      __sender<Sender, InitT, Fun> operator()(Sender&& sndr, InitT init, Fun fun) const {
        return __sender<Sender, InitT, Fun>{{}, (Sender&&) sndr, (InitT&&) init, (Fun&&) fun};
      }

      // Pipeable form: `sndr | nvexec::scan(init, fun)`.
      template <class InitT, class Fun = cub::Sum>
      __binder_back<scan_t, InitT, Fun> operator()(InitT init, Fun fun = {}) const {
        return {
          {},
          {},
          {(InitT&&) init, (Fun&&) fun}
        };
      }
    };
  } // namespace STDEXEC_STREAM_DETAIL_NS

  inline constexpr STDEXEC_STREAM_DETAIL_NS::scan_t scan{};
} // namespace nvexec

namespace stdexec::__detail {
  template <class SenderId, class Init, class Fun>
  extern __mconst<
    nvexec::STDEXEC_STREAM_DETAIL_NS::scan_::sender_t<__name_of<__t<SenderId>>, Init, Fun>>
    __name_of_v<nvexec::STDEXEC_STREAM_DETAIL_NS::scan_::sender_t<SenderId, Init, Fun>>;
} // namespace stdexec::__detail
This reuses `algorithm_base.cuh`. The `ExclusiveScan` API I used is the overload that accepts an initial value, so I could reuse this base with little change. Nearly all of `scan.cuh` is identical to `reduce` except for the CUB API it calls and the final return type: `reduce` completes with a single value, whereas `scan` returns an array of results.
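
For reference, here is a minimal sketch of how the adapter would be driven end to end. The thrust setup, `transfer_just`, and `sync_wait` plumbing are illustrative assumptions mirroring how the other nvexec stream algorithms are exercised; only the `nvexec::scan(...)` call itself comes from this diff:

```cpp
#include <span>

#include <thrust/device_vector.h>

#include <nvexec/stream_context.cuh>
#include <stdexec/execution.hpp>

int main() {
  nvexec::stream_context stream_ctx{};

  // 1024 ones living in device memory.
  thrust::device_vector<int> input(1024, 1);
  std::span<int> d_in{thrust::raw_pointer_cast(input.data()), input.size()};

  auto snd = stdexec::transfer_just(stream_ctx.get_scheduler(), d_in)
           | nvexec::scan(0); // exclusive scan, initial value 0, Fun defaults to cub::Sum

  // In the current draft the value completion carries a device pointer to the
  // scanned results (see the completion-signature question above), so the
  // exact result type here is still in flux.
  auto [d_out] = stdexec::sync_wait(std::move(snd)).value();
}
```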