From 33b7706d543e2baf89fc8a63fe11f779173974d5 Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Thu, 10 Jul 2025 08:17:58 +0200 Subject: [PATCH 1/2] Biflow aggregator - Update main.cpp --- biflow_aggregator/main.cpp | 43 +++++++++++++++++++++++++++++--------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/biflow_aggregator/main.cpp b/biflow_aggregator/main.cpp index 146d9109..6b252eba 100644 --- a/biflow_aggregator/main.cpp +++ b/biflow_aggregator/main.cpp @@ -79,7 +79,8 @@ UR_FIELDS ( PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \ PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \ PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \ - PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") + PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \ + PARAM('g', "global-timeout", "Global timeout.", required_argument, "number") trap_module_info_t *module_info = NULL; static volatile int stop = 0; @@ -364,6 +365,20 @@ void update_flow( if (pt != t_data->value.passive_timeout) dll.swap(t_data); } + +static void flush_all(agg::Aggregator& aggregator, + ur_template_t* out_template, void* out_record, Dll& dll) +{ + for (auto flow_data : aggregator.flow_cache) { + proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record); + agg::Flow_key_allocator::release_ptr(static_cast(flow_data.first.get_key().first)); + agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx); + } + dll.clear(); + aggregator.flow_cache.clear(); + trap_send_flush(0); +} + static int do_mainloop(Configuration& config) { @@ -383,8 +398,11 @@ do_mainloop(Configuration& config) time_t time_first; time_t time_last = 0; + time_t last_flush_time = 0; time_t t_passive = config.get_passive_timeout() >> 32; time_t t_active = config.get_active_timeout() >> 32; + const Configuration::Global_flush_configuration& flush_configuration + = config.get_global_flush_configuration(); std::size_t flow_cnt = 0; Dll dll; @@ -401,7 +419,7 @@ do_mainloop(Configuration& config) while (unlikely(stop == false)) { // Check timeouted flows - for (node *t_data = dll.begin(); t_data != NULL; t_data = t_data->next) { + for (node *t_data = dll.begin(); !flush_configuration.is_set() && t_data != NULL; t_data = t_data->next) { if (time_last >= t_data->value.passive_timeout) { // timeouted auto data = agg.flow_cache.find(t_data->value.key); proccess_and_send(agg, data->first, data->second, out_tmplt, out_rec); @@ -432,13 +450,7 @@ do_mainloop(Configuration& config) // clear all memory // flush all flows - for (auto flow_data : agg.flow_cache) { - proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec); - agg::Flow_key_allocator::release_ptr(static_cast(flow_data.first.get_key().first)); - agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx); - } - - trap_send_flush(0); + flush_all(agg, out_tmplt, out_rec, dll); // Free previous record and temlate ur_free_template(out_tmplt); @@ -474,7 +486,7 @@ do_mainloop(Configuration& config) time_last = ur_time_get_sec(ur_get(in_tmplt, in_data, F_TIME_LAST)); // Check timeouted flows - for (node *t_data = dll.begin(); t_data != NULL; t_data = t_data->next) { + for (node *t_data = dll.begin(); !flush_configuration.is_set() && t_data != NULL; t_data = t_data->next) { if (time_first >= t_data->value.passive_timeout || time_last >= t_data->value.active_timeout) { // timeouted auto data = agg.flow_cache.find(t_data->value.key); proccess_and_send(agg, data->first, data->second, out_tmplt, out_rec); @@ -490,6 +502,14 @@ do_mainloop(Configuration& config) trap_send_flush(0); timeouted = false; } + + if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) { + last_flush_time = time_last; + if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) { + last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval; + } + flush_all(agg, out_tmplt, out_rec, dll); + } bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key()); @@ -662,6 +682,9 @@ main(int argc, char **argv) case 's': config.set_flow_cache_size(optarg); break; + case 'g': + config.set_global_flush_configuration(optarg); + break; default: std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl; } From 6b89b96fb514485cff90f2a0e0b5138ddf9a8add Mon Sep 17 00:00:00 2001 From: Damir Zainullin Date: Sat, 12 Jul 2025 15:39:28 +0200 Subject: [PATCH 2/2] Biflow aggregator - Add tests --- biflow_aggregator/Makefile.am | 2 + biflow_aggregator/tests/config.xml | 85 +++++++++++++++++++ .../tests/inputs/input1_packet_aggregation | 3 + .../tests/inputs/input2_packet_aggregation | 6 ++ .../inputs/input3_generic_flow_key_min_ports | 6 ++ .../inputs/input4_src_dst_ip_all_aggregations | 3 + .../tests/references/reference1_gt0 | 1 + .../tests/references/reference1_gt5a | 2 + .../tests/references/reference1_gt5r | 1 + .../tests/references/reference2_gt0 | 2 + .../tests/references/reference2_gt5a | 5 ++ .../tests/references/reference2_gt5r | 4 + .../tests/references/reference3_gt0 | 2 + .../tests/references/reference3_gt5a | 5 ++ .../tests/references/reference3_gt5r | 4 + .../tests/references/reference4_gt0 | 1 + .../tests/references/reference4_gt5a | 2 + .../tests/references/reference4_gt5r | 1 + biflow_aggregator/tests/test.sh | 82 ++++++++++++++++++ 19 files changed, 217 insertions(+) create mode 100644 biflow_aggregator/tests/config.xml create mode 100644 biflow_aggregator/tests/inputs/input1_packet_aggregation create mode 100644 biflow_aggregator/tests/inputs/input2_packet_aggregation create mode 100644 biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports create mode 100644 biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations create mode 100644 biflow_aggregator/tests/references/reference1_gt0 create mode 100644 biflow_aggregator/tests/references/reference1_gt5a create mode 100644 biflow_aggregator/tests/references/reference1_gt5r create mode 100644 biflow_aggregator/tests/references/reference2_gt0 create mode 100644 biflow_aggregator/tests/references/reference2_gt5a create mode 100644 biflow_aggregator/tests/references/reference2_gt5r create mode 100644 biflow_aggregator/tests/references/reference3_gt0 create mode 100644 biflow_aggregator/tests/references/reference3_gt5a create mode 100644 biflow_aggregator/tests/references/reference3_gt5r create mode 100644 biflow_aggregator/tests/references/reference4_gt0 create mode 100644 biflow_aggregator/tests/references/reference4_gt5a create mode 100644 biflow_aggregator/tests/references/reference4_gt5r create mode 100755 biflow_aggregator/tests/test.sh diff --git a/biflow_aggregator/Makefile.am b/biflow_aggregator/Makefile.am index 5b5f699c..c68445bb 100644 --- a/biflow_aggregator/Makefile.am +++ b/biflow_aggregator/Makefile.am @@ -5,3 +5,5 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura rapidxml.hpp biflow_aggregator_LDADD=-lunirec -ltrap include ../aminclude.am + +TESTS = tests/test.sh diff --git a/biflow_aggregator/tests/config.xml b/biflow_aggregator/tests/config.xml new file mode 100644 index 00000000..da1a851b --- /dev/null +++ b/biflow_aggregator/tests/config.xml @@ -0,0 +1,85 @@ + + + + + SUM + PACKETS + + + + + FLOW_ID + KEY + + + + SRC_PORT + MIN + + + DST_PORT + MIN + + + + + SRC_IP + KEY + + + DST_IP + KEY + + + + SUM + SUM + + + MIN + MIN + + + MAX + MAX + + + FIRST_NON_EMPTY + FIRST_NON_EMPTY + + + LAST_NON_EMPTY + LAST_NON_EMPTY + + + FIRST + FIRST + + + LAST + LAST + + + AVG + AVG + + + BITOR + BITOR + + + STR_APPEND + APPEND + : + 10 + + + SORTED_MERGE_VALUE + SORTED_MERGE + : + SORTED_MERGE_KEY + ASCENDING + 10 + + + diff --git a/biflow_aggregator/tests/inputs/input1_packet_aggregation b/biflow_aggregator/tests/inputs/input1_packet_aggregation new file mode 100644 index 00000000..4ae4e83d --- /dev/null +++ b/biflow_aggregator/tests/inputs/input1_packet_aggregation @@ -0,0 +1,3 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 diff --git a/biflow_aggregator/tests/inputs/input2_packet_aggregation b/biflow_aggregator/tests/inputs/input2_packet_aggregation new file mode 100644 index 00000000..453ba648 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input2_packet_aggregation @@ -0,0 +1,6 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 +192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0 +192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0 +192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:51.0 diff --git a/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports b/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports new file mode 100644 index 00000000..8abaa7d5 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports @@ -0,0 +1,6 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST,uint32 FLOW_ID,uint16 SRC_PORT,uint16 DST_PORT +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,1,0,6666 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0,1,6666,0 +192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0,2,6666,0 +192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0,2,3333,3333 +192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0,2,0,6666 diff --git a/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations b/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations new file mode 100644 index 00000000..5798ebe4 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations @@ -0,0 +1,3 @@ +ipaddr DST_IP,ipaddr SRC_IP,time TIME_FIRST,time TIME_LAST,uint32 SUM,uint32 MIN,uint32 MAX,string FIRST_NON_EMPTY,string LAST_NON_EMPTY,uint32 FIRST,uint32 LAST,double AVG,string STR_APPEND,uint32 BITOR,uint32* SORTED_MERGE_KEY,uint32* SORTED_MERGE_VALUE +192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,333,5,5,,222,16,32,7,test1,1,[3|2|1],[1|2|3] +192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:11.0,333,1,4,test,,555,33,9,test2,3,[8|7|6],[6|5|4] diff --git a/biflow_aggregator/tests/references/reference1_gt0 b/biflow_aggregator/tests/references/reference1_gt0 new file mode 100644 index 00000000..a62b2f5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt0 @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 diff --git a/biflow_aggregator/tests/references/reference1_gt5a b/biflow_aggregator/tests/references/reference1_gt5a new file mode 100644 index 00000000..a4ad9a5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt5a @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666 diff --git a/biflow_aggregator/tests/references/reference1_gt5r b/biflow_aggregator/tests/references/reference1_gt5r new file mode 100644 index 00000000..a62b2f5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt5r @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 diff --git a/biflow_aggregator/tests/references/reference2_gt0 b/biflow_aggregator/tests/references/reference2_gt0 new file mode 100644 index 00000000..1c168c6b --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt0 @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:23.000000,4,694 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference2_gt5a b/biflow_aggregator/tests/references/reference2_gt5a new file mode 100644 index 00000000..bfcda95f --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt5a @@ -0,0 +1,5 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference2_gt5r b/biflow_aggregator/tests/references/reference2_gt5r new file mode 100644 index 00000000..74d4a06d --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt5r @@ -0,0 +1,4 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference3_gt0 b/biflow_aggregator/tests/references/reference3_gt0 new file mode 100644 index 00000000..24210f65 --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt0 @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0 +2016-10-28T17:00:13.000000,2016-10-28T17:00:41.000000,3,2,0,0 diff --git a/biflow_aggregator/tests/references/reference3_gt5a b/biflow_aggregator/tests/references/reference3_gt5a new file mode 100644 index 00000000..913131ae --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt5a @@ -0,0 +1,5 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,6666,0 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,1,0,6666 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333 +2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0 diff --git a/biflow_aggregator/tests/references/reference3_gt5r b/biflow_aggregator/tests/references/reference3_gt5r new file mode 100644 index 00000000..4d58ba78 --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt5r @@ -0,0 +1,4 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333 +2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0 diff --git a/biflow_aggregator/tests/references/reference4_gt0 b/biflow_aggregator/tests/references/reference4_gt0 new file mode 100644 index 00000000..f79fed90 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt0 @@ -0,0 +1 @@ +192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6] diff --git a/biflow_aggregator/tests/references/reference4_gt5a b/biflow_aggregator/tests/references/reference4_gt5a new file mode 100644 index 00000000..8cd58c84 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt5a @@ -0,0 +1,2 @@ +192.168.1.1,192.168.1.2,7.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,16,32,5,5,333,"","222","test1:",[3|2|1] +192.168.1.1,192.168.1.2,9.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,1,555,33,4,1,333,"test","","test2:",[4|5|6] diff --git a/biflow_aggregator/tests/references/reference4_gt5r b/biflow_aggregator/tests/references/reference4_gt5r new file mode 100644 index 00000000..f79fed90 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt5r @@ -0,0 +1 @@ +192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6] diff --git a/biflow_aggregator/tests/test.sh b/biflow_aggregator/tests/test.sh new file mode 100755 index 00000000..06c9e3f3 --- /dev/null +++ b/biflow_aggregator/tests/test.sh @@ -0,0 +1,82 @@ +#!/bin/bash +#set -e + +die_if_not_running() { + local pid=$1 + local error_message=$2 + if not kill -0 "$pid" 2>/dev/null; then + echo $error_message + exit 1 + fi +} + +run_test_with_global_timeout() { + local input=$1 + local output=$2 + local reference=$3 + local config=$4 + local global_timeout=$5 + + (../biflow_aggregator -i "u:lr,u:ba" -e -c config.xml -n $config -g $global_timeout & ) || true + local AGGREGATOR_PID=$! + sleep 0.5 + die_if_not_running $AGGREGATOR_PID "Failed to start biflow aggregator" + + ../../logger/logger -w $output -i "u:ba" & + local LOGGER_PID=$! + sleep 0.5 + die_if_not_running $LOGGER_PID "Failed to start logger" + + ../../logreplay/logreplay -f $input -i "u:lr" & + local LOGREPLAY_PID=$! + sleep 0.5 + die_if_not_running $LOGREPLAY_PID "Failed to start logreplay" + + sleep 2 + wait $LOGREPLAY_PID + kill $AGGREGATOR_PID 2>/dev/null + sleep 0.2 + kill $LOGGER_PID 2>/dev/null + wait $LOGGER_PID + + if ! colordiff $output $reference; then + echo $output doesnt match $reference + success="false" + fi +} + +script_path="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" +cd $script_path +success="true" +mkdir -p outputs +for input in inputs/input*; do + basename=$(basename "$input") + echo Found $basename + if [[ $basename =~ input([0-9]+)_(.+) ]]; then + index="${BASH_REMATCH[1]}" + config="${BASH_REMATCH[2]}" + output=outputs/output$index + reference=references/reference$index + else + echo $input + echo "Incorrect input name. Must be input_" + exit 1 + fi + + echo "Running test without global timeout..." + run_test_with_global_timeout $input ${output}_gt0 ${reference}_gt0 $config "0" + + echo "Running test with relative global timeout..." + run_test_with_global_timeout $input ${output}_gt5r ${reference}_gt5r $config "5r" + + echo "Running test with absolute global timeout..." + run_test_with_global_timeout $input ${output}_gt5a ${reference}_gt5a $config "5a" + +done + +if [ "$success" = "true" ]; then + echo "All tests passed successfully." +else + echo "Some tests failed." + exit 1 +fi \ No newline at end of file