From d66ec25dcc26ae1241da10a8ad6096f9f61ae10d Mon Sep 17 00:00:00 2001 From: saipubw Date: Fri, 7 Feb 2025 12:27:37 +0800 Subject: [PATCH] [ci][coro_rpc] fix ci, add parallel test for coro_rpc (#896) * [ci][coro_rpc] fix ci, add parallel test for coro_rpc * fix logger * fix case --- .github/workflows/ubuntu_clang.yml | 4 +- .github/workflows/ubuntu_gcc.yml | 36 ++++++ cmake/develop.cmake | 2 +- include/ylt/coro_rpc/impl/coro_rpc_client.hpp | 60 +++++----- include/ylt/easylog.hpp | 11 +- src/coro_rpc/tests/CMakeLists.txt | 1 + src/coro_rpc/tests/test_coro_rpc_client.cpp | 26 +--- src/coro_rpc/tests/test_parallel.cpp | 111 ++++++++++++++++++ 8 files changed, 195 insertions(+), 56 deletions(-) create mode 100644 src/coro_rpc/tests/test_parallel.cpp diff --git a/.github/workflows/ubuntu_clang.yml b/.github/workflows/ubuntu_clang.yml index 116378786f..88671482c2 100644 --- a/.github/workflows/ubuntu_clang.yml +++ b/.github/workflows/ubuntu_clang.yml @@ -100,9 +100,7 @@ jobs: - name: Test working-directory: ${{github.workspace}}/build - run: | - export TSAN_OPTIONS="halt_on_error=1" - ctest -C ${{matrix.mode}} -j 1 -V + run: ctest -C ${{matrix.mode}} -j 1 -V ubuntu_clang_for_liburing: strategy: diff --git a/.github/workflows/ubuntu_gcc.yml b/.github/workflows/ubuntu_gcc.yml index 8caf389271..2d8ab7408a 100644 --- a/.github/workflows/ubuntu_gcc.yml +++ b/.github/workflows/ubuntu_gcc.yml @@ -48,6 +48,42 @@ jobs: working-directory: ${{github.workspace}}/build run: ctest -C ${{matrix.mode}} -j 1 -V + ubuntu_gcc_tsan: + strategy: + matrix: + mode: [ Debug ] + ssl: [ ON ] + runs-on: ubuntu-22.04 + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Install Dependencies + run: sudo apt-get install openssl libssl-dev + + - name: Install ninja-build tool + uses: seanmiddleditch/gha-setup-ninja@master + + - name: ccache + uses: hendrikmuhs/ccache-action@v1.2 + with: + key: ${{ github.job }}-${{ matrix.mode}}-ssl( ${{ matrix.ssl}} ) + + - name: Configure + run: | + CXX=g++ CC=gcc + cmake -B ${{github.workspace}}/build -G Ninja \ + -DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DYLT_ENABLE_SSL=${{matrix.ssl}} \ + -DUSE_CCACHE=${{env.ccache}} -DENABLE_TSAN=ON + + - name: Build + run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}} + + - name: Test + working-directory: ${{github.workspace}}/build + run: ctest -C ${{matrix.mode}} -j 1 -V + ubuntu_gcc_for_liburing: strategy: matrix: diff --git a/cmake/develop.cmake b/cmake/develop.cmake index 9bd1adcc2b..5935def221 100644 --- a/cmake/develop.cmake +++ b/cmake/develop.cmake @@ -37,7 +37,7 @@ message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}") # Enable address sanitizer option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON) -option(ENABLE_TSAN "Enable thread sanitizer" ON) +option(ENABLE_TSAN "Enable thread sanitizer" OFF) if(ENABLE_SANITIZER AND NOT MSVC) if (ENABLE_TSAN) diff --git a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp index 6b0fb67961..753d5b8de7 100644 --- a/include/ylt/coro_rpc/impl/coro_rpc_client.hpp +++ b/include/ylt/coro_rpc/impl/coro_rpc_client.hpp @@ -161,8 +161,10 @@ class coro_rpc_client { "client has been closed"}; struct config { uint64_t client_id = get_global_client_id(); - std::optional connect_timeout_duration; - std::optional request_timeout_duration; + std::chrono::milliseconds connect_timeout_duration = + std::chrono::seconds{30}; + std::chrono::milliseconds request_timeout_duration = + std::chrono::seconds{30}; std::string host{}; std::string port{}; bool enable_tcp_no_delay = true; @@ -235,8 +237,7 @@ class coro_rpc_client { */ [[nodiscard]] async_simple::coro::Lazy connect( std::string host, std::string port, - std::chrono::steady_clock::duration connect_timeout_duration = - std::chrono::seconds(30)) { + std::chrono::steady_clock::duration connect_timeout_duration) { auto lock_ok = connect_mutex_.tryLock(); if (!lock_ok) { co_await connect_mutex_.coScopedLock(); @@ -250,20 +251,15 @@ class coro_rpc_client { if (config_.port.empty()) { config_.port = std::move(port); } - if (!config_.connect_timeout_duration) { - config_.connect_timeout_duration = - std::chrono::duration_cast( - connect_timeout_duration); - } - - auto ret = co_await connect_impl(); + auto ret = co_await connect_impl( + std::chrono::duration_cast( + connect_timeout_duration)); connect_mutex_.unlock(); co_return std::move(ret); } [[nodiscard]] async_simple::coro::Lazy connect( std::string_view endpoint, - std::chrono::steady_clock::duration connect_timeout_duration = - std::chrono::seconds(30)) { + std::chrono::steady_clock::duration connect_timeout_duration) { auto pos = endpoint.find(':'); std::string host(endpoint.substr(0, pos)); std::string port(endpoint.substr(pos + 1)); @@ -271,6 +267,17 @@ class coro_rpc_client { return connect(std::move(host), std::move(port), connect_timeout_duration); } + [[nodiscard]] async_simple::coro::Lazy connect( + std::string_view endpoint) { + return connect(endpoint, config_.connect_timeout_duration); + } + + [[nodiscard]] async_simple::coro::Lazy connect( + std::string host, std::string port) { + return connect(std::move(host), std::move(port), + config_.connect_timeout_duration); + } + #ifdef YLT_ENABLE_SSL [[nodiscard]] bool init_ssl(std::string_view cert_base_path, @@ -296,7 +303,7 @@ class coro_rpc_client { template async_simple::coro::Lazy())>> call( Args &&...args) { - return call_for(std::chrono::seconds(30), + return call_for(config_.request_timeout_duration, std::forward(args)...); } @@ -385,7 +392,8 @@ class coro_rpc_client { } static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; } - [[nodiscard]] async_simple::coro::Lazy connect_impl() { + [[nodiscard]] async_simple::coro::Lazy connect_impl( + std::chrono::milliseconds conn_timeout_dur) { if (should_reset_) { co_await reset(); } @@ -402,7 +410,6 @@ class coro_rpc_client { ELOG_INFO << "client_id " << config_.client_id << " begin to connect " << config_.port; - auto conn_timeout_dur = *config_.connect_timeout_duration; if (conn_timeout_dur.count() >= 0) { timeout(*this->timer_, conn_timeout_dur, "connect timer canceled") .start([](auto &&) { @@ -935,8 +942,8 @@ class coro_rpc_client { async_simple::coro::Lazy())>>> send_request(Args &&...args) { - return send_request_for_with_attachment(std::chrono::seconds{30}, {}, - std::forward(args)...); + return send_request_for_with_attachment( + config_.request_timeout_duration, {}, std::forward(args)...); } template @@ -944,18 +951,18 @@ class coro_rpc_client { async_rpc_result())>>> send_request_with_attachment(std::string_view request_attachment, Args &&...args) { - return send_request_for_with_attachment(std::chrono::seconds{30}, - request_attachment, - std::forward(args)...); + return send_request_for_with_attachment( + config_.request_timeout_duration, request_attachment, + std::forward(args)...); } template async_simple::coro::Lazy())>>> send_request_for(Args &&...args) { - return send_request_for_with_attachment(std::chrono::seconds{30}, - std::string_view{}, - std::forward(args)...); + return send_request_for_with_attachment( + config_.request_timeout_duration, std::string_view{}, + std::forward(args)...); } struct recving_guard { @@ -986,14 +993,11 @@ class coro_rpc_client { using rpc_return_t = decltype(get_return_type()); recving_guard guard(control_.get()); uint32_t id; - if (!config_.request_timeout_duration) { - config_.request_timeout_duration = request_timeout_duration; - } auto timer = std::make_unique( control_->executor_.get_asio_executor()); auto result = co_await send_request_for_impl( - *config_.request_timeout_duration, id, *timer, request_attachment, + request_timeout_duration, id, *timer, request_attachment, std::forward(args)...); auto &control = *control_; if (!result) { diff --git a/include/ylt/easylog.hpp b/include/ylt/easylog.hpp index 68ec4f25f8..aa7f1f76cb 100644 --- a/include/ylt/easylog.hpp +++ b/include/ylt/easylog.hpp @@ -14,6 +14,7 @@ * limitations under the License. */ #pragma once +#include #include #include #include @@ -75,7 +76,9 @@ class logger { enable_console_ = enable_console; } - bool check_severity(Severity severity) { return severity >= min_severity_; } + bool check_severity(Severity severity) { + return severity >= min_severity_.load(std::memory_order::relaxed); + } void add_appender(std::function fn) { appenders_.emplace_back(std::move(fn)); @@ -85,7 +88,9 @@ class logger { // set and get void set_min_severity(Severity severity) { min_severity_ = severity; } - Severity get_min_severity() { return min_severity_; } + Severity get_min_severity() { + return min_severity_.load(std::memory_order::relaxed); + } void set_console(bool enable) { enable_console_ = enable; @@ -122,7 +127,7 @@ class logger { } } - Severity min_severity_ = + std::atomic min_severity_ = #if NDEBUG Severity::WARN; #else diff --git a/src/coro_rpc/tests/CMakeLists.txt b/src/coro_rpc/tests/CMakeLists.txt index 8a3924b0cc..bffb3ead34 100644 --- a/src/coro_rpc/tests/CMakeLists.txt +++ b/src/coro_rpc/tests/CMakeLists.txt @@ -8,6 +8,7 @@ set(TEST_SRCS test_connection.cpp test_function_name.cpp test_variadic.cpp + test_parallel.cpp ) set(TEST_COMMON rpc_api.cpp diff --git a/src/coro_rpc/tests/test_coro_rpc_client.cpp b/src/coro_rpc/tests/test_coro_rpc_client.cpp index 0ded43f0f0..6ac02ea119 100644 --- a/src/coro_rpc/tests/test_coro_rpc_client.cpp +++ b/src/coro_rpc/tests/test_coro_rpc_client.cpp @@ -498,11 +498,8 @@ TEST_CASE("testing client timeout") { config.connect_timeout_duration = 0ms; bool r = client.init_config(config); CHECK(r); - auto ret = client.connect( - "127.0.0.1", "8801", - 1000ms); // this arg won't update config connect timeout duration. + auto ret = client.connect("127.0.0.1", "8801"); auto val = syncAwait(ret); - if (val) { CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message()); } @@ -515,8 +512,8 @@ TEST_CASE("testing client timeout") { bool r = client.init_config(config); CHECK(r); auto ret = client.connect( - "127.0.0.1", "8801", - 1000ms); // this arg won't update config connect timeout duration. + "127.0.0.1", + "8801"); // this arg won't update config connect timeout duration. auto val = syncAwait(ret); CHECK(!val); @@ -529,12 +526,11 @@ TEST_CASE("testing client timeout") { config.request_timeout_duration = 0ms; bool r = client.init_config(config); CHECK(r); - auto ret = client.connect( - "127.0.0.1", "8801", - 0ms); // 0ms won't cover config connect timeout duration. + auto ret = client.connect("127.0.0.1", "8801"); auto val = syncAwait(ret); CHECK(!val); + // TODO will remove via later for 0ms timeout auto result = syncAwait(client.call().via(&client.get_executor())); @@ -566,18 +562,6 @@ TEST_CASE("testing client timeout") { auto val = syncAwait(ret); CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message()); } - // SUBCASE("call, 0ms timeout") { - // coro_rpc_server server(2, 8801); - // server.async_start().start([](auto&&) { - // }); - // coro_rpc_client client; - // auto ec_lazy = client.connect("127.0.0.1", "8801", 5ms); - // auto ec = syncAwait(ec_lazy); - // assert(ec == std::errc{}); - // auto ret = client.call_for(0ms); - // auto val = syncAwait(ret); - // CHECK_MESSAGE(val.error().code == std::errc::timed_out, val.error().msg); - // } } TEST_CASE("testing client connect err") { coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++); diff --git a/src/coro_rpc/tests/test_parallel.cpp b/src/coro_rpc/tests/test_parallel.cpp new file mode 100644 index 0000000000..1c73b77b3a --- /dev/null +++ b/src/coro_rpc/tests/test_parallel.cpp @@ -0,0 +1,111 @@ +#include +#include +#include +#include +#include + +#include "async_simple/coro/Semaphore.h" +#include "doctest.h" +#include "ylt/coro_io/io_context_pool.hpp" +#include "ylt/coro_rpc/impl/coro_rpc_client.hpp" +#include "ylt/easylog.hpp" +#include "ylt/easylog/record.hpp" +using namespace async_simple::coro; +std::string_view echo(std::string_view data) { return data; } +TEST_CASE("test parall coro_rpc call") { + auto s = easylog::logger<>::instance().get_min_severity(); + easylog::logger<>::instance().set_min_severity(easylog::Severity::WARNING); + int thread_cnt = 100; + coro_rpc::coro_rpc_server server(thread_cnt, 9001); + coro_io::io_context_pool pool(thread_cnt); + std::thread thrd{[&] { + pool.run(); + }}; + + server.register_handler(); + server.async_start(); + int client_cnt = 200; + int64_t work_cnt_tot = thread_cnt * client_cnt * 5; + std::vector> clients; + clients.resize(client_cnt); + std::atomic connected_cnt = 0; + std::atomic work_cnt = 0; + std::promise p, p2; + for (auto& cli : clients) { + cli = std::make_unique(); + cli->connect("127.0.0.1", "9001", std::chrono::seconds{5}) + .via(pool.get_executor()) + .start([&](auto&&) { + if (++connected_cnt == client_cnt) { + p.set_value(); + } + }); + } + p.get_future().wait(); + for (int i = 0; i < work_cnt_tot; ++i) { + [](coro_rpc::coro_rpc_client& cli) -> Lazy { + if (!cli.has_closed()) { + auto future_resp = co_await cli.send_request("hello"); + auto result = co_await std::move(future_resp); + if (result.has_value()) { + CHECK(result.value().result() == "hello"); + } + } + }(*clients[i % client_cnt]) + .via(pool.get_executor()) + .start([&](auto&&) { + auto i = ++work_cnt; + if (i == work_cnt_tot) { + p2.set_value(); + } + }); + } + p2.get_future().wait(); + pool.stop(); + thrd.join(); + easylog::logger<>::instance().set_min_severity(s); +}; + +TEST_CASE("test parall coro_rpc call2") { + auto s = easylog::logger<>::instance().get_min_severity(); + easylog::logger<>::instance().set_min_severity(easylog::Severity::WARNING); + int thread_cnt = 100; + coro_rpc::coro_rpc_server server(thread_cnt, 9001); + coro_io::io_context_pool pool(thread_cnt); + std::thread thrd{[&] { + pool.run(); + }}; + server.register_handler(); + server.async_start(); + int client_cnt = 200; + std::vector> clients; + clients.resize(client_cnt); + std::atomic work_cnt = 0; + std::promise p; + for (auto& cli : clients) { + cli = std::make_unique(); + cli->connect("127.0.0.1", "9001", std::chrono::seconds{5}) + .via(pool.get_executor()) + .start([&](auto&&) { + [](coro_rpc::coro_rpc_client& cli) -> Lazy { + for (int i = 0; i < 500; ++i) + if (!cli.has_closed()) { + auto result = co_await cli.call("hello"); + if (result.has_value()) { + CHECK(result.value() == "hello"); + } + } + }(*cli) + .start([&](auto&&) { + auto i = ++work_cnt; + if (i == client_cnt) { + p.set_value(); + } + }); + }); + } + p.get_future().wait(); + pool.stop(); + thrd.join(); + easylog::logger<>::instance().set_min_severity(s); +}; \ No newline at end of file