Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions configure.ac
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,32 @@ if [[ -z "$WITH_NDP_TRUE" ]]; then
RPM_BUILDREQ+=" netcope-common-devel"
fi

AC_ARG_WITH([ctt],
AC_HELP_STRING([--with-ctt],[Compile ipfixprobe with ctt plugin for using Connection Tracking Table]),
[
if test "$withval" = "yes"; then
withctt="yes"
else
withctt="no"
fi
], [withctt="no"]
)

if test x${withctt} = xyes; then
AC_LANG_PUSH([C++])
CXXFLAGS="$CXXFLAGS -std=c++17"
AC_CHECK_HEADERS([ctt.hpp], [libctt=yes], AC_MSG_ERROR([ctt.hpp not found. Try installing libctt-devel]))
AC_LANG_POP([C++])
fi

AM_CONDITIONAL(WITH_CTT, test x${libctt} = xyes && test x${withctt} = xyes)
if [[ -z "$WITH_CTT_TRUE" ]]; then
AC_DEFINE([WITH_CTT], [1], [Define to 1 if the ctt is available])
LIBS="-lctt $LIBS"
RPM_REQUIRES+=" libctt"
RPM_BUILDREQ+=" libctt-devel"
fi

AC_ARG_WITH([pcap],
AC_HELP_STRING([--with-pcap],[Compile ipfixprobe with pcap plugin for capturing using libpcap library]),
[
Expand Down
10 changes: 10 additions & 0 deletions include/ipfixprobe/flowifc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <stdint.h>
#include <stdlib.h>
#include <sys/time.h>
#include <chrono>

#ifdef WITH_NEMEA
#include <unirec/unirec.h>
Expand Down Expand Up @@ -263,6 +264,14 @@ struct Flow : public Record {
};

uint64_t flow_hash;

#ifdef WITH_CTT
uint64_t flow_hash_ctt; /**< Flow hash for CTT. */
bool record_in_ctt; /**< CTT - offload or not. */
bool is_delayed; /**< Delayed export flag. */
time_t delay_time; /**< Time until export of the flow is delayed. */
#endif

PluginsStatus plugins_status; /**< Statuses of the process plugins for this flow, used to check
if the flow process plugins requires all available data, only
metadata or nothing of this. */
Expand Down Expand Up @@ -290,4 +299,5 @@ struct Flow : public Record {
};

}

#endif /* IPXP_FLOWIFC_HPP */
7 changes: 5 additions & 2 deletions include/ipfixprobe/packet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ namespace ipxp {
* \brief Structure for storing parsed packet fields
*/
struct Packet : public Record {
Metadata_CTT cttmeta; /**< Metadata from CTT */
#ifdef WITH_CTT
Metadata_CTT cttmeta; /**< Metadata from CTT */
bool cttmeta_valid; /**< True if CTT metadata is valid */
#endif /* WITH_CTT */
struct timeval ts;

uint8_t dst_mac[6];
Expand Down Expand Up @@ -108,7 +111,7 @@ struct Packet : public Record {
* \brief Constructor.
*/
Packet() :
ts({0}),
cttmeta_valid(false), ts({0}),
dst_mac(), src_mac(), ethertype(0),
ip_len(0), ip_payload_len(0), ip_version(0), ip_ttl(0),
ip_proto(0), ip_tos(0), ip_flags(0), src_ip({0}), dst_ip({0}), vlan_id(0),
Expand Down
4 changes: 4 additions & 0 deletions include/ipfixprobe/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,10 @@ class StoragePlugin : public Plugin
*/
int plugins_post_create(Flow& rec, const Packet& pkt)
{
// if metadata are valid, add flow hash ctt to the flow record
if (pkt.cttmeta_valid) {
rec.flow_hash_ctt = pkt.cttmeta.flow_hash;
}
PluginStatusConverter plugin_status_converter(m_plugins_status);
int ret = 0;
for (unsigned int i = 0; i < m_plugin_cnt; i++) {
Expand Down
6 changes: 5 additions & 1 deletion input/ndp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <netinet/in.h>
#include <sys/types.h>
#include <cstdint>
Expand Down Expand Up @@ -170,7 +171,10 @@ InputPlugin::Result NdpPacketReader::get(PacketBlock &packets)
m_stats.bad_metadata++;
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
} else {
parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
if (parse_packet_ctt_metadata(&opt, m_parser_stats, ctt, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length) == -1) {
m_stats.bad_metadata++;
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
}
}
} else {
parse_packet(&opt, m_parser_stats, timestamp, ndp_packet->data, ndp_packet->data_length, ndp_packet->data_length);
Expand Down
21 changes: 16 additions & 5 deletions input/parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

#include "parser.hpp"
#include "headers.hpp"
#include <ipfixprobe/cttmeta.hpp>
#include <ipfixprobe/packet.hpp>

namespace ipxp {
Expand Down Expand Up @@ -776,12 +777,21 @@ void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, cons
opt->pblock->bytes += len;
}

void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen)
int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen)
{
if (opt->pblock->cnt >= opt->pblock->size) {
return;
return 0;
}
Packet *pkt = &opt->pblock->pkts[opt->pblock->cnt];

// check metadata validity
if (metadata.parser_status == PA_OK) {
pkt->cttmeta_valid = true;
} else {
pkt->cttmeta_valid = false;
return -1;
}

pkt->cttmeta = metadata;

pkt->packet_len_wire = len;
Expand Down Expand Up @@ -831,7 +841,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
stats.pppoe_packets++;
} else { // if not previous, we try delegate to original parser
parse_packet(opt, stats, metadata.ts, data, len, caplen);
return;
return 0;
}

// L4
Expand All @@ -843,11 +853,11 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
stats.udp_packets++;
} else { // if not previous, we try delegate to original parser
parse_packet(opt, stats, metadata.ts, data, len, caplen);
return;
return 0;
}
} catch (const char *err) {
DEBUG_MSG("%s\n", err);
return;
return 0;
}

if (pkt->vlan_id) {
Expand Down Expand Up @@ -880,6 +890,7 @@ void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Meta
opt->packet_valid = true;
opt->pblock->cnt++;
opt->pblock->bytes += len;
return 0;
}

}
2 changes: 1 addition & 1 deletion input/parser.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ typedef struct parser_opt_s {
*/
void parse_packet(parser_opt_t *opt, ParserStats& stats, struct timeval ts, const uint8_t *data, uint16_t len, uint16_t caplen);

void parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen);
int parse_packet_ctt_metadata(parser_opt_t *opt, ParserStats& stats, const Metadata_CTT& metadata, const uint8_t *data, uint16_t len, uint16_t caplen);

}
#endif /* IPXP_INPUT_PARSER_HPP */
94 changes: 94 additions & 0 deletions storage/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <cstdlib>
#include <iostream>
#include <cstring>
#include <ratio>
#include <sys/time.h>

