Skip to content

Commit

Permalink
feat: add zlib compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
dfranusic committed Jan 5, 2022
1 parent 9a53e82 commit 4a59df5
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 22 deletions.
4 changes: 3 additions & 1 deletion Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ jrpcd_LDADD = libdaemon.la \
-lcap
if ENABLE_OPENSSL
jrpcd_LDADD += $(CRYPTO_LIBS) \
$(SSL_LIBS)
$(SSL_LIBS) \
$(Z_LIBS)
endif
endif

Expand All @@ -163,6 +164,7 @@ sysagentd_LDADD = libdaemon.la \
libminkplugin.la \
libminkdb.la \
${NCURSES_LIBS} \
${Z_LIBS} \
-lcap
endif

Expand Down
7 changes: 7 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,12 @@ AC_CHECK_LIB([ncurses],
[AC_SUBST([NCURSES_LIBS], ["-lncurses"])],
[AC_MSG_ERROR([ncurses library not found!])])

# zlib
AC_CHECK_LIB([z],
[inflateEnd],
[AC_SUBST([Z_LIBS], ["-lz"])],
[AC_MSG_ERROR([libz not found!])])

# pthread
AC_CHECK_LIB([pthread], [pthread_create], , AC_MSG_ERROR([pthread library not found!]))
# sctp
Expand Down Expand Up @@ -281,6 +287,7 @@ AC_CHECK_HEADERS([ arpa/inet.h \
boost/algorithm/string/trim.hpp \
boost/range/as_array.hpp \
boost/tokenizer.hpp \
zlib.h \
unistd.h], , AC_MSG_ERROR([Header file missing!]))

# Checks for typedefs, structures, and compiler characteristics.
Expand Down
89 changes: 72 additions & 17 deletions src/services/json_rpc/events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <daemon.h>
#include <atomic.h>
#include <gdt.pb.enums_only.h>
#include <zlib.h>

using data_vec_t = std::vector<uint8_t>;
namespace beast = boost::beast;
Expand Down Expand Up @@ -108,6 +109,52 @@ static void handle_error(const mink_utils::VariantParam *vp_err,

}

static std::string sparam_zlib_decmpress(const uint8_t *data,
const std::size_t sz,
char *out_buff,
const std::size_t out_buf_sz){
// output string
std::string s;
// decompress
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
// input size
zs.avail_in = sz;
// input data
zs.next_in = (Bytef *)data;
zs.avail_out = out_buf_sz;
zs.next_out = (Bytef *)out_buff;
// init decompress struct
int z_res = inflateInit(&zs);
if(z_res != Z_OK){
// cleanup
delete data;
}
// decompress
do {
// inflate and flush to buffer
z_res = inflate(&zs, Z_SYNC_FLUSH);
// error
if (z_res < 0 && z_res != Z_BUF_ERROR)
break;

// append uncompressed data
s.append(out_buff, out_buf_sz - zs.avail_out);
// update zlib struct
if (zs.avail_out == 0) {
zs.avail_out = out_buf_sz;
zs.next_out = (Bytef *)out_buff;
}
} while (z_res != Z_STREAM_END);

// zlib cleanup
inflateEnd(&zs);
// res
return s;
}

