Skip to content

Commit

Permalink
fix: add syslog UDP port error handling
Browse files Browse the repository at this point in the history
  • Loading branch information
dfranusic committed Jan 20, 2022
1 parent 0b905db commit 98b4ab8
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 50 deletions.
69 changes: 35 additions & 34 deletions src/proto/gdt.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/proto/gdt.pb.enums_only.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace gdt_grpc {
PT_MINK_ERROR_MSG = 6017,
PT_MINK_STATUS = 6018,
PT_MINK_STATUS_MSG = 6019,
PT_MINK_PERSISTENT_CORRELATION = 6020,
PT_CPU_USER_PERCENT = 9000,
PT_CPU_NICE_PERCENT = 9001,
PT_CPU_SYSTEM_PERCENT = 9002,
Expand Down Expand Up @@ -74,6 +75,7 @@ namespace gdt_grpc {
{PT_MINK_ERROR_MSG, "PT_MINK_ERROR_MSG"},
{PT_MINK_STATUS, "PT_MINK_STATUS"},
{PT_MINK_STATUS_MSG, "PT_MINK_STATUS_MSG"},
{PT_MINK_PERSISTENT_CORRELATION, "PT_MINK_PERSISTENT_CORRELATION"},
{PT_CPU_USER_PERCENT, "PT_CPU_USER_PERCENT"},
{PT_CPU_NICE_PERCENT, "PT_CPU_NICE_PERCENT"},
{PT_CPU_SYSTEM_PERCENT, "PT_CPU_SYSTEM_PERCENT"},
Expand Down
1 change: 1 addition & 0 deletions src/proto/gdt.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/proto/gdt.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ enum ParameterType {
PT_MINK_ERROR_MSG = 6017;
PT_MINK_STATUS = 6018;
PT_MINK_STATUS_MSG = 6019;
PT_MINK_PERSISTENT_CORRELATION = 6020;
// cpu info
PT_CPU_USER_PERCENT = 9000;
PT_CPU_NICE_PERCENT = 9001;
Expand Down
3 changes: 1 addition & 2 deletions src/services/json_rpc/events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,7 @@ void EVSrvcMsgRecv::run(gdt::GDTCallbackArgs *args){
gdt::ServiceMessage* smsg = args->get<gdt::ServiceMessage>(gdt::GDT_CB_INPUT_ARGS,
gdt::GDT_CB_ARGS_SRVC_MSG);
auto dd = static_cast<JsonRpcdDescriptor*>(mink::CURRENT_DAEMON);
auto gdt_stream = args->get<gdt::GDTStream>(gdt::GDT_CB_INPUT_ARGS,
gdt::GDT_CB_ARG_STREAM);

// check for missing params
if (smsg->missing_params) {
// TODO stats
Expand Down
84 changes: 70 additions & 14 deletions src/services/sysagent/plugins/syslog/plg_sysagent_syslog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "mink_err_codes.h"
#include "mink_utils.h"
#include <config.h>
#include <exception>
#include <mink_plugin.h>
#include <gdt_utils.h>
#include <string>
Expand Down Expand Up @@ -79,7 +80,12 @@ std::atomic_bool running;
GdtLogSentCb gdt_log_sent_cb;
// permanent stream guid
mink_utils::Guid guid;
int udp_port = -1;


/**********************/
/* zLib error handler */
/**********************/
static void handle_zlib_error(gdt::ServiceMsgManager *smsgm,
gdt::ServiceMessage *smsg,
EVUserCB *ev_usr_cb){
Expand All @@ -97,7 +103,8 @@ static void gdt_push(const std::string data,
gdt::ServiceMsgManager *smsgm,
const std::string &d_type,
const std::string &d_id,
const mink_utils::Guid &guid){
const mink_utils::Guid &guid,
const bool last){

// get daemon pointer
auto dd = static_cast<SysagentdDescriptor *>(mink::CURRENT_DAEMON);
Expand Down Expand Up @@ -169,8 +176,8 @@ static void gdt_push(const std::string data,
// set guid
smsg->vpmap.set_octets(asn1::ParameterType::_pt_mink_guid, guid.data(), 16);
// persistent guid
smsg->vpmap.set_cstr(asn1::ParameterType::_pt_mink_persistent_correlation,
std::to_string(1).c_str());
if (!last) smsg->vpmap.set_cstr(asn1::ParameterType::_pt_mink_persistent_correlation,
std::to_string(1).c_str());

// set source daemon type
smsg->vpmap.set_cstr(asn1::ParameterType::_pt_mink_daemon_type,
Expand All @@ -197,9 +204,7 @@ static void gdt_push(const std::string data,
}


/************************/
/* send service message */
/************************/
// send service message
int r = smsgm->send(smsg,
gdtc,
d_type.c_str(),
Expand All @@ -218,11 +223,14 @@ static void gdt_push(const std::string data,

}

/*****************/
/* Syslog thread */
/*****************/
static void thread_syslog(gdt::ServiceMsgManager *smsgm,
const std::string src_dtype,
const std::string src_did,
const mink_utils::VariantParam *vp_guid,
const int udp_port){
const int port){

using boost::asio::ip::udp;

Expand All @@ -233,22 +241,23 @@ static void thread_syslog(gdt::ServiceMsgManager *smsgm,
udp::endpoint endp;
std::size_t l;
boost::asio::io_context io_ctx;
udp::socket s(io_ctx, udp::endpoint(udp::v4(), udp_port));
udp::socket s(io_ctx, udp::endpoint(udp::v4(), port));

// start receiving syslog packets
while (!mink::CURRENT_DAEMON->DAEMON_TERMINATED && running.load()) {
try{
l = s.receive_from(boost::asio::buffer(data, sizeof(data)), endp);
data[l] = '\0';
// push
gdt_push(data, smsgm, src_dtype, src_did, guid);
gdt_push(data, smsgm, src_dtype, src_did, guid, !running.load());

}catch(std::exception &e){
mink::CURRENT_DAEMON->log(mink::LLT_ERROR,
"plg_syslog: [%s]",
e.what());
}
}
udp_port = -1;
}


Expand All @@ -267,8 +276,24 @@ extern "C" int terminate(mink_utils::PluginManager *pm, mink_utils::PluginDescri
return 0;
}

static int vp_str_to_port(const mink_utils::VariantParam *vp){
if (!vp) return -1;
// port
int port = -1;
try {
port = std::stoi(static_cast<char *>(*vp));

} catch (std::exception &e) {
return -1;
}

return port;
}


// Implementation of "start" command
/*************************************/
/* Implementation of "start" command */
/*************************************/
static void impl_syslog_start(gdt::ServiceMessage *smsg){
using boost::asio::ip::udp;
using Vp = mink_utils::VariantParam;
Expand All @@ -292,8 +317,26 @@ static void impl_syslog_start(gdt::ServiceMessage *smsg){
// save source daemon address
std::string src_type(static_cast<char *>(*vp_src_type));
std::string src_id(static_cast<char *>(*vp_src_id));

// already running
if (udp_port != -1) {
mink::CURRENT_DAEMON->log(mink::LLT_ERROR,
"plg_syslog: already running");
smsg->vpmap.set_cstr(gdt_grpc::PT_MINK_ERROR,
std::to_string(mink::error::EC_UNKNOWN).c_str());
return;
}

// port
int udp_port = std::stoi(static_cast<char *>(*vp_port));
int port = vp_str_to_port(vp_port);
if (port == -1){
mink::CURRENT_DAEMON->log(mink::LLT_ERROR,
"plg_syslog: invalid UDP port value");
smsg->vpmap.set_cstr(gdt_grpc::PT_MINK_ERROR,
std::to_string(mink::error::EC_UNKNOWN).c_str());
return;

}

// check if already running
if (running.load()) {
Expand All @@ -306,13 +349,14 @@ static void impl_syslog_start(gdt::ServiceMessage *smsg){

// setup listener thread
try{
udp_port = port;
running.store(true);
std::thread th(&thread_syslog,
smsg->get_smsg_manager(),
src_type,
src_id,
vp_guid,
udp_port);
port);
th.detach();
smsg->vpmap.set_cstr(asn1::ParameterType::_pt_mink_persistent_correlation,
std::to_string(1).c_str());
Expand All @@ -327,15 +371,27 @@ static void impl_syslog_start(gdt::ServiceMessage *smsg){

}

// Implementation of "stop" command
/************************************/
/* Implementation of "stop" command */
/************************************/
static void impl_syslog_stop(gdt::ServiceMessage *smsg){
using boost::asio::ip::udp;
using Vp = mink_utils::VariantParam;

// port
const Vp *vp_port = smsg->vpget(gdt_grpc::PT_SL_PORT);
if (vp_port == nullptr) return;
int udp_port = std::stoi(static_cast<char *>(*vp_port));

// port
int port = vp_str_to_port(vp_port);
if (port == -1 || port != udp_port){
mink::CURRENT_DAEMON->log(mink::LLT_ERROR,
"plg_syslog: invalid UDP port value");
smsg->vpmap.set_cstr(gdt_grpc::PT_MINK_ERROR,
std::to_string(mink::error::EC_UNKNOWN).c_str());
return;

}

running.store(false);
boost::asio::io_context io_ctx;
Expand Down

0 comments on commit 98b4ab8

Please sign in to comment.