Skip to content

Commit

Permalink
[coro_io][feat]channel support WRR (#598)
Browse files Browse the repository at this point in the history
* channel support WRR

* improve

* avoid use same port
  • Loading branch information
qicosmos authored Feb 19, 2024
1 parent 86e7cfe commit 5389caa
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 8 deletions.
106 changes: 100 additions & 6 deletions include/ylt/coro_io/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <atomic>
#include <memory>
#include <numeric>
#include <random>

#include "client_pool.hpp"
Expand All @@ -26,7 +27,8 @@ namespace coro_io {

enum class load_blance_algorithm {
RR = 0, // round-robin
random = 1
WRR, // weight round-robin
random,
};

template <typename client_t, typename io_context_pool_t = io_context_pool>
Expand All @@ -51,6 +53,89 @@ class channel {
co_return channel.client_pools_[i % channel.client_pools_.size()];
}
};

/*
Supposing that there is a server set ''S'' = {S0, S1, …, Sn-1};
W(Si) indicates the weight of Si;
''i'' indicates the server selected last time, and ''i'' is initialized with
-1;
''cw'' is the current weight in scheduling, and cw is initialized with zero;
max(S) is the maximum weight of all the servers in S;
gcd(S) is the greatest common divisor of all server weights in S;
while (true) {
i = (i + 1) mod n;
if (i == 0) {
cw = cw - gcd(S);
if (cw <= 0) {
cw = max(S);
if (cw == 0)
return NULL;
}
}
if (W(Si) >= cw)
return Si;
}
*/
struct WRRLoadBlancer {
WRRLoadBlancer(const std::vector<int>& weights) : weights_(weights) {
max_gcd_ = get_max_weight_gcd();
max_weight_ = get_max_weight();
}

async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
int selected = select_host_with_weight_round_robin();
if (selected == -1) {
selected = 0;
}

wrr_current_ = selected;
co_return channel.client_pools_[selected % channel.client_pools_.size()];
}

private:
int select_host_with_weight_round_robin() {
while (true) {
wrr_current_ = (wrr_current_ + 1) % weights_.size();
if (wrr_current_ == 0) {
weight_current_ = weight_current_ - max_gcd_;
if (weight_current_ <= 0) {
weight_current_ = max_weight_;
if (weight_current_ == 0) {
return -1; // can't find max weight server
}
}
}

if (weights_[wrr_current_] >= weight_current_) {
return wrr_current_;
}
}
}

int get_max_weight_gcd() {
int res = weights_[0];
int cur_max = 0, cur_min = 0;
for (size_t i = 0; i < weights_.size(); i++) {
cur_max = (std::max)(res, weights_[i]);
cur_min = (std::min)(res, weights_[i]);
res = std::gcd(cur_max, cur_min);
}
return res;
}

int get_max_weight() {
return *std::max_element(weights_.begin(), weights_.end());
}

std::vector<int> weights_;
int max_gcd_ = 0;
int max_weight_ = 0;
int wrr_current_ = -1;
int weight_current_ = 0;
};

struct RandomLoadBlancer {
async_simple::coro::Lazy<std::shared_ptr<client_pool_t>> operator()(
const channel& channel) {
Expand Down Expand Up @@ -97,20 +182,20 @@ class channel {
return send_request(std::move(op), config_.pool_config.client_config);
}

std::size_t size() const noexcept { return client_pools_.size(); }

static channel create(const std::vector<std::string_view>& hosts,
const channel_config& config = {},
const std::vector<int>& weights = {},
client_pools_t& client_pools =
g_clients_pool<client_t, io_context_pool_t>()) {
channel ch;
ch.init(hosts, config, client_pools);
ch.init(hosts, config, weights, client_pools);
return ch;
}

private:
void init(const std::vector<std::string_view>& hosts,
const channel_config& config, client_pools_t& client_pools) {
const channel_config& config, const std::vector<int>& weights,
client_pools_t& client_pools) {
config_ = config;
client_pools_.reserve(hosts.size());
for (auto& host : hosts) {
Expand All @@ -120,14 +205,23 @@ class channel {
case load_blance_algorithm::RR:
lb_worker = RRLoadBlancer{};
break;
case load_blance_algorithm::WRR: {
if (hosts.empty() || weights.empty()) {
throw std::invalid_argument("host/weight list is empty!");
}
if (hosts.size() != weights.size()) {
throw std::invalid_argument("hosts count is not equal with weights!");
}
lb_worker = WRRLoadBlancer(weights);
} break;
case load_blance_algorithm::random:
default:
lb_worker = RandomLoadBlancer{};
}
return;
}
channel_config config_;
std::variant<RRLoadBlancer, RandomLoadBlancer> lb_worker;
std::variant<RRLoadBlancer, WRRLoadBlancer, RandomLoadBlancer> lb_worker;
std::vector<std::shared_ptr<client_pool_t>> client_pools_;
};

Expand Down
78 changes: 76 additions & 2 deletions src/coro_io/tests/test_channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,80 @@ TEST_CASE("test RR") {
}());
}