void EVSrvcMsgRecv::run(gdt::GDTCallbackArgs *args){
gdt::ServiceMessage* smsg = args->get<gdt::ServiceMessage>(gdt::GDT_CB_INPUT_ARGS,
gdt::GDT_CB_ARGS_SRVC_MSG);
Expand Down Expand Up @@ -196,6 +243,10 @@ void EVSrvcMsgRecv::run(gdt::GDTCallbackArgs *args){

// loop GDT params
mink_utils::PooledVPMap<uint32_t>::it_t it = smsg->vpmap.get_begin();

// zlib out buffer
char z_out_buff[65535];

// loop param map
for(; it != smsg->vpmap.get_end(); it++){
// param name from ID
Expand All @@ -208,8 +259,15 @@ void EVSrvcMsgRecv::run(gdt::GDTCallbackArgs *args){
if(pt == mink_utils::DPT_POINTER){
auto data = static_cast<data_vec_t *>((void *)it->second);
try {
std::string s(reinterpret_cast<char *>(data->data()),
data->size());
// output string
std::string s = sparam_zlib_decmpress(data->data(),
data->size(),
z_out_buff,
sizeof(z_out_buff));
// compressed/uncompressed
if (s.empty()) {
s.assign(reinterpret_cast<char *>(data->data()), data->size());
}
// new json object
auto o = json::object();
o[pname] = s;
Expand All @@ -235,23 +293,20 @@ void EVSrvcMsgRecv::run(gdt::GDTCallbackArgs *args){

// STRING as OCTETES (check if printable)
} else if (pt == mink_utils::DPT_OCTETS) {
// sparam data
unsigned char *od = static_cast<unsigned char *>(it->second);
/*
bool is_p = true;
for (int i = 0; i < it->second.get_size(); i++) {
if (!std::isprint(od[i])) {
is_p = false;
break;
}
// output string
val = sparam_zlib_decmpress(od,
it->second.get_size(),
z_out_buff,
sizeof(z_out_buff));

// compressed/uncompressed
if (val.empty()) {
val.assign(reinterpret_cast<char *>(od), it->second.get_size());
}
if (!is_p) continue;
val = std::move(std::string(reinterpret_cast<char *>(od),
it->second.get_size()));
*/
val = std::move(std::string(reinterpret_cast<char *>(od),
it->second.get_size()));

// ignore other types

// ignore other types
} else continue;

// setup json object
Expand Down
2 changes: 1 addition & 1 deletion src/services/sysagent/plugins/openwrt/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pkglib_LTLIBRARIES += plg_sysagent_openwrt.la
plg_sysagent_openwrt_la_SOURCES = src/services/sysagent/plugins/openwrt/plg_sysagent_openwrt.cpp
plg_sysagent_openwrt_la_CPPFLAGS = ${COMMON_INCLUDES} ${GRPC_CFLAGS} -Isrc/proto
plg_sysagent_openwrt_la_LDFLAGS = -version-info 1:0:0 -shared -module -export-dynamic
plg_sysagent_openwrt_la_LIBADD = ${UBUS_LIBS} ${UBOX_LIBS} ${BLOBMSG_JSON_LIBS} ${JSONC_LIBS}
plg_sysagent_openwrt_la_LIBADD = ${UBUS_LIBS} ${UBOX_LIBS} ${BLOBMSG_JSON_LIBS} ${JSONC_LIBS} ${Z_LIBS}
50 changes: 47 additions & 3 deletions src/services/sysagent/plugins/openwrt/plg_sysagent_openwrt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ extern "C" {
#include <libubox/blobmsg_json.h>
}
#include <boost/beast/core/detail/base64.hpp>
#include <zlib.h>

/*********/
/* types */
Expand Down Expand Up @@ -367,7 +368,52 @@ static void ubus_event_cb(ubus_request *req, int type, blob_attr *msg){
->get_param_factory()
->new_param(gdt::SPT_OCTETS);
if(sp){
sp->set_data(ic->ev_usr_cb->buff, strlen(ic->ev_usr_cb->buff));
// compress
z_stream zs;
zs.zalloc = Z_NULL;
zs.zfree = Z_NULL;
zs.opaque = Z_NULL;
// input size
zs.avail_in = strlen(ic->ev_usr_cb->buff);
// input data
zs.next_in = (Bytef *)ic->ev_usr_cb->buff;
//output buffer size
char z_out_buff[zs.avail_in * 2];
zs.avail_out = sizeof(z_out_buff);
zs.next_out = (Bytef *)z_out_buff;
// init struct
if(deflateInit(&zs, Z_BEST_COMPRESSION) != Z_OK){
set_ubus_error(ic->smsg, mink::error::EC_UNKNOWN);
// cleanup
blob_buf_free(&ic->msg);
delete ic->ev_usr_cb;
delete ic;
return;
}
// compress data
int zres = deflate(&zs, Z_FINISH);
if(zres != Z_STREAM_END){
set_ubus_error(ic->smsg, mink::error::EC_UNKNOWN);
// cleanup
blob_buf_free(&ic->msg);
delete ic->ev_usr_cb;
delete ic;
return;

}
// finish
if(deflateEnd(&zs) != Z_OK){
set_ubus_error(ic->smsg, mink::error::EC_UNKNOWN);
// cleanup
blob_buf_free(&ic->msg);
delete ic->ev_usr_cb;
delete ic;
return;
}

// switch buffers
memcpy(ic->ev_usr_cb->buff, z_out_buff, zs.total_out);
sp->set_data(ic->ev_usr_cb->buff, zs.total_out);
sp->set_id(PT_OWRT_UBUS_RESULT);
sp->set_extra_type(0);
pmap->push_back(sp);
Expand Down Expand Up @@ -402,8 +448,6 @@ static void impl_firmware_update(gdt::ServiceMessage *smsg){
auto res = base64::decode(arr.data(), data.data(), data.size());
if (fwrite(arr.data(), res.first, 1, f) != 1)
throw std::invalid_argument("size mismatch while writing file");

fflush(f);
fclose(f);

} catch (std::exception &e) {
Expand Down

0 comments on commit 4a59df5

Please sign in to comment.