Skip to content

Commit

Permalink
Extracted IPFIX plugin into separate compile module
Browse files Browse the repository at this point in the history
  • Loading branch information
pavel-odintsov committed Jan 13, 2025
1 parent 4e710fd commit be99c63
Show file tree
Hide file tree
Showing 6 changed files with 421 additions and 287 deletions.
6 changes: 5 additions & 1 deletion src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ add_library(fast_library STATIC fast_library.cpp)
# Our ipfix database library
add_library(ipfix_rfc STATIC ipfix_fields/ipfix_rfc.cpp)

# IPFIX collector as separate module
add_library(ipfix_collector STATIC netflow_plugin/ipfix_collector.cpp)
target_link_libraries(ipfix_collector ipfix_rfc)

add_library(bgp_protocol STATIC bgp_protocol.cpp)

# Here we store some service code for getting IP protocol name by number
Expand Down Expand Up @@ -582,7 +586,7 @@ add_library(netflow STATIC netflow_plugin/netflow.cpp)

# netflow plugin
add_library(netflow_plugin STATIC netflow_plugin/netflow_collector.cpp)
target_link_libraries(netflow_plugin ipfix_rfc netflow netflow_template)
target_link_libraries(netflow_plugin ipfix_collector netflow netflow_template)

if (ENABLE_PCAP_SUPPORT)
# pcap plugin
Expand Down
200 changes: 197 additions & 3 deletions src/netflow_plugin/ipfix_collector.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,201 @@
// That's not a module as we do refactoring right now in small steps
// TODO: place make it proper module
#include <cstdint>
#include <fstream>
#include <string>

#include "../fast_library.hpp"

#include "netflow.hpp"

#include "ipfix_metrics.hpp"

#include "netflow_template.hpp"

#include "netflow_meta_info.hpp"

// We use structures defined in netflow_meta_info.hpp here
#include "ipfix.hpp"

#include "netflow_v9.hpp"

#include "../ipfix_fields/ipfix_rfc.hpp"

#include "../simple_packet_parser_ng.hpp"

#include <boost/serialization/map.hpp>
#include <boost/serialization/vector.hpp>

#include <boost/archive/xml_iarchive.hpp>
#include <boost/archive/xml_oarchive.hpp>

#include "../fastnetmon_configuration_scheme.hpp"

#include "ipfix_metrics.hpp"

extern process_packet_pointer netflow_process_func_ptr;

extern uint64_t flowsets_per_packet_maximum_number;

extern int64_t netflow_ipfix_all_protocols_total_flows;

// TODO: get rid of it ASAP
// Copy an int (possibly shorter than the target) keeping their LSBs aligned
#define BE_COPY(a) memcpy((u_char*)&a + (sizeof(a) - record_length), data, record_length);


// TODO: get rid of such tricks


template_t* peer_find_template(std::map<std::string, std::map<uint32_t, template_t>>& table_for_lookup,
std::mutex& table_for_lookup_mutex,
uint32_t source_id,
uint32_t template_id,
std::string client_addres_in_string_format);

void add_update_peer_template(const netflow_protocol_version_t& netflow_version,
std::map<std::string, std::map<uint32_t, template_t>>& table_for_add,
std::mutex& table_for_add_mutex,
uint32_t source_id,
uint32_t template_id,
const std::string& client_addres_in_string_format,
const template_t& field_template,
bool& updated,
bool& updated_existing_template);

void update_device_flow_timeouts(const device_timeouts_t& device_timeouts,
std::mutex& structure_mutex,
std::map<std::string, device_timeouts_t>& timeout_storage,
const std::string& client_addres_in_string_format,
const netflow_protocol_version_t& netflow_protocol_version);

void override_packet_fields_from_nested_packet(simple_packet_t& packet, const simple_packet_t& nested_packet);

void update_ipfix_sampling_rate(uint32_t sampling_rate, const std::string& client_addres_in_string_format);

// Access to inaccurate but fast time
extern time_t current_inaccurate_time;

extern log4cpp::Category& logger;

std::mutex global_ipfix_templates_mutex;
std::map<std::string, std::map<uint32_t, template_t>> global_ipfix_templates;

// IPFIX Sampling rates
std::mutex ipfix_sampling_rates_mutex;
std::map<std::string, uint32_t> ipfix_sampling_rates;

// IPFIX per device timeouts
std::mutex ipfix_per_device_flow_timeouts_mutex;
std::map<std::string, device_timeouts_t> ipfix_per_device_flow_timeouts;

extern fastnetmon_configuration_t fastnetmon_global_configuration;