TEST_CASE("test WRR") {
SUBCASE(
"exception tests: empty hosts, empty weights test or count not equal") {
CHECK_THROWS_AS(
coro_io::channel<coro_rpc::coro_rpc_client>::create(
{}, {.lba = coro_io::load_blance_algorithm::WRR}, {2, 1}),
std::invalid_argument);

CHECK_THROWS_AS(coro_io::channel<coro_rpc::coro_rpc_client>::create(
{"127.0.0.1:8801", "127.0.0.1:8802"},
{.lba = coro_io::load_blance_algorithm::WRR}),
std::invalid_argument);

CHECK_THROWS_AS(coro_io::channel<coro_rpc::coro_rpc_client>::create(
{"127.0.0.1:8801", "127.0.0.1:8802"},
{.lba = coro_io::load_blance_algorithm::WRR}, {1}),
std::invalid_argument);
}

coro_rpc::coro_rpc_server server1(1, 8801);
auto res = server1.async_start();
REQUIRE_MESSAGE(res, "server start failed");
coro_rpc::coro_rpc_server server2(1, 8802);
auto res2 = server2.async_start();
REQUIRE_MESSAGE(res2, "server start failed");

async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "127.0.0.1:8802"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
hosts, {.lba = coro_io::load_blance_algorithm::WRR}, {2, 1});
for (int i = 0; i < 6; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
if (i == 0 || i == 1) {
CHECK(host == hosts[0]);
}
else if (i == 2 || i == 5) {
CHECK(host == hosts[1]);
}
else if (i == 3 || i == 4) {
CHECK(host == hosts[0]);
}
co_return;
});
CHECK(res.has_value());
}
}());

async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
auto hosts =
std::vector<std::string_view>{"127.0.0.1:8801", "127.0.0.1:8802"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(
hosts, {.lba = coro_io::load_blance_algorithm::WRR}, {0, 0});
for (int i = 0; i < 6; ++i) {
auto res = co_await channel.send_request(
[&i, &hosts](
coro_rpc::coro_rpc_client &client,
std::string_view host) -> async_simple::coro::Lazy<void> {
if (i % 2 == 0) {
CHECK(host == hosts[0]);
}
else {
CHECK(host == hosts[1]);
}
co_return;
});
CHECK(res.has_value());
}
}());
}

TEST_CASE("test Random") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8801);
Expand Down Expand Up @@ -92,10 +166,10 @@ TEST_CASE("test single host") {

TEST_CASE("test send_request config") {
async_simple::coro::syncAwait([]() -> async_simple::coro::Lazy<void> {
coro_rpc::coro_rpc_server server(1, 8802);
coro_rpc::coro_rpc_server server(1, 9813);
auto res = server.async_start();
REQUIRE_MESSAGE(res, "server start failed");
auto hosts = std::vector<std::string_view>{"127.0.0.1:8802"};
auto hosts = std::vector<std::string_view>{"127.0.0.1:9813"};
auto channel = coro_io::channel<coro_rpc::coro_rpc_client>::create(hosts);
for (int i = 0; i < 100; ++i) {
auto config = coro_rpc::coro_rpc_client::config{.client_id = 114514};
Expand Down

0 comments on commit 5389caa

Please sign in to comment.