From e27d3a0d1e5cd963a241f21b43335b27c17beed5 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Wed, 18 Dec 2024 17:29:58 +0100 Subject: [PATCH 1/2] CTT - Fix plugin events --- include/ipfixprobe/storage.hpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/include/ipfixprobe/storage.hpp b/include/ipfixprobe/storage.hpp index 5296557e4..bbe0fd550 100644 --- a/include/ipfixprobe/storage.hpp +++ b/include/ipfixprobe/storage.hpp @@ -138,7 +138,7 @@ class StoragePlugin : public Plugin */ bool all_data_required(const Flow& flow) const noexcept { - return m_plugins_status.get_all_data.any(); + return flow.plugins_status.get_all_data.any(); } /** @@ -148,7 +148,7 @@ class StoragePlugin : public Plugin */ bool no_data_required(const Flow& flow) const noexcept { - return m_plugins_status.get_no_data.all(); + return flow.plugins_status.get_no_data.all(); } /** @@ -161,6 +161,9 @@ class StoragePlugin : public Plugin return !all_data_required(flow) && !no_data_required(flow); } protected: + uint32_t get_plugin_count() const noexcept { + return m_plugin_cnt; + } //Every StoragePlugin implementation should call these functions at appropriate places /** From 4f63fb032c65b94a6d2d6fd6862c6ada4d02b782 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Wed, 18 Dec 2024 17:30:19 +0100 Subject: [PATCH 2/2] CTT - fixes --- storage/cache.cpp | 58 +++++++++++++++++++++++++++++------------------ 1 file changed, 36 insertions(+), 22 deletions(-) diff --git a/storage/cache.cpp b/storage/cache.cpp index af8171c4e..d7cb89555 100644 --- a/storage/cache.cpp +++ b/storage/cache.cpp @@ -150,9 +150,11 @@ void FlowRecord::update(const Packet &pkt, bool src) if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow auto flow_hash = m_hash; m_delayed_flow = m_flow; - m_delayed_flow_waiting = true; erase(); // erase the old flow, keeping the delayed flow + // Pokud ten update() se vola po pokusu exportovat m_flow kvuli nesotatku mista v radku muze se stat ze flow_hash != XXH64(pkt) create(pkt, flow_hash); + // Chybi post_create() + m_delayed_flow_waiting = true; return; } m_flow.time_last = pkt.ts; @@ -272,17 +274,22 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue) void NHTFlowCache::export_flow(size_t index) { - if (m_flow_table[index]->m_flow.is_delayed) { + //Delayed tok by se mel exportovat drive + if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { + m_total_exported++; + update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); + update_flow_record_stats( + m_flow_table[index]->m_delayed_flow.src_packets + + m_flow_table[index]->m_delayed_flow.dst_packets); + ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); + //Ten stary tok smazat + m_flow_table[index]->m_delayed_flow_waiting = false; + } + + if (m_flow_table[index]->m_flow.is_delayed) { return; } - if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) { - m_total_exported++; - update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason); - update_flow_record_stats( - m_flow_table[index]->m_delayed_flow.src_packets - + m_flow_table[index]->m_delayed_flow.dst_packets); - ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow); - } + m_total_exported++; update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason); update_flow_record_stats( @@ -467,24 +474,29 @@ int NHTFlowCache::put_pkt(Packet &pkt) } } else { /* Check if flow record is expired (inactive timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive) { + //Pokud odkomentovat ten kod pod tim je to lepsi ale stale to pada :) + if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive /* && (!flow->m_flow.is_delayed || pkt.ts.tv_sec > flow->m_flow.delay_time) */) { + //flow->m_flow.is_delayed = false; m_flow_table[flow_index]->m_flow.end_reason = get_export_reason(flow->m_flow); plugins_pre_export(flow->m_flow); export_flow(flow_index); #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ + // Pokud m_flow_table[flow_index]->m_flow.is_delayed == true tady je stack overflow return put_pkt(pkt); } /* Check if flow record is expired (active timeout). */ - if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active) { + if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active /* && (!flow->m_flow.is_delayed || pkt.ts.tv_sec > flow->m_flow.delay_time) */) { + //flow->m_flow.is_delayed = false; m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE; plugins_pre_export(flow->m_flow); export_flow(flow_index); #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */ + // Pokud m_flow_table[flow_index]->m_flow.is_delayed == true tady je stack overflow return put_pkt(pkt); } @@ -529,16 +541,18 @@ void NHTFlowCache::export_expired(time_t ts) m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow); plugins_pre_export(m_flow_table[i]->m_flow); export_flow(i); - if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) { - m_flow_table[i]->m_flow.is_delayed = false; - plugins_pre_export(m_flow_table[i]->m_flow); - export_flow(i); - } - if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) { - m_flow_table[i]->m_delayed_flow_waiting = false; - plugins_pre_export(m_flow_table[i]->m_delayed_flow); - export_flow(i); - } + // >= vede na to ze pokud jsme vyexportovali tok v 15.999 + 1(delay) = 16.999 >= 16 -> muze se stat ze tok bude exportovan mhohem driv nez za vterinu + if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time > ts) { + m_flow_table[i]->m_flow.is_delayed = false; + plugins_pre_export(m_flow_table[i]->m_flow); + export_flow(i); + } + // to same tady + if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time > ts) { + m_flow_table[i]->m_delayed_flow_waiting = false; + plugins_pre_export(m_flow_table[i]->m_delayed_flow); + export_flow(i); + } #ifdef FLOW_CACHE_STATS m_expired++; #endif /* FLOW_CACHE_STATS */