Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions include/ipfixprobe/storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ class StoragePlugin : public Plugin
*/
bool all_data_required(const Flow& flow) const noexcept
{
return m_plugins_status.get_all_data.any();
return flow.plugins_status.get_all_data.any();
}

/**
Expand All @@ -148,7 +148,7 @@ class StoragePlugin : public Plugin
*/
bool no_data_required(const Flow& flow) const noexcept
{
return m_plugins_status.get_no_data.all();
return flow.plugins_status.get_no_data.all();
}

/**
Expand All @@ -161,6 +161,9 @@ class StoragePlugin : public Plugin
return !all_data_required(flow) && !no_data_required(flow);
}
protected:
uint32_t get_plugin_count() const noexcept {
return m_plugin_cnt;
}
//Every StoragePlugin implementation should call these functions at appropriate places

/**
Expand Down
58 changes: 36 additions & 22 deletions storage/cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ void FlowRecord::update(const Packet &pkt, bool src)
if (m_flow.is_delayed && !pkt.cttmeta.ctt_rec_matched) { // it means, the flow is waiting for export and it is not matched in CTT -> it must be new flow
auto flow_hash = m_hash;
m_delayed_flow = m_flow;
m_delayed_flow_waiting = true;
erase(); // erase the old flow, keeping the delayed flow
// Pokud ten update() se vola po pokusu exportovat m_flow kvuli nesotatku mista v radku muze se stat ze flow_hash != XXH64(pkt)
create(pkt, flow_hash);
// Chybi post_create()
m_delayed_flow_waiting = true;
return;
}
m_flow.time_last = pkt.ts;
Expand Down Expand Up @@ -272,17 +274,22 @@ void NHTFlowCache::set_queue(ipx_ring_t *queue)

void NHTFlowCache::export_flow(size_t index)
{
if (m_flow_table[index]->m_flow.is_delayed) {
//Delayed tok by se mel exportovat drive
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
update_flow_record_stats(
m_flow_table[index]->m_delayed_flow.src_packets
+ m_flow_table[index]->m_delayed_flow.dst_packets);
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
//Ten stary tok smazat
m_flow_table[index]->m_delayed_flow_waiting = false;
}

if (m_flow_table[index]->m_flow.is_delayed) {
return;
}
if (m_flow_table[index]->m_delayed_flow_waiting && !m_flow_table[index]->m_delayed_flow.is_delayed) {
m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_delayed_flow.end_reason);
update_flow_record_stats(
m_flow_table[index]->m_delayed_flow.src_packets
+ m_flow_table[index]->m_delayed_flow.dst_packets);
ipx_ring_push(m_export_queue, &m_flow_table[index]->m_delayed_flow);
}

m_total_exported++;
update_flow_end_reason_stats(m_flow_table[index]->m_flow.end_reason);
update_flow_record_stats(
Expand Down Expand Up @@ -467,24 +474,29 @@ int NHTFlowCache::put_pkt(Packet &pkt)
}
} else {
/* Check if flow record is expired (inactive timeout). */
if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive) {
//Pokud odkomentovat ten kod pod tim je to lepsi ale stale to pada :)
if (pkt.ts.tv_sec - flow->m_flow.time_last.tv_sec >= m_inactive /* && (!flow->m_flow.is_delayed || pkt.ts.tv_sec > flow->m_flow.delay_time) */) {
//flow->m_flow.is_delayed = false;
m_flow_table[flow_index]->m_flow.end_reason = get_export_reason(flow->m_flow);
plugins_pre_export(flow->m_flow);
export_flow(flow_index);
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
// Pokud m_flow_table[flow_index]->m_flow.is_delayed == true tady je stack overflow
return put_pkt(pkt);
}

/* Check if flow record is expired (active timeout). */
if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active) {
if (pkt.ts.tv_sec - flow->m_flow.time_first.tv_sec >= m_active /* && (!flow->m_flow.is_delayed || pkt.ts.tv_sec > flow->m_flow.delay_time) */) {
//flow->m_flow.is_delayed = false;
m_flow_table[flow_index]->m_flow.end_reason = FLOW_END_ACTIVE;
plugins_pre_export(flow->m_flow);
export_flow(flow_index);
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
// Pokud m_flow_table[flow_index]->m_flow.is_delayed == true tady je stack overflow
return put_pkt(pkt);
}

Expand Down Expand Up @@ -529,16 +541,18 @@ void NHTFlowCache::export_expired(time_t ts)
m_flow_table[i]->m_flow.end_reason = get_export_reason(m_flow_table[i]->m_flow);
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time >= ts) {
m_flow_table[i]->m_flow.is_delayed = false;
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
}
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time >= ts) {
m_flow_table[i]->m_delayed_flow_waiting = false;
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
export_flow(i);
}
// >= vede na to ze pokud jsme vyexportovali tok v 15.999 + 1(delay) = 16.999 >= 16 -> muze se stat ze tok bude exportovan mhohem driv nez za vterinu
if (!m_flow_table[i]->is_empty() && m_flow_table[i]->m_flow.is_delayed && m_flow_table[i]->m_flow.delay_time > ts) {
m_flow_table[i]->m_flow.is_delayed = false;
plugins_pre_export(m_flow_table[i]->m_flow);
export_flow(i);
}
// to same tady
if(!m_flow_table[i]->is_empty() && m_flow_table[i]->m_delayed_flow_waiting && m_flow_table[i]->m_delayed_flow.delay_time > ts) {
m_flow_table[i]->m_delayed_flow_waiting = false;
plugins_pre_export(m_flow_table[i]->m_delayed_flow);
export_flow(i);
}
#ifdef FLOW_CACHE_STATS
m_expired++;
#endif /* FLOW_CACHE_STATS */
Expand Down
Loading