#include <ipfixprobe/ring.h>
Expand Down Expand Up @@ -138,10 +139,22 @@ void FlowRecord::create(const Packet &pkt, uint64_t hash)
m_flow.src_port = pkt.src_port;
m_flow.dst_port = pkt.dst_port;
}
#ifdef WITH_CTT
m_flow.is_delayed = false;
m_delayed_flow_waiting = false;
#endif /* WITH_CTT */
}

void FlowRecord::update(const Packet &pkt, bool src)
{
if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
auto flow_hash = m_hash;
m_delayed_flow = m_flow;
m_delayed_flow_waiting = true;
erase(); // erase the old flow, keeping the delayed flow
create(pkt, flow_hash);
return;
}
m_flow.time_last = pkt.ts;
if (src) {
m_flow.src_packets++;
Expand Down Expand Up @@ -192,6 +205,9 @@ void NHTFlowCache::init(const char *params)
m_timeout_idx = 0;
m_line_mask = (m_cache_size - 1) & ~(m_line_size - 1);
m_line_new_idx = m_line_size / 2;
#ifdef WITH_CTT
m_ctt_controller.init(parser.m_dev, 0);
#endif /* WITH_CTT */

if (m_export_queue == nullptr) {
throw PluginError("output queue must be set before init");
Expand Down Expand Up @@ -256,6 +272,17 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)

void NHTFlowCache::export_flow(size_t index)
{
if (m_flow_table[index]->m_flow.is_delayed) {
return;
}
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
update_flow_record_stats(
m_flow_table[index]->m_delayed_flow.src_packets
+ m_flow_table[index]->m_delayed_flow.dst_packets);
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
}
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason);
update_flow_record_stats(
Expand Down Expand Up @@ -502,6 +529,16 @@ void NHTFlowCache::export_expired(time_t ts)
m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow);
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) {
m_flow_table[i]->m_flow.is_delayed = false;
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
}
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) {
m_flow_table[i]->m_delayed_flow_waiting = false;
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
export_flow(i);
}
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
Expand Down Expand Up @@ -658,4 +695,61 @@ void NHTFlowCache::prefetch_export_expired() const
__builtin_prefetch(m_flow_table[i], 0, 1);
}
}

#ifdef WITH_CTT

void CttController::create_record(uint64_t flow_hash_ctt, const struct timeval& ts)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
std::vector<std::byte> state = assemble_state(
OffloadMode::PACKET_OFFLOAD,
MetaType::FULL,
ts);
m_commander->write_record(std::move(key), std::move(state));
}
catch (const std::exception& e) {
throw;
}
}

void CttController::export_record(uint64_t flow_hash_ctt)
{
try {
std::vector<std::byte> key = assemble_key(flow_hash_ctt);
m_commander->export_and_delete_record(std::move(key));
}
catch (const std::exception& e) {
throw;
}
}

std::vector<std::byte> CttController::assemble_key(uint64_t flow_hash_ctt)
{
std::vector<std::byte> key(key_size_bytes, std::byte(0));
for (size_t i = 0; i < sizeof(flow_hash_ctt) && i < key_size_bytes; ++i) {
key[i] = static_cast<std::byte>((flow_hash_ctt >> (8 * i)) & 0xFF);
}
return key;
}

std::vector<std::byte> CttController::assemble_state(
OffloadMode offload_mode, MetaType meta_type, const struct timeval& ts)
{
std::vector<std::byte> state(state_size_bytes, std::byte(0));
std::vector<std::byte> state_mask(state_mask_size_bytes, std::byte(0));

state[0] = static_cast<std::byte>(offload_mode);
state[1] = static_cast<std::byte>(meta_type);

// timestamp in sec/ns format, 32+32 bits - 64 bits in total
for (size_t i = 0; i < sizeof(ts.tv_sec) && i < 4; ++i) {
state[2 + i] = static_cast<std::byte>((ts.tv_sec >> (8 * i)) & 0xFF);
}
for (size_t i = 0; i < sizeof(ts.tv_usec) && i < 4; ++i) {
state[6 + i] = static_cast<std::byte>((ts.tv_usec >> (8 * i)) & 0xFF);
}
return state;
}
#endif // WITH_CTT
}
Loading
Loading