Skip to content

Commit

Permalink
[ci][coro_rpc] fix ci, add parallel test for coro_rpc (alibaba#896)
Browse files Browse the repository at this point in the history
* [ci][coro_rpc] fix ci, add parallel test for coro_rpc

* fix logger

* fix case
  • Loading branch information
poor-circle authored and howardlau1999 committed Feb 16, 2025
1 parent 3ddd607 commit d66ec25
Show file tree
Hide file tree
Showing 8 changed files with 195 additions and 56 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/ubuntu_clang.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,7 @@ jobs:

- name: Test
working-directory: ${{github.workspace}}/build
run: |
export TSAN_OPTIONS="halt_on_error=1"
ctest -C ${{matrix.mode}} -j 1 -V
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_clang_for_liburing:
strategy:
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/ubuntu_gcc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,42 @@ jobs:
working-directory: ${{github.workspace}}/build
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_gcc_tsan:
strategy:
matrix:
mode: [ Debug ]
ssl: [ ON ]
runs-on: ubuntu-22.04

steps:
- name: Checkout
uses: actions/checkout@v4

- name: Install Dependencies
run: sudo apt-get install openssl libssl-dev

- name: Install ninja-build tool
uses: seanmiddleditch/gha-setup-ninja@master

- name: ccache
uses: hendrikmuhs/ccache-action@v1.2
with:
key: ${{ github.job }}-${{ matrix.mode}}-ssl( ${{ matrix.ssl}} )

- name: Configure
run: |
CXX=g++ CC=gcc
cmake -B ${{github.workspace}}/build -G Ninja \
-DCMAKE_BUILD_TYPE=${{matrix.mode}} -DBUILD_WITH_LIBCXX=${{matrix.libcxx}} -DYLT_ENABLE_SSL=${{matrix.ssl}} \
-DUSE_CCACHE=${{env.ccache}} -DENABLE_TSAN=ON
- name: Build
run: cmake --build ${{github.workspace}}/build --config ${{matrix.mode}}

- name: Test
working-directory: ${{github.workspace}}/build
run: ctest -C ${{matrix.mode}} -j 1 -V

ubuntu_gcc_for_liburing:
strategy:
matrix:
Expand Down
2 changes: 1 addition & 1 deletion cmake/develop.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ message(STATUS "CORO_RPC_USE_OTHER_RPC: ${CORO_RPC_USE_OTHER_RPC}")
# Enable address sanitizer
option(ENABLE_SANITIZER "Enable sanitizer(Debug+Gcc/Clang/AppleClang)" ON)

option(ENABLE_TSAN "Enable thread sanitizer" ON)
option(ENABLE_TSAN "Enable thread sanitizer" OFF)

if(ENABLE_SANITIZER AND NOT MSVC)
if (ENABLE_TSAN)
Expand Down
60 changes: 32 additions & 28 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ class coro_rpc_client {
"client has been closed"};
struct config {
uint64_t client_id = get_global_client_id();
std::optional<std::chrono::milliseconds> connect_timeout_duration;
std::optional<std::chrono::milliseconds> request_timeout_duration;
std::chrono::milliseconds connect_timeout_duration =
std::chrono::seconds{30};
std::chrono::milliseconds request_timeout_duration =
std::chrono::seconds{30};
std::string host{};
std::string port{};
bool enable_tcp_no_delay = true;
Expand Down Expand Up @@ -235,8 +237,7 @@ class coro_rpc_client {
*/
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port,
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
std::chrono::steady_clock::duration connect_timeout_duration) {
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
Expand All @@ -250,27 +251,33 @@ class coro_rpc_client {
if (config_.port.empty()) {
config_.port = std::move(port);
}
if (!config_.connect_timeout_duration) {
config_.connect_timeout_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration);
}

auto ret = co_await connect_impl();
auto ret = co_await connect_impl(
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration));
connect_mutex_.unlock();
co_return std::move(ret);
}
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint,
std::chrono::steady_clock::duration connect_timeout_duration =
std::chrono::seconds(30)) {
std::chrono::steady_clock::duration connect_timeout_duration) {
auto pos = endpoint.find(':');
std::string host(endpoint.substr(0, pos));
std::string port(endpoint.substr(pos + 1));

return connect(std::move(host), std::move(port), connect_timeout_duration);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint) {
return connect(endpoint, config_.connect_timeout_duration);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port) {
return connect(std::move(host), std::move(port),
config_.connect_timeout_duration);
}

#ifdef YLT_ENABLE_SSL

