Skip to content
This repository has been archived by the owner on Oct 6, 2023. It is now read-only.

Commit

Permalink
Mon 6367 ipv4 bad flush 20.10.x (#477)
Browse files Browse the repository at this point in the history
This patch is essentially an improvement to the tcp stream write/flush functions.

    If the write function cannot start because of too much retention, writing is begun by flush(). But flush() was unable to start
    writing properly. Moreover, there was a possibility to fall into an infinite loop because of a 0 value returned by flush().

    Refs: MON-6367
  • Loading branch information
bouda1 authored Nov 23, 2020
1 parent 5bd712d commit fec4263
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 35 deletions.
5 changes: 5 additions & 0 deletions doc/en/release_notes/20.10.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ The splitter class is now thread safe and does not need external locks anymore.
It is also far less strict and allows some reading and some writing at the same
time.

TCP
===
Writing to a TCP stream could slow down when there were many retention files. The
issue was essentially in the internal flush function.

************
Enhancements
************
Expand Down
5 changes: 2 additions & 3 deletions tcp/inc/com/centreon/broker/tcp/tcp_connection.hh
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
asio::ip::tcp::socket _socket;
asio::io_context::strand _strand;

std::mutex _data_m;
std::mutex _error_m;
asio::error_code _current_error;

std::mutex _exposed_write_queue_m;
std::queue<std::vector<char>> _exposed_write_queue;
std::queue<std::vector<char>> _write_queue;
std::atomic_bool _write_queue_has_events;
std::atomic_bool _writing;

std::atomic<int32_t> _acks;
Expand All @@ -53,8 +54,6 @@ class tcp_connection : public std::enable_shared_from_this<tcp_connection> {
std::atomic_bool _closed;
std::string _peer;

std::condition_variable _is_writing_cv;

public:
typedef std::shared_ptr<tcp_connection> pointer;
tcp_connection(asio::io_context& io_context,
Expand Down
63 changes: 42 additions & 21 deletions tcp/src/tcp_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ tcp_connection::tcp_connection(asio::io_context& io_context,
uint16_t port)
: _socket(io_context),
_strand(io_context),
_write_queue_has_events(false),
_writing(false),
_acks(0),
_reading(false),
Expand Down Expand Up @@ -80,10 +81,27 @@ asio::ip::tcp::socket& tcp_connection::socket() {
* @return 0.
*/
int32_t tcp_connection::flush() {
while (_writing) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
int32_t retval = _acks;
if (_acks) {
/* Do not set it to zero directly, maybe it has already been incremented by
* another operation */
_acks -= retval;
return retval;
}
{
std::lock_guard<std::mutex> lck(_error_m);
if (_current_error) {
std::string msg{std::move(_current_error.message())};
_current_error.clear();
throw exceptions::msg() << msg;
}
}
return 0;
if (_write_queue_has_events && !_writing) {
_writing = true;
// The strand is useful because of the flush() method.
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
}
return retval;
}

/**
Expand Down Expand Up @@ -150,31 +168,32 @@ static std::string debug_buf(const char* data, int32_t size) {
*/
int32_t tcp_connection::write(const std::vector<char>& v) {
{
std::lock_guard<std::mutex> lck(_data_m);
std::lock_guard<std::mutex> lck(_error_m);
if (_current_error) {
std::string msg{std::move(_current_error.message())};
_current_error.clear();
throw exceptions::msg() << msg;
}
}

{
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
_exposed_write_queue.push(v);
}
{
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
_exposed_write_queue.push(v);
}

// If the queue is not empty and the writing work is not started, we start
// it.
if (!_writing) {
_writing = true;
// The strand is useful because of the flush() method.
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
}
// If the queue is not empty and the writing work is not started, we start
// it.
if (!_writing) {
_writing = true;
// The strand is useful because of the flush() method.
_strand.context().post(std::bind(&tcp_connection::writing, ptr()));
}

int32_t retval = _acks;
/* Do not set it to zero directly, maybe it has already been incremented by
* another operation */
_acks -= retval;

return retval;
}

Expand All @@ -188,11 +207,13 @@ int32_t tcp_connection::write(const std::vector<char>& v) {
* * Launches the async_write.
*/
void tcp_connection::writing() {
if (_write_queue.empty()) {
if (!_write_queue_has_events) {
std::lock_guard<std::mutex> lck(_exposed_write_queue_m);
std::swap(_write_queue, _exposed_write_queue);
_write_queue_has_events = !_write_queue.empty();
}
if (_write_queue.empty()) {
if (!_write_queue_has_events) {

_writing = false;
return;
}
Expand All @@ -211,14 +232,14 @@ void tcp_connection::writing() {
void tcp_connection::handle_write(const asio::error_code& ec) {
if (ec) {
log_v2::tcp()->error("Error while writing on tcp socket: {}", ec.message());
std::lock_guard<std::mutex> lck(_data_m);
std::lock_guard<std::mutex> lck(_error_m);
_current_error = ec;
_writing = false;
} else {
std::lock_guard<std::mutex> lck(_data_m);
++_acks;
_write_queue.pop();
if (!_write_queue.empty()) {
_write_queue_has_events = !_write_queue.empty();
if (_write_queue_has_events) {
// The strand is useful because of the flush() method.
asio::async_write(_socket, asio::buffer(_write_queue.front()),
_strand.wrap(std::bind(&tcp_connection::handle_write,
Expand Down Expand Up @@ -291,7 +312,7 @@ void tcp_connection::close() {
*/
std::vector<char> tcp_connection::read(time_t timeout_time, bool* timeout) {
{
std::lock_guard<std::mutex> lck(_data_m);
std::lock_guard<std::mutex> lck(_error_m);
if (_current_error) {
std::string msg{std::move(_current_error.message())};
_current_error.clear();
Expand Down
44 changes: 33 additions & 11 deletions tcp/test/acceptor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ TEST(TcpAcceptor, Nominal) {
}
}
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

centengine.join();
Expand Down Expand Up @@ -158,7 +160,9 @@ TEST(TcpAcceptor, QuestionAnswer) {
}
s_cbd->write(data_write);
}
s_cbd->flush();
int retry = 10;
while (retry-- && s_cbd->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

std::thread centengine([] {
Expand Down Expand Up @@ -303,7 +307,9 @@ TEST(TcpAcceptor, MultiNominal) {
}
}
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(cbd_m);
cbd_cv.wait(lock, [&cbd_finished] { return cbd_finished; });
});
Expand Down Expand Up @@ -348,7 +354,9 @@ TEST(TcpAcceptor, NominalReversed) {
}
}
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

std::this_thread::sleep_for(std::chrono::milliseconds(500));
Expand Down Expand Up @@ -406,7 +414,9 @@ TEST(TcpAcceptor, OnePeer) {
}
}
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

std::thread cbd([] {
Expand Down Expand Up @@ -500,7 +510,9 @@ TEST(TcpAcceptor, OnePeerReversed) {
}
}
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

centengine.join();
Expand Down Expand Up @@ -537,7 +549,9 @@ TEST(TcpAcceptor, MultiOnePeer) {
std::shared_ptr<io::raw> data_write = std::make_shared<io::raw>();
data_write->append(std::string("Hello2!"));
s_centengine->write(data_write);
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
s_centengine.reset();
i++;
} else
Expand Down Expand Up @@ -621,7 +635,9 @@ TEST(TcpAcceptor, NominalRepeated) {
std::cout << "engine 4 " << i << "\n";
i++;
}
s_centengine->flush();
int retry = 10;
while (retry-- && s_centengine->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

/* We start nb_steps instances of cbd one after the other. Each time, it
Expand Down Expand Up @@ -706,7 +722,9 @@ TEST(TcpAcceptor, Simple) {
std::shared_ptr<io::data> data_read;
data->append(std::string("TEST\n"));
str->write(data);
str->flush();
int retry = 10;
while (retry-- && str->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&finish] { return finish; });
});
Expand Down Expand Up @@ -856,7 +874,9 @@ TEST(TcpAcceptor, CloseRead) {
std::shared_ptr<io::data> data_read;
data->append(std::string("0"));
str->write(data);
str->flush();
int retry = 10;
while (retry-- && str->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}};
std::shared_ptr<io::stream> io;
Expand Down Expand Up @@ -936,7 +956,9 @@ TEST(TcpAcceptor, QuestionAnswerMultiple) {
}
s_cbd->write(data_write);
}
s_cbd->flush();
int retry = 10;
while (retry-- && s_cbd->flush() == 0)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
});

centengine.emplace_back([i] {
Expand Down

0 comments on commit fec4263

Please sign in to comment.