Skip to content

Commit

Permalink
Improved ZMQ handling by adding encryption and compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
lucaderi committed Mar 15, 2016
1 parent 9b4a039 commit 263f312
Show file tree
Hide file tree
Showing 16 changed files with 133 additions and 47 deletions.
2 changes: 2 additions & 0 deletions configure.seed
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ fi
AC_CHECK_LIB([nl], [nl_handle_alloc], [LDFLAGS="${LDFLAGS} -lnl"])
AC_CHECK_LIB([rt], [clock_gettime], [LDFLAGS="${LDFLAGS} -lrt"])

AC_CHECK_LIB([z], [zlibVersion], [LDFLAGS="${LDFLAGS} -lz"; AC_DEFINE_UNQUOTED(HAVE_ZLIB, 1, [zlib is present])])

dnl> ldl (used by edjdb)
AC_CHECK_LIB([dl], [dlopen], [LDFLAGS="${LDFLAGS} -ldl"])

Expand Down
4 changes: 2 additions & 2 deletions doc/README.compilation
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ Prerequisites
- libmysqlclient-dev

On Ubuntu/Debian
- apt-get install build-essential git libglib2.0 libxml2-dev libpcap-dev libtool libtool-bin rrdtool librrd-dev autoconf automake autogen redis-server wget libsqlite3-dev libhiredis-dev libgeoip-dev libcurl4-openssl-dev libpango1.0-dev libcairo2-dev libpng12-dev libmysqlclient-dev libnetfilter-queue-dev libmysqlclient-dev
- apt-get install build-essential git libglib2.0 libxml2-dev libpcap-dev libtool libtool-bin rrdtool librrd-dev autoconf automake autogen redis-server wget libsqlite3-dev libhiredis-dev libgeoip-dev libcurl4-openssl-dev libpango1.0-dev libcairo2-dev libpng12-dev libmysqlclient-dev libnetfilter-queue-dev libmysqlclient-dev zlib1g-dev

You can decide to use MariaDB (instead of MySQL) do "apt-get install libmariadb-client-lgpl-dev" instead.

On Fedora/CentOS
- yum groupinstall "Development tools"
- yum install git autoconf automake autogen libpcap-devel GeoIP-devel hiredis-devel redis glib2-devel libxml2-devel sqlite-devel gcc-c++ libtool wget libcurl-devel pango-devel cairo-devel libpng-devel mysql-devel libnetfilter-queue-devel
- yum install git autoconf automake autogen libpcap-devel GeoIP-devel hiredis-devel redis glib2-devel libxml2-devel sqlite-devel gcc-c++ libtool wget libcurl-devel pango-devel cairo-devel libpng-devel mysql-devel libnetfilter-queue-devel zlib-devel