std::vector<system_counter_t> get_ipfix_stats() {
std::vector<system_counter_t> system_counter;

system_counter.push_back(system_counter_t("ipfix_total_flows", ipfix_total_flows, metric_type_t::counter, ipfix_total_flows_desc));
system_counter.push_back(
system_counter_t("ipfix_total_packets", ipfix_total_packets, metric_type_t::counter, ipfix_total_packets_desc));
system_counter.push_back(system_counter_t("ipfix_total_ipv4_flows", ipfix_total_ipv4_flows, metric_type_t::counter,
ipfix_total_ipv4_flows_desc));
system_counter.push_back(system_counter_t("ipfix_total_ipv6_flows", ipfix_total_ipv6_flows, metric_type_t::counter,
ipfix_total_ipv6_flows_desc));

system_counter.push_back(system_counter_t("ipfix_duration_0_seconds", ipfix_duration_0_seconds,
metric_type_t::counter, ipfix_duration_0_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_1_seconds", ipfix_duration_less_1_seconds,
metric_type_t::counter, ipfix_duration_less_1_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_2_seconds", ipfix_duration_less_2_seconds,
metric_type_t::counter, ipfix_duration_less_2_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_3_seconds", ipfix_duration_less_3_seconds,
metric_type_t::counter, ipfix_duration_less_3_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_5_seconds", ipfix_duration_less_5_seconds,
metric_type_t::counter, ipfix_duration_less_5_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_10_seconds", ipfix_duration_less_10_seconds,
metric_type_t::counter, ipfix_duration_less_10_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_less_15_seconds", ipfix_duration_less_15_seconds,
metric_type_t::counter, ipfix_duration_less_15_seconds_desc));
system_counter.push_back(system_counter_t("ipfix_duration_less_30_seconds", ipfix_duration_less_30_seconds,
metric_type_t::counter, ipfix_duration_less_30_seconds_desc));
system_counter.push_back(system_counter_t("ipfix_duration_less_60_seconds", ipfix_duration_less_60_seconds,
metric_type_t::counter, ipfix_duration_less_60_seconds_desc));
system_counter.push_back(system_counter_t("ipfix_duration_less_90_seconds", ipfix_duration_less_90_seconds,
metric_type_t::counter, ipfix_duration_less_90_seconds_desc));
system_counter.push_back(system_counter_t("ipfix_duration_less_180_seconds", ipfix_duration_less_180_seconds,
metric_type_t::counter, ipfix_duration_less_180_seconds_desc));
system_counter.push_back(system_counter_t("ipfix_duration_exceed_180_seconds", ipfix_duration_exceed_180_seconds,
metric_type_t::counter, ipfix_duration_exceed_180_seconds_desc));

system_counter.push_back(system_counter_t("ipfix_duration_negative", ipfix_duration_negative,
metric_type_t::counter, ipfix_duration_negative_desc));

system_counter.push_back(system_counter_t("ipfix_flows_end_reason_idle_timeout", ipfix_flows_end_reason_idle_timeout,
metric_type_t::counter, ipfix_flows_end_reason_idle_timeout_desc));
system_counter.push_back(system_counter_t("ipfix_flows_end_reason_active_timeout", ipfix_flows_end_reason_active_timeout,
metric_type_t::counter, ipfix_flows_end_reason_active_timeout_desc));
system_counter.push_back(system_counter_t("ipfix_flows_end_reason_end_of_flow_timeout", ipfix_flows_end_reason_end_of_flow_timeout,
metric_type_t::counter, ipfix_flows_end_reason_end_of_flow_timeout_desc));
system_counter.push_back(system_counter_t("ipfix_flows_end_reason_force_end_timeout", ipfix_flows_end_reason_force_end_timeout,
metric_type_t::counter, ipfix_flows_end_reason_force_end_timeout_desc));
system_counter.push_back(system_counter_t("ipfix_flows_end_reason_lack_of_resource_timeout",
ipfix_flows_end_reason_lack_of_resource_timeout, metric_type_t::counter,
ipfix_flows_end_reason_lack_of_resource_timeout_desc));

system_counter.push_back(system_counter_t("ipfix_data_packet_number", ipfix_data_packet_number,
metric_type_t::counter, ipfix_data_packet_number_desc));
system_counter.push_back(system_counter_t("ipfix_data_templates_number", ipfix_data_templates_number,
metric_type_t::counter, ipfix_data_templates_number_desc));
system_counter.push_back(system_counter_t("ipfix_options_templates_number", ipfix_options_templates_number,
metric_type_t::counter, ipfix_options_templates_number_desc));
system_counter.push_back(system_counter_t("ipfix_options_packet_number", ipfix_options_packet_number,
metric_type_t::counter, ipfix_options_packet_number_desc));
system_counter.push_back(system_counter_t("ipfix_packets_with_unknown_templates", ipfix_packets_with_unknown_templates,
metric_type_t::counter, ipfix_packets_with_unknown_templates_desc));
system_counter.push_back(system_counter_t("ipfix_custom_sampling_rate_received", ipfix_custom_sampling_rate_received,
metric_type_t::counter, ipfix_custom_sampling_rate_received_desc));
system_counter.push_back(system_counter_t("ipfix_sampling_rate_changes", ipfix_sampling_rate_changes,
metric_type_t::counter, ipfix_sampling_rate_changes_desc));
system_counter.push_back(system_counter_t("ipfix_marked_zero_next_hop_and_zero_output_as_dropped",
ipfix_marked_zero_next_hop_and_zero_output_as_dropped, metric_type_t::counter,
ipfix_marked_zero_next_hop_and_zero_output_as_dropped_desc));
system_counter.push_back(system_counter_t("ipfix_template_updates_number_due_to_real_changes", ipfix_template_data_updates,
metric_type_t::counter, ipfix_template_data_updates_desc));
system_counter.push_back(system_counter_t("ipfix_packets_with_padding", ipfix_packets_with_padding,
metric_type_t::counter, ipfix_packets_with_padding_desc));
system_counter.push_back(system_counter_t("ipfix_inline_headers", ipfix_inline_headers, metric_type_t::counter,
ipfix_inline_headers_desc));
system_counter.push_back(system_counter_t("ipfix_protocol_version_adjustments", ipfix_protocol_version_adjustments,
metric_type_t::counter, ipfix_protocol_version_adjustments_desc));
system_counter.push_back(system_counter_t("ipfix_too_large_field", ipfix_too_large_field, metric_type_t::counter,
ipfix_too_large_field_desc));
system_counter.push_back(system_counter_t("ipfix_forwarding_status", ipfix_forwarding_status,
metric_type_t::counter, ipfix_forwarding_status_desc));
system_counter.push_back(system_counter_t("ipfix_inline_header_parser_error", ipfix_inline_header_parser_error,
metric_type_t::counter, ipfix_inline_header_parser_error_desc));

system_counter.push_back(system_counter_t("ipfix_inline_encoding_error", ipfix_inline_encoding_error,
metric_type_t::counter, ipfix_inline_encoding_error_desc));

system_counter.push_back(system_counter_t("ipfix_inline_header_parser_success", ipfix_inline_header_parser_success,
metric_type_t::counter, ipfix_inline_header_parser_success_desc));

system_counter.push_back(system_counter_t("ipfix_active_flow_timeout_received", ipfix_active_flow_timeout_received,
metric_type_t::counter, ipfix_active_flow_timeout_received_desc));
system_counter.push_back(system_counter_t("ipfix_inactive_flow_timeout_received", ipfix_inactive_flow_timeout_received,
metric_type_t::counter, ipfix_inactive_flow_timeout_received_desc));

system_counter.push_back(system_counter_t("ipfix_sets_with_anomaly_padding", ipfix_sets_with_anomaly_padding,
metric_type_t::counter, ipfix_sets_with_anomaly_padding_desc));

return system_counter;
}


