Skip to content

Commit

Permalink
[ci][coro_rpc] fix ci, add parallel test for coro_rpc
Browse files Browse the repository at this point in the history
  • Loading branch information
poor-circle committed Feb 6, 2025
1 parent 0868878 commit f1e2b7a
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 54 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ubuntu_clang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ jobs:

- name: Test
working-directory: ${{github.workspace}}/build
run: |
export TSAN_OPTIONS="halt_on_error=1"
ctest -C ${{matrix.mode}} -j 1 -V
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_clang_for_liburing:
strategy:
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/ubuntu_gcc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,42 @@ jobs:
working-directory: ${{github.workspace}}/build
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_gcc_tsan:
strategy:
matrix:
mode: [ Debug ]
ssl: [ ON ]
runs-on: ubuntu-22.04

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Dependencies
run: sudo apt-get install openssl libssl-dev

- name: Install ninja-build tool
uses: seanmiddleditch/gha-setup-ninja@master

- name: ccache
uses: hendrikmuhs/ccache-action@v1.2
with:
key: ${{ github.job }}-${{ matrix.mode}}-ssl( ${{ matrix.ssl}} )

- name: Configure
run: |
CXX=g++ CC=gcc
cmake -B ${{github.workspace}}/build -G Ninja \
-DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DYLT_ENABLE_SSL=${{matrix.ssl}} \
-DUSE_CCACHE=${{env.ccache}} -DENABLE_TSAN=ON
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}}

- name: Test
working-directory: ${{github.workspace}}/build
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_gcc_for_liburing:
strategy:
matrix:
Expand Down
2 changes: 1 addition & 1 deletion cmake/develop.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}")
# Enable address sanitizer
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)

option(ENABLE_TSAN "Enable thread sanitizer" ON)
option(ENABLE_TSAN "Enable thread sanitizer" OFF)

if(ENABLE_SANITIZER AND NOT MSVC)
if (ENABLE_TSAN)
Expand Down
60 changes: 32 additions & 28 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ class coro_rpc_client {
"client has been closed"};
struct config {
uint64_t client_id = get_global_client_id();
std::optional<std::chrono::milliseconds> connect_timeout_duration;
std::optional<std::chrono::milliseconds> request_timeout_duration;
std::chrono::milliseconds connect_timeout_duration =
std::chrono::seconds{30};
std::chrono::milliseconds request_timeout_duration =
std::chrono::seconds{30};
std::string host{};
std::string port{};
bool enable_tcp_no_delay = true;
Expand Down Expand Up @@ -235,8 +237,7 @@ class coro_rpc_client {
*/
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port,
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
std::chrono::steady_clock::duration connect_timeout_duration) {
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
Expand All @@ -250,27 +251,33 @@ class coro_rpc_client {
if (config_.port.empty()) {
config_.port = std::move(port);
}
if (!config_.connect_timeout_duration) {
config_.connect_timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration);
}

auto ret = co_await connect_impl();
auto ret = co_await connect_impl(
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration));
connect_mutex_.unlock();
co_return std::move(ret);
}
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint,
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
std::chrono::steady_clock::duration connect_timeout_duration) {
auto pos = endpoint.find(':');
std::string host(endpoint.substr(0, pos));
std::string port(endpoint.substr(pos + 1));

return connect(std::move(host), std::move(port), connect_timeout_duration);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint) {
return connect(endpoint, config_.connect_timeout_duration);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port) {
return connect(std::move(host), std::move(port),
config_.connect_timeout_duration);
}

#ifdef YLT_ENABLE_SSL