On MacOSX (using http://brew.sh)
brew install redis hiredis autoconf automake libtool rrdtool wget pkg-config git mysql
Expand Down
6 changes: 4 additions & 2 deletions include/NetworkInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class L7Policer;
class NetworkInterface {
protected:
char *ifname; /**< Network interface name.*/
char *remoteIfname, *remoteIfIPaddr, *remoteProbeIPaddr;
char *remoteIfname, *remoteIfIPaddr, *remoteProbeIPaddr, *remoteProbePublicIPaddr;
string ip_addresses;
int id;
bool bridge_interface, has_mesh_networks_traffic;
Expand Down Expand Up @@ -139,6 +139,7 @@ class NetworkInterface {
inline void setRemoteIfname(char *name) { if(!remoteIfname) remoteIfname = strdup(name); };
inline void setRemoteIfIPaddr(char *ip) { if(!remoteIfIPaddr) remoteIfIPaddr = strdup(ip); };
inline void setRemoteProbeAddr(char *ip) { if(!remoteProbeIPaddr) remoteProbeIPaddr = strdup(ip);};
inline void setRemoteProbePublicAddr(char *ip) { if(!remoteProbePublicIPaddr) remoteProbePublicIPaddr = strdup(ip);};
inline u_int get_flow_size() { return(ndpi_detection_get_sizeof_ndpi_flow_struct()); };
inline u_int get_size_id() { return(ndpi_detection_get_sizeof_ndpi_id_struct()); };
inline char* get_name() { return(ifname); };
Expand Down Expand Up @@ -287,7 +288,8 @@ class NetworkInterface {
inline bool checkProfileSyntax(char *filter) { return(flow_profiles ? flow_profiles->checkProfileSyntax(filter) : false); }
#endif
void setRemoteStats(char *name, char *address, u_int32_t speedMbit,
char *remoteProbeAddress, u_int64_t remBytes, u_int64_t remPkts);
char *remoteProbeAddress, char *remoteProbePublicAddress,
u_int64_t remBytes, u_int64_t remPkts);
};

#endif /* _NETWORK_INTERFACE_H_ */
1 change: 1 addition & 0 deletions include/ParserInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ struct FlowFieldMap {
class ParserInterface : public NetworkInterface {
private:
struct FlowFieldMap *map;
bool once;

int getKeyId(char *sym);
void addMapping(const char *sym, int num);
Expand Down
17 changes: 9 additions & 8 deletions include/Prefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Prefs {
char *data_dir, *install_dir, *docs_dir, *scripts_dir, *callbacks_dir, *export_endpoint;
char *categorization_key;
char *httpbl_key;
char *zmq_encryption_pwd;
Flashstart *flashstart;
char *http_prefix;
char *instance_name;
Expand Down Expand Up @@ -178,14 +179,14 @@ class Prefs {
inline bool shutdownWhenDone() { return(shutdown_when_done); }
inline bool are_taps_enabled() { return(enable_taps); };
inline void set_promiscuous_mode(bool mode) { use_promiscuous_mode = mode; };
inline bool use_promiscuous() { return(use_promiscuous_mode); };
inline bool is_zmq_collector_mode() { return(zmq_collector_mode); }
inline char* get_mysql_host() { return(mysql_host); };
inline char* get_mysql_dbname() { return(mysql_dbname); };
inline char* get_mysql_tablename() { return(mysql_tablename); };
inline char* get_mysql_user() { return(mysql_user); };
inline char* get_mysql_pw() { return(mysql_pw); };

inline bool use_promiscuous() { return(use_promiscuous_mode); };
inline bool is_zmq_collector_mode() { return(zmq_collector_mode); }
inline char* get_mysql_host() { return(mysql_host); };
inline char* get_mysql_dbname() { return(mysql_dbname); };
inline char* get_mysql_tablename() { return(mysql_tablename); };
inline char* get_mysql_user() { return(mysql_user); };
inline char* get_mysql_pw() { return(mysql_pw); };
inline char* get_zmq_encryption_pwd() { return(zmq_encryption_pwd); };
inline char* getInterfaceViewAt(int id) { return((id >= MAX_NUM_INTERFACES) ? NULL : ifViewNames[id].name); }
inline char* getInterfaceAt(int id) { return((id >= MAX_NUM_INTERFACES) ? NULL : ifNames[id].name); }
inline pcap_direction_t getCaptureDirection() { return(captureDirection); }
Expand Down
1 change: 1 addition & 0 deletions include/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Utils {
static char* intoaV4(unsigned int addr, char* buf, u_short bufLen);
static char* intoaV6(struct ndpi_in6_addr ipv6, u_int8_t bitmask, char* buf, u_short bufLen);
static u_int32_t timeval2usec(const struct timeval *tv);
static void xor_encdec(u_char *data, int data_len, u_char *key);
};

#endif /* _UTILS_H_ */
3 changes: 3 additions & 0 deletions include/ntop_includes.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@ extern "C" {
#ifdef HAVE_LDAP
#include <ldap.h>
#endif
#ifdef HAVE_ZLIB
#include <zlib.h>
#endif
};

#include "third-party/uthash.h"
Expand Down
19 changes: 14 additions & 5 deletions ntopng.8
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ or
.RB [ \-Z
.IR <prefix> ]
.RB [ \-\-shutdown\-when\-done ]
.RB [ \-\-zmq\-collector\-mode ]
.RB [ \-\-zmq\-encrypt\-pwd
.IR <apssword> ]
.RB [ \-\-capture\-direction ]
.RB [ \-v ]
.RB [ \-V ]
Expand Down Expand Up @@ -339,6 +342,12 @@ you must use ntopng with \-Z "/ntopng/"
.It \-\-shutdown\-when\-done
Terminate ntopng when the input pcap file is over (debug only).

.It \-\-zmq\-collector\-mode
When ZMQ is used, ntopng acts as a client with respect to the ZMQ producer (typically nProbe). In case nProbe is behind a firewall this paradigm needs to be changed and thus nProbe needs to be started with --zmq-probe-mode and ntiong with --zmq-collector-mode.

.It \-\-zmq\-encrypt\-pwd
This is the passowrd used by the symmetric encryption on the probe side. Note that in case you have multiple ZMQ endpoints, the same password is used for all of them.

.It \-\-capture\-direction
Specify the packet capture direction for packet capture interfaces (no ZMQ).
Supported values are: 0=RX+TX (default), 1=RX only, 2=TX only
Expand Down Expand Up @@ -418,15 +427,15 @@ should not need to specifically install such libraries.
.

.SH USER SUPPORT
Please send bug reports to the ntop-dev <ntop-dev@ntop.org> mailing list. The
Please send bug reports to https://github.com/ntop/ntopng/issues. The
ntopng <ntop@ntop.org> mailing list is used for discussing ntopng usage issues. In
order to post messages on the lists a (free) subscription is required
to limit/avoid spam. Please do NOT contact the author directly unless this is
to limit/avoid spam. Please do NOT contact the authors directly unless this is
a personal question.

Commercial support is available upon request. Please see the ntopng site for further info.
Commercial support is available upon request. Please see the ntop site for further info.

Please send code patches to <patch@ntop.org>.
Please send code patches via the github pull requests mechanism.

.SH LICENCE
ntopng is distributed under the GNU GPL licence (http://www.gnu.org/).
ntopng is distributed under the GNU GPLv3 licence (http://www.gnu.org/).
22 changes: 13 additions & 9 deletions scripts/lua/flow_details.lua
Original file line number Diff line number Diff line change
Expand Up @@ -204,16 +204,20 @@ else
print("<td nowrap><div id=last_seen>" .. formatEpoch(flow["seen.last"]) .. " [" .. secondsToTime(os.time()-flow["seen.last"]) .. " ago]" .. "</div></td></tr>\n")

print("<tr><th width=30% rowspan=3>Total Traffic</th><td>Total: <span id=volume>" .. bytesToSize(flow["bytes"]) .. "</span> <span id=volume_trend></span></td>")
print("<td><A HREF=https://en.wikipedia.org/wiki/Goodput>Goodput</A>: <span id=goodput_volume>" .. bytesToSize(flow["goodput_bytes"]) .. "</span> (<span id=goodput_percentage>")
pctg = round((flow["goodput_bytes"]*100)/flow["bytes"], 1)
if(pctg < 50) then
pctg = "<font color=red>"..pctg.."</font>"
elseif(pctg < 60) then
pctg = "<font color=orange>"..pctg.."</font>"
if(ifstats.type ~= "zmq") then
print("<td><A HREF=https://en.wikipedia.org/wiki/Goodput>Goodput</A>: <span id=goodput_volume>" .. bytesToSize(flow["goodput_bytes"]) .. "</span> (<span id=goodput_percentage>")
pctg = round((flow["goodput_bytes"]*100)/flow["bytes"], 1)
if(pctg < 50) then
pctg = "<font color=red>"..pctg.."</font>"
elseif(pctg < 60) then
pctg = "<font color=orange>"..pctg.."</font>"
end
print(pctg)

print("</span> %) <span id=goodput_volume_trend></span> </td></tr>\n")
else
print("<td>&nbsp;</td></tr>\n")
end
print(pctg)

print("</span> %) <span id=goodput_volume_trend></span> </td></tr>\n")

print("<tr><td nowrap>Client <i class=\"fa fa-arrow-right\"></i> Server: <span id=cli2srv>" .. formatPackets(flow["cli2srv.packets"]) .. " / ".. bytesToSize(flow["cli2srv.bytes"]) .. "</span> <span id=sent_trend></span></td><td nowrap>Client <i class=\"fa fa-arrow-left\"></i> Server: <span id=srv2cli>" .. formatPackets(flow["srv2cli.packets"]) .. " / ".. bytesToSize(flow["srv2cli.bytes"]) .. "</span> <span id=rcvd_trend></span></td></tr>\n")

Expand Down
5 changes: 3 additions & 2 deletions scripts/lua/if_stats.lua
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,9 @@ if((page == "overview") or (page == nil)) then

if(ifstats["remote.name"] ~= nil) then
print("<tr><th>Remote Probe</th><td nowrap><b>Interface Name</b>: "..ifstats["remote.name"].." [ ".. maxRateToString(ifstats.speed*1000) .." ]</td>")
if(ifstats["remote.ip"] ~= "") then print("<td nowrap><b>Interface IP</b>: '"..ifstats["remote.ip"].."'</td>") end
print("<td nowrap><b>Probe IP</b>: "..ifstats["probe.ip"].."</td></tr>\n")
if(ifstats["remote.ip"] ~= "") then print("<td nowrap><b>Interface IP</b>: "..ifstats["remote.if_addr"].."</td>") end
print("<td nowrap><b>Probe IP</b>: "..ifstats["probe.ip"].."</td>")
if(ifstats["probe.public_ip"] ~= nil) then print("<td nowrap><b>Public Probe IP</b>: "..ifstats["probe.public_ip"].."</td></tr>\n") end
end

print("<tr><th width=250>Name</th><td colspan=2>" .. ifstats.name.."</td>\n")
Expand Down
42 changes: 38 additions & 4 deletions src/CollectorInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,49 @@ void CollectorInterface::collect_flows() {
size = zmq_recv(items[source_id].socket, payload, payload_len, 0);

if(size > 0) {
char *uncompressed = NULL;
u_int uncompressed_len;

payload[size] = '\0';

if(payload[0] == 0 /* Compressed traffic */) {
#ifdef HAVE_ZLIB
int err;
uLongf uLen;

uLen = uncompressed_len = 3*size;
uncompressed = (char*)malloc(uncompressed_len+1);
if((err = uncompress((Bytef*)uncompressed, &uLen, (Bytef*)&payload[1], size-1)) != Z_OK) {
ntop->getTrace()->traceEvent(TRACE_ERROR, "Uncompress error [%d]", err);
return;
}

uncompressed_len = uLen, uncompressed[uLen] = '\0';
#else
static bool once = false;

if(!once)
ntop->getTrace()->traceEvent(TRACE_ERROR, "Unable to uncompress ZMQ traffic"), once = true;

return;
#endif
} else
uncompressed = payload, uncompressed_len = size;

if(ntop->getPrefs()->get_zmq_encryption_pwd())
Utils::xor_encdec((u_char*)uncompressed, uncompressed_len, (u_char*)ntop->getPrefs()->get_zmq_encryption_pwd());

/* ntop->getTrace()->traceEvent(TRACE_NORMAL, "%s", uncompressed); */

if(strcmp(h.url, "event") == 0)
parseEvent(payload, size, source_id, this);
parseEvent(uncompressed, uncompressed_len, source_id, this);
else
parseFlow(payload, size, source_id, this);
parseFlow(uncompressed, uncompressed_len, source_id, this);

ntop->getTrace()->traceEvent(TRACE_INFO, "[%u] %s", h.size, payload);
}
if(uncompressed) free(uncompressed);

ntop->getTrace()->traceEvent(TRACE_INFO, "[%u] %s", uncompressed_len, uncompressed);
} /* size > 0 */
}
} /* for */
}
Expand Down
2 changes: 1 addition & 1 deletion src/Flow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Flow::Flow(NetworkInterface *_iface,
dump_flow_traffic = false, ndpi_proto_name = NULL,
ndpiDetectedProtocol.protocol = NDPI_PROTOCOL_UNKNOWN,
ndpiDetectedProtocol.master_protocol = NDPI_PROTOCOL_UNKNOWN,
doNotExpireBefore = iface->getTimeLastPktRcvd() + 5 /* sec */;
doNotExpireBefore = iface->getTimeLastPktRcvd() + 30 /* sec */;

ndpiFlow = NULL, cli_id = srv_id = NULL, client_proc = server_proc = NULL;
json_info = strdup("{}"), cli2srv_direction = true, twh_over = false,
Expand Down
11 changes: 7 additions & 4 deletions src/NetworkInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ static void free_wrapper(void *freeable) { free(freeable); }

/* Method used for collateral activities */
NetworkInterface::NetworkInterface() {
ifname = remoteIfname = remoteIfIPaddr = remoteProbeIPaddr = NULL, flows_hash = NULL, hosts_hash = NULL,
ifname = remoteIfname = remoteIfIPaddr = remoteProbeIPaddr = NULL, remoteProbePublicIPaddr = NULL, flows_hash = NULL, hosts_hash = NULL,
ndpi_struct = NULL,
purge_idle_flows_hosts = true, id = (u_int8_t)-1,
sprobe_interface = false, has_vlan_packets = false,
Expand Down Expand Up @@ -96,7 +96,7 @@ NetworkInterface::NetworkInterface(const char *name) {
if(name == NULL) name = "1"; /* First available interface */
#endif

remoteIfname = remoteIfIPaddr = remoteProbeIPaddr = NULL;
remoteIfname = remoteIfIPaddr = remoteProbeIPaddr = remoteProbePublicIPaddr = NULL;
if(strcmp(name, "-") == 0) name = "stdin";

if(ntop->getRedis())
Expand Down Expand Up @@ -415,6 +415,7 @@ NetworkInterface::~NetworkInterface() {
if(remoteIfname) free(remoteIfname);
if(remoteIfIPaddr) free(remoteIfIPaddr);
if(remoteProbeIPaddr) free(remoteProbeIPaddr);
if(remoteProbePublicIPaddr) free(remoteProbePublicIPaddr);
if(db) delete db;
if(statsManager) delete statsManager;
if(networkStats) delete []networkStats;
Expand Down Expand Up @@ -2267,8 +2268,9 @@ void NetworkInterface::lua(lua_State *vm) {

lua_push_str_table_entry(vm, "name", ifname);
if(remoteIfname) lua_push_str_table_entry(vm, "remote.name", remoteIfname);
if(remoteIfIPaddr) lua_push_str_table_entry(vm, "remote.ip", remoteIfIPaddr);
if(remoteIfIPaddr) lua_push_str_table_entry(vm, "remote.if_addr", remoteIfIPaddr);
if(remoteProbeIPaddr) lua_push_str_table_entry(vm, "probe.ip", remoteProbeIPaddr);
if(remoteProbePublicIPaddr) lua_push_str_table_entry(vm, "probe.public_ip", remoteProbePublicIPaddr);
lua_push_int_table_entry(vm, "id", id);
lua_push_bool_table_entry(vm, "sprobe", get_sprobe_interface());
lua_push_bool_table_entry(vm, "inline", get_inline_interface());
Expand Down Expand Up @@ -2837,11 +2839,12 @@ void NetworkInterface::updateSecondTraffic(time_t when) {
/* **************************************** */

void NetworkInterface::setRemoteStats(char *name, char *address, u_int32_t speedMbit,
char *remoteProbeAddress,
char *remoteProbeAddress, char *remoteProbePublicAddress,
u_int64_t remBytes, u_int64_t remPkts) {
if(name) setRemoteIfname(name);
if(address) setRemoteIfIPaddr(address);
if(remoteProbeAddress) setRemoteProbeAddr(remoteProbeAddress);
if(remoteProbePublicAddress) setRemoteProbePublicAddr(remoteProbePublicAddress);
ifSpeed = speedMbit;
ethStats.setNumBytes(remBytes), ethStats.setNumPackets(remPkts);
}
26 changes: 17 additions & 9 deletions src/ParserInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

/* IMPORTANT: keep it in sync with flow_fields_description part of flow_utils.lua */
ParserInterface::ParserInterface(const char *endpoint) : NetworkInterface(endpoint) {
map = NULL;
map = NULL, once = false;

addMapping("IN_BYTES", 1);
addMapping("IN_PKTS", 2);
Expand Down Expand Up @@ -424,7 +424,8 @@ int ParserInterface::getKeyId(char *sym) {
if(o != NULL) {
struct json_object_iterator it = json_object_iter_begin(o);
struct json_object_iterator itEnd = json_object_iter_end(o);
char remote_ifname[32] = { 0 }, remote_ifaddress[64] = { 0 }, remote_probe_address[64] = { 0 };
char remote_ifname[32] = { 0 }, remote_ifaddress[64] = { 0 },
remote_probe_address[64] = { 0 }, remote_probe_public_address[64] = { 0 };
u_int64_t remote_bytes = 0, remote_pkts = 0;
u_int32_t remote_ifspeed = 0;

Expand All @@ -442,6 +443,7 @@ int ParserInterface::getKeyId(char *sym) {
else if(!strcmp(key, "if.ip")) snprintf(remote_ifaddress, sizeof(remote_ifaddress), "%s", value);
else if(!strcmp(key, "if.speed")) remote_ifspeed = atol(value);
else if(!strcmp(key, "probe.ip")) snprintf(remote_probe_address, sizeof(remote_probe_address), "%s", value);
else if(!strcmp(key, "probe.public_ip")) snprintf(remote_probe_public_address, sizeof(remote_probe_public_address), "%s", value);
else if(!strcmp(key, "bytes")) remote_bytes = atol(value);
else if(!strcmp(key, "packets")) remote_pkts = atol(value);

Expand All @@ -451,15 +453,19 @@ int ParserInterface::getKeyId(char *sym) {
} // while json_object_iter_equal

/* Process Flow */
iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed, remote_probe_address, remote_bytes, remote_pkts);
iface->setRemoteStats(remote_ifname, remote_ifaddress, remote_ifspeed,
remote_probe_address, remote_probe_public_address,
remote_bytes, remote_pkts);

/* Dispose memory */
json_object_put(o);
} else {
// if o != NULL
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated or invalid JSON?");
ntop->getTrace()->traceEvent(TRACE_WARNING, "[%u] %s", payload_size, payload);
if(!once)
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data encrypted or invalid JSON?");
once = true;
// ntop->getTrace()->traceEvent(TRACE_WARNING, "[%u] %s", payload_size, payload);
return -1;
}

Expand Down Expand Up @@ -668,9 +674,11 @@ int ParserInterface::getKeyId(char *sym) {
json_object_put(flow.additional_fields);
} else {
// if o != NULL
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated or invalid JSON?");
ntop->getTrace()->traceEvent(TRACE_WARNING, "[%u] %s", payload_size, payload);
if(!once)
ntop->getTrace()->traceEvent(TRACE_WARNING,
"Invalid message received: your nProbe sender is outdated, data encrypted or invalid JSON?");
once = true;
ntop->getTrace()->traceEvent(TRACE_WARNING, "[%u] %s", payload_size, payload);
return -1;
}

Expand Down
Loading

0 comments on commit 263f312

Please sign in to comment.