// https://tools.ietf.org/html/rfc5101#page-18
bool process_ipfix_options_template(const uint8_t* pkt, size_t flowset_length, uint32_t source_id, const std::string& client_addres_in_string_format) {
const ipfix_options_header_common_t* options_template_header = (ipfix_options_header_common_t*)pkt;
Expand Down Expand Up @@ -1234,6 +1427,7 @@ bool ipfix_options_flowset_to_store(const uint8_t* pkt,
return true;
}

void increment_duration_counters_ipfix(int64_t duration);

// This function reads flow set using passed template
// In case of irrecoverable errors it returns false
Expand Down Expand Up @@ -1613,7 +1807,7 @@ bool process_ipfix_data(const uint8_t* pkt,
// All other values may be sign of some kind of issues. For example, it may be template conflicts
// https://pavel.network/its-just-wrong-to-update-ipfix-templates/
if (flowset_padding > 7) {
ipfix_flowsets_with_anomaly_padding++;
ipfix_sets_with_anomaly_padding++;
}

if (number_flowsets > 0x4000) {
Expand Down
13 changes: 13 additions & 0 deletions src/netflow_plugin/ipfix_collector.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#pragma once

std::vector<system_counter_t> get_ipfix_sampling_rates();
std::vector<system_counter_t> get_ipfix_stats();

bool process_ipfix_packet(const uint8_t* packet,
uint32_t udp_packet_length,
const std::string& client_addres_in_string_format,
uint32_t client_ipv4_address);

void load_ipfix_template_cache();
void load_ipfix_sampling_cache();

Loading

0 comments on commit be99c63

Please sign in to comment.