From 32f8f086808a718e2efb15473bfca939b325eb1d Mon Sep 17 00:00:00 2001
From: David Kezlinek <103315902+Dakevid@users.noreply.github.com>
Date: Tue, 18 Nov 2025 13:42:59 +0100
Subject: [PATCH] FlowGrouper - Introduce new module

---
 modules/CMakeLists.txt                            |   1 +
 modules/deduplicator/src/timeoutHashMap.hpp       |   2 +-
 modules/flowGrouper/CMakeLists.txt                |   1 +
 modules/flowGrouper/README.md                     |  70 ++++++
 modules/flowGrouper/src/CMakeLists.txt            |  18 ++
 modules/flowGrouper/src/flowGrouper.cpp           | 100 ++++++++
 modules/flowGrouper/src/flowGrouper.hpp           | 133 ++++++++++
 modules/flowGrouper/src/main.cpp                  | 240 ++++++++++++++++++
 modules/flowGrouper/tests/test.sh                 | 123 +++++++++
 .../flowGrouper/tests/testsData/inputs/input1.csv |   7 +
 .../flowGrouper/tests/testsData/results/res1.csv  |   7 +
 11 files changed, 701 insertions(+), 1 deletion(-)
 create mode 100644 modules/flowGrouper/CMakeLists.txt
 create mode 100644 modules/flowGrouper/README.md
 create mode 100644 modules/flowGrouper/src/CMakeLists.txt
 create mode 100644 modules/flowGrouper/src/flowGrouper.cpp
 create mode 100644 modules/flowGrouper/src/flowGrouper.hpp
 create mode 100644 modules/flowGrouper/src/main.cpp
 create mode 100755 modules/flowGrouper/tests/test.sh
 create mode 100644 modules/flowGrouper/tests/testsData/inputs/input1.csv
 create mode 100644 modules/flowGrouper/tests/testsData/results/res1.csv

diff --git a/modules/CMakeLists.txt b/modules/CMakeLists.txt
index 2d84c1c3..8a6cb6d7 100644
--- a/modules/CMakeLists.txt
+++ b/modules/CMakeLists.txt
@@ -3,3 +3,4 @@ add_subdirectory(sampler)
 add_subdirectory(telemetry)
 add_subdirectory(deduplicator)
 add_subdirectory(clickhouse)
+add_subdirectory(flowGrouper)
diff --git a/modules/deduplicator/src/timeoutHashMap.hpp b/modules/deduplicator/src/timeoutHashMap.hpp
index 12ed8382..1062a1d9 100644
--- a/modules/deduplicator/src/timeoutHashMap.hpp
+++ b/modules/deduplicator/src/timeoutHashMap.hpp
@@ -302,7 +302,7 @@ class TimeoutHashMap {
 	}
 
 private:
-	std::function m_hasher;
+	std::function m_hasher;
 	typename HashMapTimeoutBucket::TimeoutBucketCallables m_timeoutBucketCallables;
 	std::vector m_buckets;
 	const uint64_t M_BUCKET_MASK;
diff --git a/modules/flowGrouper/CMakeLists.txt b/modules/flowGrouper/CMakeLists.txt
new file mode 100644
index 00000000..febd4f0a
--- /dev/null
+++ b/modules/flowGrouper/CMakeLists.txt
@@ -0,0 +1 @@
+add_subdirectory(src)
diff --git a/modules/flowGrouper/README.md b/modules/flowGrouper/README.md
new file mode 100644
index 00000000..f6cc6f2c
--- /dev/null
+++ b/modules/flowGrouper/README.md
@@ -0,0 +1,70 @@
+# FlowGrouper module - README
+
+## Description
+FlowGrouper groups Unirec flow records that share the same 5-tuple (source IP,
+destination IP, source port, destination port and protocol) within a configurable
+time window and assigns a stable `FLOW_GROUP_KEY` to all records that belong to
+the same group.
+
+This module is useful when aggregating flow records that may be
+received multiple times (e.g., from multiple exporters).
+
+## Interfaces
+- Input: 1
+- Output: 1
+
+## Required Unirec Fields
+The module expects the input Unirec template to contain the following fields:
+- `SRC_IP` (ipaddr)
+- `DST_IP` (ipaddr)
+- `SRC_PORT` (uint16)
+- `DST_PORT` (uint16)
+- `PROTOCOL` (uint8)
+
+FlowGrouper extends the template by adding `uint64 FLOW_GROUP_KEY` to the output records.
+
+## Parameters
+Command-line parameters follow the TRAP / Unirec conventions. The main module
+parameters are:
+
+- `-s, --size <exponent>` Exponent N for the hash map size (2^N entries). Default value is 20.
+- `-t, --timeout <ms>` Time window in milliseconds within which records with the same
+  5-tuple receive the same group key. Default value is 5000 (5 s).
+- `-m, --appfs-mountpoint <path>` Path where the appFs telemetry directory will be mounted.
+
+### Common TRAP / Unirec parameters
+- `-h` : print help and module-specific parameters
+- `-v`, `-vv`, `-vvv` : verbosity levels
+
+## How Flow Grouping Works
+- Records are grouped when they arrive within the configured `--timeout`
+  interval and share the same `SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT` and
+  `PROTOCOL` values.
+- When a record arrives and no existing group matches, a new `FLOW_GROUP_KEY`
+  is created and stored in an internal timeout hash map keyed by the 5-tuple.
+- Subsequent records that match the tuple within the timeout receive the same
+  `FLOW_GROUP_KEY` (see the sketch after this list).
+
+Note: `FLOW_GROUP_KEY` is not a unique identifier. It identifies records that
+belong to the same group only in the context of the 5-tuple (`SRC_IP`, `DST_IP`,
+`SRC_PORT`, `DST_PORT`, `PROTOCOL`).
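+
+The decision can be illustrated with the following self-contained sketch. It is a
+simplified model of the module's behaviour, not the module code itself (the real
+implementation lives in `src/flowGrouper.cpp` and uses a timeout hash map); the
+string 5-tuple, the plain `std::unordered_map` and the window anchored at group
+creation are simplifications made for brevity:
+
+```cpp
+#include <chrono>
+#include <cstdint>
+#include <iostream>
+#include <string>
+#include <unordered_map>
+
+using Clock = std::chrono::steady_clock;
+
+struct Group {
+    uint64_t key;                 // FLOW_GROUP_KEY handed out to matching records
+    Clock::time_point createdAt;  // start of the grouping window for this tuple
+};
+
+// Return the group key for a 5-tuple, opening a new group when none exists
+// or when the previous one is older than `timeout`.
+uint64_t groupKeyFor(
+    std::unordered_map<std::string, Group>& groups,
+    const std::string& fiveTuple,
+    std::chrono::milliseconds timeout)
+{
+    const auto now = Clock::now();
+    const auto it = groups.find(fiveTuple);
+    if (it != groups.end() && now - it->second.createdAt <= timeout) {
+        return it->second.key; // same tuple within the window: reuse the key
+    }
+    // Like the module, derive a new key from the current wall-clock time in seconds.
+    const auto key = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
+        std::chrono::system_clock::now().time_since_epoch()).count());
+    groups[fiveTuple] = {key, now}; // new or expired tuple: open a new group
+    return key;
+}
+
+int main()
+{
+    std::unordered_map<std::string, Group> groups;
+    const auto timeout = std::chrono::milliseconds(5000);
+    // Two records with the same 5-tuple within the window share one key.
+    std::cout << groupKeyFor(groups, "10.0.0.1,10.0.0.2,123,123,6", timeout) << '\n';
+    std::cout << groupKeyFor(groups, "10.0.0.1,10.0.0.2,123,123,6", timeout) << '\n';
+    return 0;
+}
+```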
+
+## Telemetry data format
+
+```
+├─ input/
+│  └─ stats
+└─ flowGrouper/
+   └─ statistics
+```
+
+Telemetry counters include:
+- **Inserted groups** (`newInsertedCount`): number of newly created flow groups
+- **Replaced groups** (`replacedCount`): number of times an existing bucket entry was replaced by a new group
+- **Found groups** (`foundCount`): number of times a matching group was found for an input record
+
+## Usage Examples
+Process Unirec records from a TRAP input and forward them with an added
+`FLOW_GROUP_KEY`. The example sets the hash map exponent to `15` (2^15 entries)
+and the timeout to `1000` ms:
+
+```
+$ flowGrouper -i "u:in,u:out" -s 15 -t 1000
+```
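+
+To expose the telemetry counters described above through appFs, pass a mount point
+with `-m` and read the `statistics` file under it. The mount path below is only an
+example, and the exact formatting of the file contents depends on the telemetry
+library:
+
+```
+$ flowGrouper -i "u:in,u:out" -m /tmp/flowGrouper-telemetry &
+$ cat /tmp/flowGrouper-telemetry/flowGrouper/statistics
+```
+
+The file contains the `foundCount`, `newInsertedCount` and `replacedCount` counters
+maintained by the module; per-interface input statistics are available in
+`input/stats`.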
diff --git a/modules/flowGrouper/src/CMakeLists.txt b/modules/flowGrouper/src/CMakeLists.txt
new file mode 100644
index 00000000..afb3e9b8
--- /dev/null
+++ b/modules/flowGrouper/src/CMakeLists.txt
@@ -0,0 +1,18 @@
+add_executable(flowGrouper
+    main.cpp
+    flowGrouper.cpp
+)
+
+target_link_libraries(flowGrouper PRIVATE
+    telemetry::telemetry
+    telemetry::appFs
+    common
+    rapidcsv
+    unirec::unirec++
+    unirec::unirec
+    trap::trap
+    argparse
+    xxhash
+)
+
+install(TARGETS flowGrouper DESTINATION ${INSTALL_DIR_BIN})
diff --git a/modules/flowGrouper/src/flowGrouper.cpp b/modules/flowGrouper/src/flowGrouper.cpp
new file mode 100644
index 00000000..4b12b211
--- /dev/null
+++ b/modules/flowGrouper/src/flowGrouper.cpp
@@ -0,0 +1,100 @@
+#include "flowGrouper.hpp"
+
+#include <chrono>
+#include <stdexcept>
+#include <xxhash.h>
+
+using namespace Nemea;
+
+namespace FlowGrouper {
+
+template <typename Key>
+static uint64_t xxHasher(const Key& key)
+{
+    return XXH3_64bits(reinterpret_cast<const void*>(&key), sizeof(key));
+}
+
+static FlowGrouper::Timestamp timeSum(const FlowGrouper::Timestamp& value, uint64_t timeout)
+{
+    return value + std::chrono::milliseconds(timeout);
+}
+
+static ur_field_id_t getUnirecIdByName(const char* str)
+{
+    auto unirecId = ur_get_id_by_name(str);
+    if (unirecId == UR_E_INVALID_NAME) {
+        throw std::runtime_error(std::string("Invalid Unirec name: ") + str);
+    }
+    return static_cast<ur_field_id_t>(unirecId);
+}
+
+FlowGrouper::FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters)
+    : m_hashMap(parameters, xxHasher<FlowKey>, std::less<>(), timeSum)
+{
+    constexpr const size_t timeoutBucketSize = 256;
+    static_assert(
+        sizeof(FlowGrouperHashMap::HashMapTimeoutBucket) == timeoutBucketSize,
+        "TimeoutBucket size is not 256 bytes");
+}
+
+void FlowGrouper::updateUnirecIds()
+{
+    m_ids.srcIpId = getUnirecIdByName("SRC_IP");
+    m_ids.dstIpId = getUnirecIdByName("DST_IP");
+    m_ids.srcPortId = getUnirecIdByName("SRC_PORT");
+    m_ids.dstPortId = getUnirecIdByName("DST_PORT");
+    m_ids.protocolId = getUnirecIdByName("PROTOCOL");
+    m_ids.flowGroupKeyId = getUnirecIdByName(getOutputFieldName().c_str());
+}
+
+FlowGrouper::FlowGroupKey FlowGrouper::getFlowKey(Nemea::UnirecRecordView& view)
+{
+    // Build the 5-tuple key from the input record.
+    FlowKey flowKey;
+    flowKey.srcIp = view.getFieldAsType<Nemea::IpAddress>(m_ids.srcIpId);
+    flowKey.dstIp = view.getFieldAsType<Nemea::IpAddress>(m_ids.dstIpId);
+    flowKey.srcPort = view.getFieldAsType<uint16_t>(m_ids.srcPortId);
+    flowKey.dstPort = view.getFieldAsType<uint16_t>(m_ids.dstPortId);
+    flowKey.proto = view.getFieldAsType<uint8_t>(m_ids.protocolId);
+
+    // Candidate key for a new group: current wall-clock time in seconds.
+    const FlowGrouper::FlowGroupKey newFlowKey
+        = (FlowGrouper::FlowGroupKey) std::chrono::duration_cast<std::chrono::seconds>(
+              std::chrono::system_clock::now().time_since_epoch())
+              .count();
+    const auto [it, insertResult]
+        = m_hashMap.insert({flowKey, newFlowKey}, std::chrono::steady_clock::now());
+
+    if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::INSERTED) {
+        m_newInserted++;
+        return newFlowKey;
+    }
+    if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::REPLACED) {
+        m_replaced++;
+        return newFlowKey;
+    }
+    m_found++;
+    return *it;
+}
+
+void FlowGrouper::addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord)
+{
+    const FlowGrouper::FlowGroupKey flowKey = getFlowKey(inputRecord);
+    outputRecord.setFieldFromType(flowKey, m_ids.flowGroupKeyId);
+}
+
+void FlowGrouper::setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory)
+{
+    m_holder.add(directory);
+
+    const telemetry::FileOps fileOps
+        = {[this]() {
+               telemetry::Dict dict;
+               dict["replacedCount"] = telemetry::Scalar((long unsigned int) m_replaced);
+               dict["newInsertedCount"] = telemetry::Scalar((long unsigned int) m_newInserted);
+               dict["foundCount"] = telemetry::Scalar((long unsigned int) m_found);
+               return dict;
+           },
+           nullptr};
+
+    m_holder.add(directory->addFile("statistics", fileOps));
+}
+
+} // namespace FlowGrouper
diff --git a/modules/flowGrouper/src/flowGrouper.hpp b/modules/flowGrouper/src/flowGrouper.hpp
new file mode 100644
index 00000000..afc9bff7
--- /dev/null
+++ b/modules/flowGrouper/src/flowGrouper.hpp
@@ -0,0 +1,133 @@
+#pragma once
+
+#include "../../deduplicator/src/timeoutHashMap.hpp"
+
+#include <chrono>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <string>
+#include <telemetry.hpp>
+#include <unirec++/unirec.hpp>
+#include <unirec/unirec.h>
+
+namespace FlowGrouper {
+
+/**
+ * @brief FlowGrouper class that assigns the same FLOW_GROUP_KEY to records of the same group.
+ */
+class FlowGrouper {
+public:
+    /**
+     * @brief Timestamp type used by flowGrouper.
+     */
+    using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;
+
+    /**
+     * @brief Field type representing the flow group key (FLOW_GROUP_KEY).
+     */
+    using FlowGroupKey = uint64_t;
+
+    /**
+     * @brief Key fields shared by flows that belong to the same group.
+     */
+    struct FlowKey {
+        Nemea::IpAddress srcIp; ///< Source IP address.
+        Nemea::IpAddress dstIp; ///< Destination IP address.
+        uint16_t srcPort; ///< Source port.
+        uint16_t dstPort; ///< Destination port.
+        uint8_t proto; ///< Protocol ID.
+    };
+
+    /**
+     * @brief Timeout hash map type used by FlowGrouper.
+     */
+    using FlowGrouperHashMap = Deduplicator::TimeoutHashMap<
+        FlowKey,
+        FlowGroupKey,
+        Timestamp,
+        std::function<uint64_t(const FlowKey&)>,
+        std::function<bool(const Timestamp&, const Timestamp&)>,
+        std::function<Timestamp(const Timestamp&, uint64_t)>>;
+
+    static inline const uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5 s
+    static inline const uint32_t DEFAULT_HASHMAP_EXPONENT = 20; ///< Default size exponent - 2^20 entries
+
+    /**
+     * @brief FlowGrouper constructor
+     *
+     * @param parameters Parameters used to build the flowGrouper hash table.
+     */
+    explicit FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters);
+
+    /**
+     * @brief Looks up the group of the given Unirec record in the hash map and creates a new group if none matches.
+     * @param view The Unirec record to check.
+     * @return FlowGroupKey of the record's group.
+     */
+    FlowGroupKey getFlowKey(Nemea::UnirecRecordView& view);
+
+    /**
+     * @brief Adds the FLOW_GROUP_KEY field to the output Unirec record.
+     * @param inputRecord The input Unirec record view to get field values from.
+     * @param outputRecord The output Unirec record where FLOW_GROUP_KEY will be added.
+     */
+    void addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord);
+
+    /**
+     * @brief Sets the telemetry directory for the flowGrouper.
+     * @param directory Directory for flowGrouper telemetry.
+     */
+    void setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory);
+
+    /**
+     * @brief Updates the Unirec IDs of the required fields after a template format change.
+     */
+    void updateUnirecIds();
+
+    /**
+     * @brief Gets the name of the output field added by FlowGrouper.
+     * @return Name of the output field.
+     */
+    static std::string getOutputFieldName() {
+        return "FLOW_GROUP_KEY";
+    }
+
+    /**
+     * @brief Gets the output template string after adding the FLOW_GROUP_KEY field.
+     * @param inputTemplate The input Unirec template string.
+     * @return The output Unirec template string with the FLOW_GROUP_KEY field added.
+     */
+    static std::string getOutputTemplate(std::string inputTemplate) {
+        // Check whether the input template already contains the output field.
+        if (inputTemplate.find(" " + getOutputFieldName()) != std::string::npos) {
+            return inputTemplate;
+        }
+        return inputTemplate + ", uint64 " + getOutputFieldName();
+    }
+
+private:
+    FlowGrouperHashMap m_hashMap; ///< Hash map keeping track of flow groups
+
+    uint32_t m_newInserted {0}; ///< Count of newly created groups
+    uint32_t m_replaced {0}; ///< Count of replaced groups
+    uint32_t m_found {0}; ///< Count of lookups that found an existing group key
+
+    telemetry::Holder m_holder; ///< Telemetry holder
+
+    struct UnirecIdStorage {
+        ur_field_id_t srcIpId; ///< Unirec ID of the source IP field.
+        ur_field_id_t dstIpId; ///< Unirec ID of the destination IP field.
+        ur_field_id_t srcPortId; ///< Unirec ID of the source port field.
+        ur_field_id_t dstPortId; ///< Unirec ID of the destination port field.
+        ur_field_id_t protocolId; ///< Unirec ID of the protocol field.
+        ur_field_id_t flowGroupKeyId; ///< Unirec ID of the FLOW_GROUP_KEY field.
+    };
+
+    UnirecIdStorage m_ids; ///< IDs of the Unirec fields used by the flowGrouper module
+};
+
+} // namespace FlowGrouper
diff --git a/modules/flowGrouper/src/main.cpp b/modules/flowGrouper/src/main.cpp
new file mode 100644
index 00000000..59d8a189
--- /dev/null
+++ b/modules/flowGrouper/src/main.cpp
@@ -0,0 +1,240 @@
+/**
+ * @file
+ * @brief FlowGrouper Module
+ *
+ * This module processes Unirec records and assigns a FLOW_GROUP_KEY to flows
+ * that share the same tuple of fields (SRC_IP, DST_IP, SRC_PORT, DST_PORT
+ * and PROTOCOL) and that are received within a configured time interval.
+ *
+ * The group key is not globally unique per flow; it identifies the group by
+ * the tuple of those fields. All flows with the same tuple during the interval
+ * receive the same FLOW_GROUP_KEY value.
+ *
+ * SPDX-License-Identifier: BSD-3-Clause
+ */
+
+#include "flowGrouper.hpp"
+#include "logger/logger.hpp"
+#include "unirec/unirec-telemetry.hpp"
+
+#include <appFs.hpp>
+#include <argparse/argparse.hpp>
+#include <iostream>
+#include <memory>
+#include <optional>
+#include <stdexcept>
+#include <telemetry.hpp>
+
+using namespace Nemea;
+
+/**
+ * @brief Handle a format change exception by adjusting the template.
+ *
+ * This function is called when a `FormatChangeException` is caught in the main loop.
+ * It updates the input template and replaces the output template to match the new format.
+ *
+ * @param inInterface Input Unirec interface.
+ * @param outInterface Output Unirec interface.
+ * @param flowGrouper FlowGrouper instance used to build the new output template.
+ */
+static void handleFormatChange(
+    UnirecInputInterface& inInterface,
+    UnirecOutputInterface& outInterface,
+    FlowGrouper::FlowGrouper& flowGrouper)
+{
+    inInterface.changeTemplate();
+
+    auto* templateDef = inInterface.getTemplate();
+    if (templateDef == nullptr) {
+        throw std::runtime_error(std::string("Unable to get template from trap input"));
+    }
+    // Convert the template to a string and append the new field.
+    const std::string stringTemp = static_cast<std::string>(ur_template_string(templateDef));
+
+    outInterface.changeTemplate(FlowGrouper::FlowGrouper::getOutputTemplate(stringTemp));
+    flowGrouper.updateUnirecIds();
+}
+
+/**
+ * @brief Process the next Unirec record and assign it a group key.
+ *
+ * This function receives the next Unirec record through the input interface,
+ * finds its group key using FlowGrouper, adds the key to the output Unirec record
+ * and sends the record to the output interface.
+ *
+ * @param input Input Unirec interface.
+ * @param output Output Unirec interface.
+ * @param flowGrouper FlowGrouper instance used to process flows.
+ */
+static void processNextRecord(
+    UnirecInputInterface& input,
+    UnirecOutputInterface& output,
+    FlowGrouper::FlowGrouper& flowGrouper)
+{
+    std::optional<UnirecRecordView> inputUnirecView = input.receive();
+    if (!inputUnirecView) {
+        throw std::runtime_error(std::string("Unable to receive Unirec record"));
+    }
+
+    std::optional<UnirecRecord> unirecRecord = output.getUnirecRecord();
+    if (!unirecRecord) {
+        throw std::runtime_error(std::string("Unable to create output Unirec record"));
+    }
+    unirecRecord->copyFieldsFrom(*inputUnirecView);
+
+    // Add FLOW_GROUP_KEY to the output record.
+    try {
+        flowGrouper.addFlowKey(*inputUnirecView, *unirecRecord);
+    } catch (const std::exception& ex) {
+        throw std::runtime_error(
+            std::string("Error while loading data to Unirec record: ") + ex.what());
+    }
+
+    output.send(*unirecRecord);
+}
+
+/**
+ * @brief Process Unirec records.
+ *
+ * The `processUnirecRecords` function continuously receives Unirec records through the provided
+ * input and output interfaces (`inInterface` and `outInterface`) and processes them. The loop
+ * runs until an end-of-file condition is encountered.
+ *
+ * @param inInterface Input Unirec interface.
+ * @param outInterface Output Unirec interface.
+ * @param flowGrouper FlowGrouper instance used to process flows.
+ */
+static void processUnirecRecords(
+    UnirecInputInterface& inInterface,
+    UnirecOutputInterface& outInterface,
+    FlowGrouper::FlowGrouper& flowGrouper)
+{
+    while (true) {
+        try {
+            processNextRecord(inInterface, outInterface, flowGrouper);
+        } catch (FormatChangeException& ex) {
+            handleFormatChange(inInterface, outInterface, flowGrouper);
+        } catch (const EoFException& ex) {
+            break;
+        } catch (const std::exception& ex) {
+            throw;
+        }
+    }
+}
+
+int main(int argc, char** argv)
+{
+    argparse::ArgumentParser program("Unirec FlowGrouper");
+
+    Unirec unirec({1, 1, "FlowGrouper", "Unirec FlowGrouper module"});
+
+    Nm::loggerInit();
+    auto logger = Nm::loggerGet("main");
+
+    try {
+        program.add_argument("-s", "--size")
+            .required()
+            .help("Exponent N for the hash map size (2^N entries). Default: 20 (1,048,576 entries).")
+            .default_value(FlowGrouper::FlowGrouper::DEFAULT_HASHMAP_EXPONENT)
+            .scan<'u', uint32_t>();
+        program.add_argument("-t", "--timeout")
+            .required()
+            .help(
+                "Number of milliseconds to consider flows part of the same group. Default: 5000 (5 s).")
+            .default_value(FlowGrouper::FlowGrouper::DEFAULT_HASHMAP_TIMEOUT)
+            .scan<'u', uint64_t>();
+        program.add_argument("-m", "--appfs-mountpoint")
+            .required()
+            .help("Path where the appFs directory will be mounted.")
+            .default_value(std::string(""));
+    } catch (const std::exception& ex) {
+        logger->error(ex.what());
+        std::cerr << program;
+        return EXIT_FAILURE;
+    }
+
+    try {
+        unirec.init(argc, argv);
+    } catch (const HelpException& ex) {
+        std::cerr << program;
+        return EXIT_SUCCESS;
+    } catch (const std::exception& ex) {
+        logger->error(ex.what());
+        return EXIT_FAILURE;
+    }
+
+    try {
+        program.parse_args(argc, argv);
+    } catch (const std::exception& ex) {
+        logger->error(ex.what());
+        std::cerr << program;
+        return EXIT_FAILURE;
+    }
+
+    std::shared_ptr<telemetry::Directory> telemetryRootDirectory = telemetry::Directory::create();
+
+    std::unique_ptr<telemetry::appFs::AppFsFuse> appFs;
+
+    try {
+        auto mountPoint = program.get<std::string>("--appfs-mountpoint");
+        if (!mountPoint.empty()) {
+            const bool tryToUnmountOnStart = true;
+            const bool createMountPoint = true;
+            appFs = std::make_unique<telemetry::appFs::AppFsFuse>(
+                telemetryRootDirectory,
+                mountPoint,
+                tryToUnmountOnStart,
+                createMountPoint);
+            appFs->start();
+        }
+    } catch (std::exception& ex) {
+        logger->error(ex.what());
+        return EXIT_FAILURE;
+    }
+
+    try {
+        const auto tableSize = program.get<uint32_t>("--size");
+        if (tableSize == 0) {
+            std::cerr << "Table size exponent must be greater than zero.\n";
+            return EXIT_FAILURE;
+        }
+        const auto timeout = program.get<uint64_t>("--timeout");
+        if (timeout == 0) {
+            std::cerr << "Timeout must be greater than zero.\n";
+            return EXIT_FAILURE;
+        }
+
+        UnirecInputInterface inInterface = unirec.buildInputInterface();
+        UnirecOutputInterface outInterface = unirec.buildOutputInterface();
+
+        auto telemetryInputDirectory = telemetryRootDirectory->addDir("input");
+        const telemetry::FileOps inputFileOps
+            = {[&inInterface]() { return Nm::getInterfaceTelemetry(inInterface); }, nullptr};
+        const auto inputFile = telemetryInputDirectory->addFile("stats", inputFileOps);
+
+        auto telemetryFlowGrouperDirectory = telemetryRootDirectory->addDir("flowGrouper");
+
+        FlowGrouper::FlowGrouper::FlowGrouperHashMap::TimeoutHashMapParameters parameters;
+        parameters.bucketCountExponent = tableSize;
+        parameters.timeout = timeout;
+
+        FlowGrouper::FlowGrouper flowGrouper(parameters);
+        flowGrouper.setTelemetryDirectory(telemetryFlowGrouperDirectory);
+
+        inInterface.setRequieredFormat(
+            "uint16 SRC_PORT, uint16 DST_PORT, ipaddr DST_IP, ipaddr SRC_IP, "
+            "uint8 PROTOCOL");
+
+        processUnirecRecords(inInterface, outInterface, flowGrouper);
+
+    } catch (std::exception& ex) {
+        logger->error(ex.what());
+        return EXIT_FAILURE;
+    }
+
+    return EXIT_SUCCESS;
+}
diff --git a/modules/flowGrouper/tests/test.sh b/modules/flowGrouper/tests/test.sh
new file mode 100755
index 00000000..2626e173
--- /dev/null
+++ b/modules/flowGrouper/tests/test.sh
@@ -0,0 +1,123 @@
+#!/bin/bash
+
+function exit_with_error {
+    pkill logger
+    pkill logreplay
+    pkill flowGrouper
+    exit 1
+}
+
+function process_started {
+    pid=$1
+    if ! ps -p $pid > /dev/null
+    then
+        echo "Failed to start process"
+        exit_with_error
+    fi
+}
+
+function compare_files {
+    file1=$1
+    file2=$2
+
+    # Skip the CSV header of both files before comparing.
+    tail -n +2 "$file1" > /tmp/f1_data
+    tail -n +2 "$file2" > /tmp/f2_data
+
+    # Arrays that store the FLOW_GROUP_KEY of each row.
+    declare -A f1_keys
+    declare -A f2_keys
+
+    # Read file1 and store FLOW_GROUP_KEY (column 3) for each row.
+    line_num=0
+    while IFS=',' read -r dst_ip src_ip flow_key rest; do
+        ((line_num++))
+        f1_keys[$line_num]=$flow_key
+    done < /tmp/f1_data
+
+    # Read file2 and store FLOW_GROUP_KEY (column 3) for each row.
+    line_num=0
+    while IFS=',' read -r dst_ip src_ip flow_key rest; do
+        ((line_num++))
+        f2_keys[$line_num]=$flow_key
+    done < /tmp/f2_data
+
+    # Compare: if two rows share a flow key in file1, they must share one in file2
+    # (and rows with different keys in file1 must also differ in file2).
+    errors=0
+    total_lines=${#f1_keys[@]}
+
+    for ((i=1; i<=total_lines; i++)); do
+        for ((j=i+1; j<=total_lines; j++)); do
+            # Check whether the keys are the same in file1.
+            if [ "${f1_keys[$i]}" == "${f1_keys[$j]}" ]; then
+                # They must be the same in file2.
+                if [ "${f2_keys[$i]}" != "${f2_keys[$j]}" ]; then
+                    echo "ERROR: Lines $i and $j have the same FLOW_GROUP_KEY in file1 (${f1_keys[$i]}) but different ones in file2 (${f2_keys[$i]} vs ${f2_keys[$j]})"
+                    ((errors++))
+                fi
+            else
+                # They must be different in file2.
+                if [ "${f2_keys[$i]}" == "${f2_keys[$j]}" ]; then
+                    echo "ERROR: Lines $i and $j have different FLOW_GROUP_KEYs in file1 (${f1_keys[$i]} vs ${f1_keys[$j]}) but the same one in file2 (${f2_keys[$i]})"
+                    ((errors++))
+                fi
+            fi
+        done
+    done
+    rm /tmp/f1_data /tmp/f2_data
+
+    if [ $errors -eq 0 ]; then
+        return 0
+    else
+        echo "FAILED: Found $errors inconsistencies"
+        return 1
+    fi
+}
+
+data_path="$(dirname "$0")/testsData/"
+flowGrouper=$1
+
+set -e
+trap 'echo "Command \"$BASH_COMMAND\" failed!"; exit_with_error' ERR
+
+for input_file in $data_path/inputs/*; do
+    index=$(echo "$input_file" | grep -o '[0-9]\+')
+    echo "Running test $index"
+
+    res_file="/tmp/res"
+    logger -i "u:flowGrouper" -t -w $res_file &
+    logger_pid=$!
+    sleep 0.1
+    process_started $logger_pid
+
+    $flowGrouper \
+        -i "u:din,u:flowGrouper" &
+    detector_pid=$!
+    sleep 0.1
+    process_started $detector_pid
+
+    logreplay -i "u:din" -f "$data_path/inputs/input$index.csv" 2>/dev/null &
+    sleep 0.1
+    process_started $!
+
+    wait $logger_pid
+    wait $detector_pid
+
+    if [ -f "$res_file" ]; then
+        if compare_files "$data_path/results/res$index.csv" "$res_file" ; then
+            echo "Test $index passed"
+        else
+            echo "FLOW_GROUP_KEY equivalence patterns are NOT consistent"
+            exit_with_error
+        fi
+    else
+        echo "File $res_file not found"
+        exit_with_error
+    fi
+done
+
+echo "All tests passed"
+exit 0
diff --git a/modules/flowGrouper/tests/testsData/inputs/input1.csv b/modules/flowGrouper/tests/testsData/inputs/input1.csv
new file mode 100644
index 00000000..8bc38e71
--- /dev/null
+++ b/modules/flowGrouper/tests/testsData/inputs/input1.csv
@@ -0,0 +1,7 @@
+ipaddr SRC_IP, ipaddr DST_IP, uint16 SRC_PORT, uint16 DST_PORT, uint8 PROTOCOL, uint64 LINK_BIT_FIELD, time TIME_LAST
+154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z
+54.175.29.123,54.175.219.24,123,80,6,10,2020-01-01T00:00:02Z
+54.175.21.66,54.175.219.77,443,22,6,10,2020-01-01T00:00:03Z
+54.175.3.17,54.175.219.99,8080,53,17,10,2020-01-01T00:00:04Z
+154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z
+154.175.219.8,155.175.219.8,123,123,6,11,2020-01-01T00:00:01Z
diff --git a/modules/flowGrouper/tests/testsData/results/res1.csv b/modules/flowGrouper/tests/testsData/results/res1.csv
new file mode 100644
index 00000000..11cb0662
--- /dev/null
+++ b/modules/flowGrouper/tests/testsData/results/res1.csv
@@ -0,0 +1,7 @@
+ipaddr DST_IP,ipaddr SRC_IP,uint64 FLOW_GROUP_KEY,uint64 LINK_BIT_FIELD,time TIME_LAST,uint16 DST_PORT,uint16 SRC_PORT,uint8 PROTOCOL
+155.175.219.8,154.175.219.8,1763465260,10,2020-01-01T00:00:01.000000,123,123,6
+54.175.219.24,54.175.29.123,1763465261,10,2020-01-01T00:00:02.000000,80,123,6
+54.175.219.77,54.175.21.66,1763465261,10,2020-01-01T00:00:03.000000,22,443,6
+54.175.219.99,54.175.3.17,1763465261,10,2020-01-01T00:00:04.000000,53,8080,17
+155.175.219.8,154.175.219.8,1763465260,10,2020-01-01T00:00:01.000000,123,123,6
+155.175.219.8,154.175.219.8,1763465260,11,2020-01-01T00:00:01.000000,123,123,6