[[nodiscard]] bool init_ssl(std::string_view cert_base_path,
Expand All @@ -296,7 +303,7 @@ class coro_rpc_client {
template <auto func, typename... Args>
async_simple::coro::Lazy<rpc_result<decltype(get_return_type<func>())>> call(
Args &&...args) {
return call_for<func>(std::chrono::seconds(30),
return call_for<func>(config_.request_timeout_duration,
std::forward<Args>(args)...);
}

Expand Down Expand Up @@ -385,7 +392,8 @@ class coro_rpc_client {
}
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl() {
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl(
std::chrono::milliseconds conn_timeout_dur) {
if (should_reset_) {
co_await reset();
}
Expand All @@ -402,7 +410,6 @@ class coro_rpc_client {

ELOG_INFO << "client_id " << config_.client_id << " begin to connect "
<< config_.port;
auto conn_timeout_dur = *config_.connect_timeout_duration;
if (conn_timeout_dur.count() >= 0) {
timeout(*this->timer_, conn_timeout_dur, "connect timer canceled")
.start([](auto &&) {
Expand Down Expand Up @@ -935,27 +942,27 @@ class coro_rpc_client {
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request(Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30}, {},
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, {}, std::forward<Args>(args)...);
}

template <auto func, typename... Args>
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request_with_attachment(std::string_view request_attachment,
Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30},
request_attachment,
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, request_attachment,
std::forward<Args>(args)...);
}

template <auto func, typename... Args>
async_simple::coro::Lazy<async_simple::coro::Lazy<
async_rpc_result<decltype(get_return_type<func>())>>>
send_request_for(Args &&...args) {
return send_request_for_with_attachment<func>(std::chrono::seconds{30},
std::string_view{},
std::forward<Args>(args)...);
return send_request_for_with_attachment<func>(
config_.request_timeout_duration, std::string_view{},
std::forward<Args>(args)...);
}

struct recving_guard {
Expand Down Expand Up @@ -986,14 +993,11 @@ class coro_rpc_client {
using rpc_return_t = decltype(get_return_type<func>());
recving_guard guard(control_.get());
uint32_t id;
if (!config_.request_timeout_duration) {
config_.request_timeout_duration = request_timeout_duration;
}

auto timer = std::make_unique<coro_io::period_timer>(
control_->executor_.get_asio_executor());
auto result = co_await send_request_for_impl<func>(
*config_.request_timeout_duration, id, *timer, request_attachment,
request_timeout_duration, id, *timer, request_attachment,
std::forward<Args>(args)...);
auto &control = *control_;
if (!result) {
Expand Down
11 changes: 8 additions & 3 deletions include/ylt/easylog.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <functional>
#include <string_view>
#include <utility>
Expand Down Expand Up @@ -75,7 +76,9 @@ class logger {
enable_console_ = enable_console;
}

bool check_severity(Severity severity) { return severity >= min_severity_; }
bool check_severity(Severity severity) {
return severity >= min_severity_.load(std::memory_order::relaxed);
}

void add_appender(std::function<void(std::string_view)> fn) {
appenders_.emplace_back(std::move(fn));
Expand All @@ -85,7 +88,9 @@ class logger {

// set and get
void set_min_severity(Severity severity) { min_severity_ = severity; }
Severity get_min_severity() { return min_severity_; }
Severity get_min_severity() {
return min_severity_.load(std::memory_order::relaxed);
}

void set_console(bool enable) {
enable_console_ = enable;
Expand Down Expand Up @@ -122,7 +127,7 @@ class logger {
}
}

Severity min_severity_ =
std::atomic<Severity> min_severity_ =
#if NDEBUG
Severity::WARN;
#else
Expand Down
1 change: 1 addition & 0 deletions src/coro_rpc/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ set(TEST_SRCS
test_connection.cpp
test_function_name.cpp
test_variadic.cpp
test_parallel.cpp
)
set(TEST_COMMON
rpc_api.cpp
Expand Down
26 changes: 5 additions & 21 deletions src/coro_rpc/tests/test_coro_rpc_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,8 @@ TEST_CASE("testing client timeout") {
config.connect_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
auto ret = client.connect("127.0.0.1", "8801");
auto val = syncAwait(ret);

if (val) {
CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message());
}
Expand All @@ -515,8 +512,8 @@ TEST_CASE("testing client timeout") {
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
1000ms); // this arg won't update config connect timeout duration.
"127.0.0.1",
"8801"); // this arg won't update config connect timeout duration.
auto val = syncAwait(ret);

CHECK(!val);
Expand All @@ -529,12 +526,11 @@ TEST_CASE("testing client timeout") {
config.request_timeout_duration = 0ms;
bool r = client.init_config(config);
CHECK(r);
auto ret = client.connect(
"127.0.0.1", "8801",
0ms); // 0ms won't cover config connect timeout duration.
auto ret = client.connect("127.0.0.1", "8801");
auto val = syncAwait(ret);

CHECK(!val);

// TODO will remove via later for 0ms timeout
auto result = syncAwait(client.call<hello>().via(&client.get_executor()));

Expand Down Expand Up @@ -566,18 +562,6 @@ TEST_CASE("testing client timeout") {
auto val = syncAwait(ret);
CHECK_MESSAGE(val == coro_rpc::errc::timed_out, val.message());
}
// SUBCASE("call, 0ms timeout") {
// coro_rpc_server server(2, 8801);
// server.async_start().start([](auto&&) {
// });
// coro_rpc_client client;
// auto ec_lazy = client.connect("127.0.0.1", "8801", 5ms);
// auto ec = syncAwait(ec_lazy);
// assert(ec == std::errc{});
// auto ret = client.call_for<hi>(0ms);
// auto val = syncAwait(ret);
// CHECK_MESSAGE(val.error().code == std::errc::timed_out, val.error().msg);
// }
}
TEST_CASE("testing client connect err") {
coro_rpc_client client(*coro_io::get_global_executor(), g_client_id++);
Expand Down
Loading

0 comments on commit d66ec25

Please sign in to comment.