[[nodiscard]] bool init_ssl(std::string_view cert_base_path,
Expand All @@ -296,7 +303,7 @@ class coro_rpc_client {
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>> call(
Args &&...args) {
return call_for<func>(std::chrono::seconds(30),
return call_for<func>(config_.request_timeout_duration,
std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -385,7 +392,8 @@ class coro_rpc_client {
}
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl() {
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl(
std::chrono::milliseconds conn_timeout_dur) {
if (should_reset_) {
co_await reset();
}
Expand All @@ -402,7 +410,6 @@ class coro_rpc_client {

ELOG_INFO << "client_id " << config_.client_id << " begin to connect "
<< config_.port;
auto conn_timeout_dur = *config_.connect_timeout_duration;
if (conn_timeout_dur.count() >= 0) {
timeout(*this->timer_, conn_timeout_dur, "connect timer canceled")
.start([](auto &&) {
Expand Down Expand Up @@ -935,27 +942,27 @@ class coro_rpc_client {
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request(Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30}, {},
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, {}, std::forward<Args>(args)...);
}

template <auto func, typename... Args>
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request_with_attachment(std::string_view request_attachment,
Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30},
request_attachment,
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, request_attachment,
std::forward<Args>(args)...);
}

template <auto func, typename... Args>
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request_for(Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30},
std::string_view{},
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, std::string_view{},
std::forward<Args>(args)...);
}

struct recving_guard {
Expand Down Expand Up @@ -986,14 +993,11 @@ class coro_rpc_client {
using rpc_return_t = decltype(get_return_type<func>());
recving_guard guard(control_.get());
uint32_t id;
if (!config_.request_timeout_duration) {
config_.request_timeout_duration = request_timeout_duration;
}

auto timer = std::make_unique<coro_io::period_timer>(
control_->executor_.get_asio_executor());
auto result = co_await send_request_for_impl<func>(
*config_.request_timeout_duration, id, *timer, request_attachment,
request_timeout_duration, id, *timer, request_attachment,
std::forward<Args>(args)...);
auto &control = *control_;
if (!result) {
Expand Down
1 change: 1 addition & 0 deletions src/coro_rpc/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(TEST_SRCS
test_connection.cpp
test_function_name.cpp
test_variadic.cpp
test_parallel.cpp
)
set(TEST_COMMON
rpc_api.cpp
Expand Down
26 changes: 4 additions & 22 deletions src/coro_rpc/tests/test_coro_rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -491,11 +491,8 @@ TEST_CASE("testing client timeout") {
config.connect_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
auto ret = client.connect("127.0.0.1", "8801");
auto val = syncAwait(ret);

if (val) {
CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message());
}
Expand All @@ -508,8 +505,8 @@ TEST_CASE("testing client timeout") {
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
"127.0.0.1",
"8801"); // this arg won't update config connect timeout duration.
auto val = syncAwait(ret);

CHECK(!val);
Expand All @@ -522,14 +519,11 @@ TEST_CASE("testing client timeout") {
config.request_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
0ms); // 0ms won't cover config connect timeout duration.
auto ret = client.connect("127.0.0.1", "8801");
auto val = syncAwait(ret);

CHECK(!val);
auto result = syncAwait(client.call<hello>());

if (result.has_value()) {
std::cout << result.value() << std::endl;
}
Expand Down Expand Up @@ -558,18 +552,6 @@ TEST_CASE("testing client timeout") {
auto val = syncAwait(ret);
CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message());
}
// SUBCASE("call, 0ms timeout") {
// coro_rpc_server server(2, 8801);
// server.async_start().start([](auto&&) {
// });
// coro_rpc_client client;
// auto ec_lazy = client.connect("127.0.0.1", "8801", 5ms);
// auto ec = syncAwait(ec_lazy);
// assert(ec == std::errc{});
// auto ret = client.call_for<hi>(0ms);
// auto val = syncAwait(ret);
// CHECK_MESSAGE(val.error().code == std::errc::timed_out, val.error().msg);
// }
}
TEST_CASE("testing client connect err") {
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
Expand Down
105 changes: 105 additions & 0 deletions src/coro_rpc/tests/test_parallel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
#include <future>
#include <memory>
#include <vector>
#include <ylt/coro_rpc/coro_rpc_client.hpp>
#include <ylt/coro_rpc/coro_rpc_server.hpp>

#include "async_simple/coro/Semaphore.h"
#include "doctest.h"
#include "ylt/coro_io/io_context_pool.hpp"
#include "ylt/coro_rpc/impl/coro_rpc_client.hpp"
#include "ylt/easylog.hpp"
#include "ylt/easylog/record.hpp"
using namespace async_simple::coro;
std::string_view echo(std::string_view data) { return data; }
TEST_CASE("test parall coro_rpc call") {
int thread_cnt = 100;
coro_rpc::coro_rpc_server server(thread_cnt, 9001);
coro_io::io_context_pool pool(thread_cnt);
std::thread thrd{[&] {
pool.run();
}};

server.register_handler<echo>();
server.async_start();
int client_cnt = 500;
int64_t work_cnt_tot = thread_cnt * client_cnt * 10;
std::vector<std::unique_ptr<coro_rpc::coro_rpc_client>> clients;
clients.resize(client_cnt);
std::atomic<int> connected_cnt = 0;
std::atomic<int> work_cnt = 0;
std::promise<void> p, p2;
for (auto& cli : clients) {
cli = std::make_unique<coro_rpc::coro_rpc_client>();
cli->connect("127.0.0.1", "9001", std::chrono::seconds{5})
.via(pool.get_executor())
.start([&](auto&&) {
if (++connected_cnt == client_cnt) {
p.set_value();
}
});
}
p.get_future().wait();
for (int i = 0; i < work_cnt_tot; ++i) {
[](coro_rpc::coro_rpc_client& cli) -> Lazy<void> {
if (!cli.has_closed()) {
auto future_resp = co_await cli.send_request<echo>("hello");
auto result = co_await std::move(future_resp);
if (result.has_value()) {
CHECK(result.value().result() == "hello");
}
}
}(*clients[i % client_cnt])
.via(pool.get_executor())
.start([&](auto&&) {
auto i = ++work_cnt;
if (i == work_cnt_tot) {
p2.set_value();
}
});
}
p2.get_future().wait();
pool.stop();
thrd.join();
};

TEST_CASE("test parall coro_rpc call2") {
int thread_cnt = 100;
coro_rpc::coro_rpc_server server(thread_cnt, 9001);
coro_io::io_context_pool pool(thread_cnt);
std::thread thrd{[&] {
pool.run();
}};
server.register_handler<echo>();
server.async_start();
int client_cnt = 500;
std::vector<std::unique_ptr<coro_rpc::coro_rpc_client>> clients;
clients.resize(client_cnt);
std::atomic<int> work_cnt = 0;
std::promise<void> p;
for (auto& cli : clients) {
cli = std::make_unique<coro_rpc::coro_rpc_client>();
cli->connect("127.0.0.1", "9001", std::chrono::seconds{5})
.via(pool.get_executor())
.start([&](auto&&) {
[](coro_rpc::coro_rpc_client& cli) -> Lazy<void> {
for (int i = 0; i < 1000; ++i)
if (!cli.has_closed()) {
auto result = co_await cli.call<echo>("hello");
if (result.has_value()) {
CHECK(result.value() == "hello");
}
}
}(*cli)
.start([&](auto&&) {
auto i = ++work_cnt;
if (i == client_cnt) {
p.set_value();
}
});
});
}
p.get_future().wait();
pool.stop();
thrd.join();
};

0 comments on commit f1e2b7a

Please sign in to comment.