[coro_io][coro_rpc][coro_http] support dns cache (#899)
* [coro_io][coro_rpc][coro_http] support dns cache

* fix test case

* fix

* remove mac tsan test

* fix
poor-circle authored Feb 11, 2025
1 parent 416e89d commit 3cedfb9
Showing 10 changed files with 3,436 additions and 2,514 deletions.
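The main user-facing knob added by this commit is pool_config::dns_cache_update_duration (see the client_pool.hpp diff below). The following is a hedged usage sketch, not part of the commit: it assumes the existing client_pool factory create(host, pool_config) and send_request API, and the address "localhost:8801" is illustrative.

#include <chrono>
#include <ylt/coro_io/client_pool.hpp>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

using pool_t = coro_io::client_pool<coro_rpc::coro_rpc_client>;

async_simple::coro::Lazy<void> use_pool() {
  pool_t::pool_config conf{};
  // re-resolve the host at most once per minute; reconnects inside that window
  // reuse the cached endpoints
  conf.dns_cache_update_duration = std::chrono::seconds{60};
  // a negative duration disables the cache, so every reconnect resolves again:
  // conf.dns_cache_update_duration = std::chrono::seconds{-1};
  auto pool = pool_t::create("localhost:8801", conf);
  auto result = co_await pool->send_request(
      [](coro_rpc::coro_rpc_client &client) -> async_simple::coro::Lazy<void> {
        // a real rpc call such as client.call<...>(...) would go here
        co_return;
      });
  // the cached endpoints (if any) can be inspected via the new accessor:
  auto eps = pool->get_remote_endpoints();
}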
77 changes: 64 additions & 13 deletions include/ylt/coro_io/client_pool.hpp
@@ -49,6 +49,7 @@
#include "detail/client_queue.hpp"
#include "io_context_pool.hpp"
#include "ylt/easylog.hpp"
#include "ylt/util/atomic_shared_ptr.hpp"
namespace coro_io {

template <typename client_t, typename io_context_pool_t>
@@ -117,13 +118,53 @@ class client_pool : public std::enable_shared_from_this<
reconnect_impl(std::unique_ptr<client_t>& client,
std::shared_ptr<client_pool>& self) {
auto pre_time_point = std::chrono::steady_clock::now();
auto result = co_await client->connect(self->host_name_);
auto dns_cache_update_duration =
self->pool_config_.dns_cache_update_duration;
std::vector<asio::ip::tcp::endpoint>* eps_raw_ptr = nullptr;
std::shared_ptr<std::vector<asio::ip::tcp::endpoint>> eps_ptr;
std::vector<asio::ip::tcp::endpoint> eps;
uint64_t old_tp;
if (dns_cache_update_duration.count() >= 0) {
eps_ptr = self->eps_.load(std::memory_order_acquire);
eps_raw_ptr = eps_ptr.get();
old_tp = self->timepoint_.load(std::memory_order_acquire);
auto old_time_point = std::chrono::steady_clock::time_point{
std::chrono::steady_clock::duration{old_tp}};

// check if no dns cache, or cache outdated
if (eps_raw_ptr->empty() || (pre_time_point - old_time_point) >
dns_cache_update_duration) [[unlikely]] {
// start resolve, store result to eps
eps_raw_ptr = &eps;
}
// else, we can use the cached eps
}
auto result = co_await client->connect(self->host_name_, eps_raw_ptr);

bool ok = client_t::is_ok(result);
if (dns_cache_update_duration.count() >= 0) {
if ((!ok &&
(eps_raw_ptr != &eps))  // the cache was used but the request failed: clear the cache
|| (ok &&
(eps_raw_ptr == &eps)))  // there was no cache and the request succeeded: set the cache
{
if (self->timepoint_.compare_exchange_strong(
old_tp, pre_time_point.time_since_epoch().count(),
std::memory_order_release)) {
auto new_eps_ptr =
std::make_shared<std::vector<asio::ip::tcp::endpoint>>(
std::move(eps));
self->eps_.store(std::move(new_eps_ptr), std::memory_order_release);
}
}
}

auto post_time_point = std::chrono::steady_clock::now();
auto cost_time = std::chrono::duration_cast<std::chrono::milliseconds>(
post_time_point - pre_time_point);
ELOG_TRACE << "reconnect client{" << client.get()
<< "} cost time: " << cost_time / std::chrono::milliseconds{1}
ELOG_TRACE << "reconnect client{" << client.get() << "}"
<< "is success:" << ok
<< ", cost time: " << cost_time / std::chrono::milliseconds{1}
<< "ms";
co_return std::pair{ok, cost_time};
}
@@ -296,15 +337,15 @@
};
template <typename T>
using return_type =
tl::expected<typename lazy_hacker<decltype(std::declval<T>()(
std::declval<client_t&>()))>::type,
std::errc>;
ylt::expected<typename lazy_hacker<decltype(std::declval<T>()(
std::declval<client_t&>()))>::type,
std::errc>;

template <typename T>
using return_type_with_host =
tl::expected<typename lazy_hacker<decltype(std::declval<T>()(
std::declval<client_t&>(), std::string_view{}))>::type,
std::errc>;
ylt::expected<typename lazy_hacker<decltype(std::declval<T>()(
std::declval<client_t&>(), std::string_view{}))>::type,
std::errc>;

public:
struct pool_config {
Expand All @@ -317,6 +358,7 @@ class client_pool : public std::enable_shared_from_this<
std::chrono::milliseconds host_alive_detect_duration{
30000}; /* zero means wont detect */
typename client_t::config client_config;
std::chrono::seconds dns_cache_update_duration{5 * 60}; // 5mins
};

private:
@@ -336,7 +378,8 @@ class client_pool : public std::enable_shared_from_this<
: host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection){};
free_clients_(pool_config.max_connection),
eps_(std::make_shared<std::vector<asio::ip::tcp::endpoint>>()){};

client_pool(private_construct_token t, client_pools_t* pools_manager_,
std::string_view host_name, const pool_config& pool_config,
@@ -345,7 +388,8 @@ class client_pool : public std::enable_shared_from_this<
host_name_(host_name),
pool_config_(pool_config),
io_context_pool_(io_context_pool),
free_clients_(pool_config.max_connection){};
free_clients_(pool_config.max_connection),
eps_(std::make_shared<std::vector<asio::ip::tcp::endpoint>>()){};

template <typename T>
async_simple::coro::Lazy<return_type<T>> send_request(
@@ -356,7 +400,7 @@ class client_pool : public std::enable_shared_from_this<
if (!client) {
ELOG_WARN << "send request to " << host_name_
<< " failed. connection refused.";
co_return return_type<T>{tl::unexpect, std::errc::connection_refused};
co_return return_type<T>{ylt::unexpect, std::errc::connection_refused};
}
if constexpr (std::is_same_v<typename return_type<T>::value_type, void>) {
co_await op(*client);
@@ -399,6 +443,11 @@ class client_pool : public std::enable_shared_from_this<

std::string_view get_host_name() const noexcept { return host_name_; }

std::shared_ptr<std::vector<asio::ip::tcp::endpoint>> get_remote_endpoints()
const noexcept {
return eps_.load(std::memory_order_acquire);
}

private:
template <typename, typename>
friend class client_pools;
@@ -416,7 +465,7 @@ class client_pool : public std::enable_shared_from_this<
if (!client) {
ELOG_WARN << "send request to " << endpoint
<< " failed. connection refused.";
co_return return_type_with_host<T>{tl::unexpect,
co_return return_type_with_host<T>{ylt::unexpect,
std::errc::connection_refused};
}
if constexpr (std::is_same_v<typename return_type_with_host<T>::value_type,
@@ -446,6 +495,8 @@ class client_pool : public std::enable_shared_from_this<
pool_config pool_config_;
io_context_pool_t& io_context_pool_;
std::atomic<bool> is_alive_ = true;
std::atomic<uint64_t> timepoint_;
ylt::util::atomic_shared_ptr<std::vector<asio::ip::tcp::endpoint>> eps_;
};

template <typename client_t,
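The heart of the client_pool.hpp change is the lock-free cache refresh in reconnect_impl: a steady_clock tick count plus an atomic shared pointer guard the endpoint list, so when several reconnects race, only the one that wins the timestamp CAS publishes its freshly resolved endpoints. Below is a minimal self-contained sketch of that pattern, not the library code: it substitutes std::atomic<std::shared_ptr<T>> (C++20) for ylt::util::atomic_shared_ptr, plain strings for asio endpoints, and a placeholder resolve(); the "127.0.0.1:8801" value is illustrative.

#include <atomic>
#include <chrono>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

struct dns_cache {
  std::atomic<uint64_t> stamp{0};  // steady_clock ticks of the last refresh
  std::atomic<std::shared_ptr<std::vector<std::string>>> eps{
      std::make_shared<std::vector<std::string>>()};  // cached endpoints

  // placeholder for the real (asynchronous) DNS lookup
  static std::vector<std::string> resolve() { return {"127.0.0.1:8801"}; }

  // return the cached endpoints if they are fresh enough; otherwise resolve
  // and try to publish the new list exactly once via the timestamp CAS
  std::shared_ptr<std::vector<std::string>> get(std::chrono::seconds max_age) {
    auto now = std::chrono::steady_clock::now();
    uint64_t old_stamp = stamp.load(std::memory_order_acquire);
    auto old_tp = std::chrono::steady_clock::time_point{
        std::chrono::steady_clock::duration{static_cast<int64_t>(old_stamp)}};
    auto cached = eps.load(std::memory_order_acquire);
    if (!cached->empty() && now - old_tp <= max_age) {
      return cached;  // cache hit: skip resolution
    }
    auto fresh = std::make_shared<std::vector<std::string>>(resolve());
    // only the thread that wins the CAS on the timestamp installs its result;
    // losers keep their locally resolved endpoints but do not publish them
    if (stamp.compare_exchange_strong(old_stamp,
                                      now.time_since_epoch().count(),
                                      std::memory_order_release)) {
      eps.store(fresh, std::memory_order_release);
    }
    return fresh;
  }
};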
28 changes: 28 additions & 0 deletions include/ylt/coro_io/coro_io.hpp
@@ -435,6 +435,34 @@ inline async_simple::coro::Lazy<std::error_code> async_connect(
co_return result.first;
}

template <typename executor_t, typename EndPointSeq>
inline async_simple::coro::Lazy<
std::pair<std::error_code, asio::ip::tcp::endpoint>>
async_connect(executor_t *executor, asio::ip::tcp::socket &socket,
const EndPointSeq &endpoint) noexcept {
auto result =
co_await async_io<std::pair<std::error_code, asio::ip::tcp::endpoint>>(
[&](auto &&cb) {
asio::async_connect(socket, endpoint, std::move(cb));
},
socket);
co_return result;
}

template <typename executor_t>
inline async_simple::coro::Lazy<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>
async_resolve(executor_t *executor, asio::ip::tcp::socket &socket,
const std::string &host, const std::string &port) noexcept {
asio::ip::tcp::resolver resolver(executor->get_asio_executor());
co_return co_await async_io<
std::pair<std::error_code, asio::ip::tcp::resolver::iterator>>(
[&](auto &&cb) {
resolver.async_resolve(host, port, std::move(cb));
},
resolver);
}

template <typename Socket>
inline async_simple::coro::Lazy<void> async_close(Socket &socket) noexcept {
callback_awaitor<void> awaitor;
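The two helpers added above split connection setup into an explicit resolve step and a connect-to-endpoint-sequence step, which is what lets the clients accept pre-resolved endpoints. A hedged sketch of using them together follows; it is not from the commit, and the executor is left as a template parameter, as in the helpers themselves. asio::async_connect tries the endpoints in order and reports the one that succeeded.

#include <string>
#include <system_error>
#include <vector>

#include <asio/ip/tcp.hpp>
#include <async_simple/coro/Lazy.h>
#include <ylt/coro_io/coro_io.hpp>

// resolve `host:port` once, then connect to the resulting endpoint sequence
template <typename executor_t>
async_simple::coro::Lazy<std::error_code> resolve_and_connect(
    executor_t *executor, asio::ip::tcp::socket &socket,
    const std::string &host, const std::string &port) {
  auto [resolve_ec, iter] =
      co_await coro_io::async_resolve(executor, socket, host, port);
  if (resolve_ec) {
    co_return resolve_ec;  // DNS lookup failed
  }
  std::vector<asio::ip::tcp::endpoint> eps;
  for (asio::ip::tcp::resolver::iterator end; iter != end; ++iter) {
    eps.push_back(iter->endpoint());
  }
  // connect to the first reachable endpoint in the sequence
  auto [connect_ec, ep] =
      co_await coro_io::async_connect(executor, socket, eps);
  co_return connect_ec;
}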
81 changes: 62 additions & 19 deletions include/ylt/coro_rpc/impl/coro_rpc_client.hpp
Expand Up @@ -227,17 +227,21 @@ class coro_rpc_client {
/*!
* Connect server
*
* If connect hasn't been closed, it will be closed first then connect to
* If socket hasn't been closed, it will be closed first then connect to
* server, else the client will connect to server directly
*
* @param host server address
* @param port server port
* @param connect_timeout_duration connect timeout duration
* @param eps endpoints of the resolve result. If eps is not nullptr and the
* vector is empty, the resolved endpoints will be written back into it; if the
* vector is not empty, those endpoints are used to skip resolution and connect
* to the server directly.
* @return error code
*/
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port,
std::chrono::steady_clock::duration connect_timeout_duration) {
std::chrono::steady_clock::duration connect_timeout_duration,
std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
auto lock_ok = connect_mutex_.tryLock();
if (!lock_ok) {
co_await connect_mutex_.coScopedLock();
@@ -253,29 +257,39 @@
}
auto ret = co_await connect_impl(
std::chrono::duration_cast<std::chrono::milliseconds>(
connect_timeout_duration));
connect_timeout_duration),
eps);
connect_mutex_.unlock();
co_return std::move(ret);
}
[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint,
std::chrono::steady_clock::duration connect_timeout_duration) {
auto pos = endpoint.find(':');
std::string host(endpoint.substr(0, pos));
std::string port(endpoint.substr(pos + 1));

return connect(std::move(host), std::move(port), connect_timeout_duration);
std::string_view address,
std::chrono::steady_clock::duration connect_timeout_duration,
std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
auto pos = address.find(':');
std::string host(address.substr(0, pos));
std::string port(address.substr(pos + 1));

return connect(std::move(host), std::move(port), connect_timeout_duration,
eps);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string_view endpoint) {
return connect(endpoint, config_.connect_timeout_duration);
std::string host, std::string port,
std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
return connect(std::move(host), std::move(port),
config_.connect_timeout_duration, eps);
}

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect(
std::string host, std::string port) {
std::string_view address,
std::vector<asio::ip::tcp::endpoint> *eps = nullptr) {
auto pos = address.find(':');
std::string host(address.substr(0, pos));
std::string port(address.substr(pos + 1));

return connect(std::move(host), std::move(port),
config_.connect_timeout_duration);
config_.connect_timeout_duration, eps);
}

#ifdef YLT_ENABLE_SSL
@@ -393,7 +407,8 @@ class coro_rpc_client {
static bool is_ok(coro_rpc::err_code ec) noexcept { return !ec; }

[[nodiscard]] async_simple::coro::Lazy<coro_rpc::err_code> connect_impl(
std::chrono::milliseconds conn_timeout_dur) {
std::chrono::milliseconds conn_timeout_dur,
std::vector<asio::ip::tcp::endpoint> *eps) {
if (should_reset_) {
co_await reset();
}
@@ -415,12 +430,35 @@
.start([](auto &&) {
});
}

std::error_code ec = co_await coro_io::async_connect(
&control_->executor_, control_->socket_, config_.host, config_.port);
std::vector<asio::ip::tcp::endpoint> eps_tmp;
if (eps == nullptr) {
eps = &eps_tmp;
}
std::error_code ec;
asio::ip::tcp::resolver::iterator iter;
if (eps->empty()) {
ELOG_TRACE << "start resolve host: " << config_.host << ":"
<< config_.port;
std::tie(ec, iter) = co_await coro_io::async_resolve(
&control_->executor_, control_->socket_, config_.host, config_.port);
asio::ip::tcp::resolver::iterator end;
while (iter != end) {
eps->push_back(iter->endpoint());
++iter;
}
if (eps->empty()) [[unlikely]] {
co_return errc::not_connected;
}
}
ELOG_TRACE << "start connect to endpoint lists. total endpoint count:"
<< eps->size()
<< ", the first endpoint is: " << (*eps)[0].address().to_string()
<< std::to_string((*eps)[0].port());
asio::ip::tcp::endpoint endpoint;
std::tie(ec, endpoint) = co_await coro_io::async_connect(
&control_->executor_, control_->socket_, *eps);
std::error_code err_code;
timer_->cancel(err_code);

if (control_->is_timeout_) {
ELOG_WARN << "client_id " << config_.client_id << " connect timeout";
co_return errc::timed_out;
@@ -430,6 +468,10 @@
<< " failed:" << ec.message();
co_return errc::not_connected;
}
ELOG_INFO << "connect successful, the endpoint is: "
<< endpoint.address().to_string() + ":" +
std::to_string(endpoint.port());

if (config_.enable_tcp_no_delay == true) {
control_->socket_.set_option(asio::ip::tcp::no_delay(true), ec);
}
@@ -1155,6 +1197,7 @@ class coro_rpc_client {
std::unique_ptr<coro_io::period_timer> timer_;
std::shared_ptr<control_t> control_;
std::string_view req_attachment_;
std::vector<asio::ip::tcp::endpoint> endpoints_;
config config_;
constexpr static std::size_t default_read_buf_size_ = 256;
#ifdef YLT_ENABLE_SSL
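The eps parameter threaded through the connect() overloads above follows the contract documented in the doc comment: pass a pointer to an empty vector to receive the resolved endpoints, pass a non-empty one to skip resolution. A hedged sketch of that flow (not from the commit); the address "localhost:8801" is illustrative.

#include <vector>

#include <asio/ip/tcp.hpp>
#include <async_simple/coro/Lazy.h>
#include <ylt/coro_rpc/coro_rpc_client.hpp>

async_simple::coro::Lazy<void> connect_twice() {
  std::vector<asio::ip::tcp::endpoint> eps;  // empty: request the resolve result

  coro_rpc::coro_rpc_client first;
  auto ec = co_await first.connect("localhost", "8801", &eps);
  if (!coro_rpc::coro_rpc_client::is_ok(ec)) {
    co_return;  // resolution or connection failed
  }
  // eps now holds the endpoints that localhost:8801 resolved to

  coro_rpc::coro_rpc_client second;
  // non-empty eps: resolution is skipped and these endpoints are used directly
  ec = co_await second.connect("localhost", "8801", &eps);
  co_return;
}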
18 changes: 0 additions & 18 deletions include/ylt/coro_rpc/impl/expected.hpp
@@ -25,24 +25,6 @@
#endif

namespace coro_rpc {
#if __cpp_lib_expected >= 202202L && __cplusplus > 202002L
template <class T, class E>
using expected = std::expected<T, E>;

template <class T>
using unexpected = std::unexpected<T>;

using unexpect_t = std::unexpect_t;

#else
template <class T, class E>
using expected = tl::expected<T, E>;

template <class T>
using unexpected = tl::unexpected<T>;

using unexpect_t = tl::unexpect_t;
#endif

namespace protocol {
struct coro_rpc_protocol;
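The aliases removed above were coro_rpc's local expected/unexpected wrappers; the rest of the commit switches client_pool.hpp from the tl:: names to the ylt:: equivalents. A hedged fragment of the resulting call-site pattern follows, not from the commit: it uses only the ylt::expected and ylt::unexpect names that appear in this diff, and the ylt/util/expected.hpp include path is an assumption about where the alias lives.

#include <string>
#include <system_error>

#include <ylt/util/expected.hpp>  // assumed location of the ylt::expected alias

ylt::expected<int, std::errc> parse_port(const std::string &s) {
  if (s.empty()) {
    // error path: construct in place with the unexpect tag, as client_pool does
    return ylt::expected<int, std::errc>{ylt::unexpect,
                                         std::errc::invalid_argument};
  }
  return std::stoi(s);  // success path: implicit conversion from the value type
}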