From ca1c9adf4c7b65f0bce1d077875c3d8119753024 Mon Sep 17 00:00:00 2001 From: David Kezlinek <103315902+Dakevid@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:26:24 +0200 Subject: [PATCH 1/3] Deduplicator - fix input parameters and init template change --- modules/deduplicator/src/deduplicator.hpp | 2 +- modules/deduplicator/src/main.cpp | 11 ++++++----- modules/deduplicator/src/timeoutHashMap.hpp | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/modules/deduplicator/src/deduplicator.hpp b/modules/deduplicator/src/deduplicator.hpp index 56a27f35..34055494 100644 --- a/modules/deduplicator/src/deduplicator.hpp +++ b/modules/deduplicator/src/deduplicator.hpp @@ -46,7 +46,7 @@ class Deduplicator { std::function, std::function>; - static inline const int DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s + static inline const uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s /** * @brief Deduplicator constructor diff --git a/modules/deduplicator/src/main.cpp b/modules/deduplicator/src/main.cpp index 984c4afa..a38ac3ba 100644 --- a/modules/deduplicator/src/main.cpp +++ b/modules/deduplicator/src/main.cpp @@ -112,16 +112,16 @@ int main(int argc, char** argv) try { program.add_argument("-s", "--size") .required() - .help("Size of the hash map. Default value is 2^20 for 1'048'576 records.") + .help("Exponent N for the hash map size (2^N entries). Default: 20 (~1 048 576).") .default_value(Deduplicator::Deduplicator::DeduplicatorHashMap:: TimeoutHashMapParameters::DEFAULT_HASHMAP_EXPONENT) - .scan<'i', int>(); + .scan<'u', uint32_t>(); program.add_argument("-t", "--timeout") .required() .help( - "Count of millisecond to consider flows as duplicates. Default value is 5000(5s).") + "Count of millisecond to consider flows as duplicates. Default value is 5000 (5s).") .default_value(Deduplicator::Deduplicator::DEFAULT_HASHMAP_TIMEOUT) - .scan<'i', int>(); + .scan<'u', uint64_t>(); program.add_argument("-m", "--appfs-mountpoint") .required() .help("path where the appFs directory will be mounted") @@ -161,7 +161,7 @@ int main(int argc, char** argv) std::cerr << "Table size must be at least 8.\n"; return EXIT_FAILURE; } - const auto timeout = program.get("--timeout"); + const auto timeout = program.get("--timeout"); if (timeout <= 0) { std::cerr << "Timeout must be higher than zero.\n"; return EXIT_FAILURE; @@ -185,6 +185,7 @@ int main(int argc, char** argv) biInterface.setRequieredFormat( "uint16 SRC_PORT, uint16 DST_PORT, ipaddr DST_IP,ipaddr SRC_IP, uint64 LINK_BIT_FIELD, " "uint8 PROTOCOL, time TIME_LAST"); + deduplicator.updateUnirecIds(); processUnirecRecords(biInterface, deduplicator); } catch (std::exception& ex) { diff --git a/modules/deduplicator/src/timeoutHashMap.hpp b/modules/deduplicator/src/timeoutHashMap.hpp index 5b695366..12ed8382 100644 --- a/modules/deduplicator/src/timeoutHashMap.hpp +++ b/modules/deduplicator/src/timeoutHashMap.hpp @@ -212,7 +212,7 @@ class TimeoutHashMap { /** * @brief Default size for the hash map. */ - static inline const int DEFAULT_HASHMAP_EXPONENT = 20; // 1'048'576 records + static inline const uint32_t DEFAULT_HASHMAP_EXPONENT = 20; // 1'048'576 records uint32_t bucketCountExponent; ///< Total amount of records in table uint64_t timeout; ///< Time interval to consider flow unique From f3d3c34ace81b1cf7e2aa51bd50e2b2589f7d639 Mon Sep 17 00:00:00 2001 From: David Kezlinek <103315902+Dakevid@users.noreply.github.com> Date: Fri, 24 Oct 2025 15:26:40 +0200 Subject: [PATCH 2/3] Deduplicator - Add tests --- modules/deduplicator/tests/test.sh | 61 +++++++++++++++++++ .../tests/testsData/inputs/input1.csv | 9 +++ .../tests/testsData/results/res1.csv | 6 ++ 3 files changed, 76 insertions(+) create mode 100755 modules/deduplicator/tests/test.sh create mode 100644 modules/deduplicator/tests/testsData/inputs/input1.csv create mode 100644 modules/deduplicator/tests/testsData/results/res1.csv diff --git a/modules/deduplicator/tests/test.sh b/modules/deduplicator/tests/test.sh new file mode 100755 index 00000000..1a4f8fb7 --- /dev/null +++ b/modules/deduplicator/tests/test.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +function exit_with_error { + pkill logger + pkill logreplay + pkill deduplicator + exit 1 +} + +function process_started { + pid=$1 + if ! ps -p $pid > /dev/null + then + echo "Failed to start process" + exit_with_error + fi +} + +data_path="$(dirname "$0")/testsData/" +deduplicator=$1 + +set -e +trap 'echo "Command \"$BASH_COMMAND\" failed!"; exit_with_error' ERR +for input_file in $data_path/inputs/*; do + index=$(echo "$input_file" | grep -o '[0-9]\+') + echo "Running test $index" + + res_file="/tmp/res" + logger -i "u:deduplicator" -w $res_file & + logger_pid=$! + sleep 0.1 + + process_started $logger_pid + + $deduplicator \ + -i "u:din,u:deduplicator" & + + detector_pid=$! + sleep 0.1 + process_started $detector_pid + + logreplay -i "u:din" -f "$data_path/inputs/input$index.csv" 2>/dev/null & + sleep 0.1 + process_started $! + + wait $logger_pid + wait $detector_pid + + if [ -f "$res_file" ]; then + if ! cmp -s "$data_path/results/res$index.csv" "$res_file"; then + echo "Files results/res$index.csv and $res_file are not equal" + exit_with_error + fi + else + echo "File $res_file not found" + exit_with_error + fi +done + +echo "All tests passed" +exit 0 diff --git a/modules/deduplicator/tests/testsData/inputs/input1.csv b/modules/deduplicator/tests/testsData/inputs/input1.csv new file mode 100644 index 00000000..b455482d --- /dev/null +++ b/modules/deduplicator/tests/testsData/inputs/input1.csv @@ -0,0 +1,9 @@ +ipaddr SRC_IP, ipaddr DST_IP, uint16 SRC_PORT, uint16 DST_PORT, uint8 PROTOCOL, uint64 LINK_BIT_FIELD, time TIME_LAST +154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z +54.175.29.123,54.175.219.24,123,80,6,10,2020-01-01T00:00:02Z +54.175.21.66,54.175.219.77,443,22,6,10,2020-01-01T00:00:03Z +54.175.3.17,54.175.219.99,8080,53,17,10,2020-01-01T00:00:04Z +154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z +154.175.219.8,155.175.219.8,123,123,6,11,2020-01-01T00:00:01Z +154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z +154.175.219.8,155.175.219.8,123,123,6,11,2020-01-01T00:00:01Z diff --git a/modules/deduplicator/tests/testsData/results/res1.csv b/modules/deduplicator/tests/testsData/results/res1.csv new file mode 100644 index 00000000..1f34da1e --- /dev/null +++ b/modules/deduplicator/tests/testsData/results/res1.csv @@ -0,0 +1,6 @@ +155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6 +54.175.219.24,54.175.29.123,10,2020-01-01T00:00:02.000000,80,123,6 +54.175.219.77,54.175.21.66,10,2020-01-01T00:00:03.000000,22,443,6 +54.175.219.99,54.175.3.17,10,2020-01-01T00:00:04.000000,53,8080,17 +155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6 +155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6 From 6990f817502f176b8746e60892661e07ea31d6ec Mon Sep 17 00:00:00 2001 From: GalaxP Date: Fri, 14 Nov 2025 20:38:35 +0100 Subject: [PATCH 3/3] Clickhouse nullable types --- modules/clickhouse/src/config.cpp | 11 +++- modules/clickhouse/src/config.hpp | 1 + modules/clickhouse/src/datatype.cpp | 88 ++++++++++++++++++++--------- modules/clickhouse/src/datatype.hpp | 10 +++- modules/clickhouse/src/inserter.cpp | 17 ++---- modules/clickhouse/src/manager.cpp | 5 +- 6 files changed, 86 insertions(+), 46 deletions(-) diff --git a/modules/clickhouse/src/config.cpp b/modules/clickhouse/src/config.cpp index 7ea49d6a..75f1c697 100644 --- a/modules/clickhouse/src/config.cpp +++ b/modules/clickhouse/src/config.cpp @@ -110,15 +110,22 @@ static void parseColumns(const YAML::Node& columnsNode, Config& config) Config::Column column; size_t const spacePos = colValue.find(' '); - std::string const type = colValue.substr(0, spacePos); + std::string type = colValue.substr(0, spacePos); std::string name = colValue.substr(spacePos + 1); + // Check for ! suffix indicating non-nullable column + column.nullable = true; + if (!type.empty() && type.back() == '!') { + column.nullable = false; + type.pop_back(); // Remove the '!' suffix + } + try { column.type = g_string_to_columntype.at(type); } catch (std::out_of_range& ex) { std::stringstream sstream; - sstream << "Incorrect column type: " << colValue.substr(0, spacePos); + sstream << "Incorrect column type: " << type; throw std::runtime_error(sstream.str()); } diff --git a/modules/clickhouse/src/config.hpp b/modules/clickhouse/src/config.hpp index 02200a47..7c18abc7 100644 --- a/modules/clickhouse/src/config.hpp +++ b/modules/clickhouse/src/config.hpp @@ -79,6 +79,7 @@ struct Config { std::string name; ///< column name ColumnType type; ///< column type ur_field_id_t fieldID; ///< column unirec id + bool nullable = true; ///< whether the column accepts NULL values (default: true) }; /** diff --git a/modules/clickhouse/src/datatype.cpp b/modules/clickhouse/src/datatype.cpp index 0e05ed48..04bfbb0a 100644 --- a/modules/clickhouse/src/datatype.cpp +++ b/modules/clickhouse/src/datatype.cpp @@ -593,23 +593,31 @@ static std::shared_ptr makeArrColumn(ColumnType type) return column; } -static std::shared_ptr makeNonArrColumn(ColumnType type) +static std::shared_ptr makeNonArrColumn(ColumnType type, bool nullable) { std::shared_ptr column; - visit(type, [&](auto traits) { - using ColType = clickhouse::ColumnNullableT; - column = std::make_shared(); - }); + + if (nullable) { + visit(type, [&](auto traits) { + using ColType = clickhouse::ColumnNullableT; + column = std::make_shared(); + }); + } else { + visit(type, [&](auto traits) { + using ColType = typename decltype(traits)::ColumnType; + column = std::make_shared(); + }); + } return column; } -std::shared_ptr makeColumn(ColumnType type) +std::shared_ptr makeColumn(ColumnType type, bool nullable) { if (isArr(type)) { return makeArrColumn(type); } - return makeNonArrColumn(type); + return makeNonArrColumn(type, nullable); } GetterFn makeGetter(ColumnType type) @@ -644,40 +652,68 @@ static ColumnWriterFn makeArrColumnwriter(ColumnType type) return columnwriter; } -static ColumnWriterFn makeNonArrColumnwriter(ColumnType type) +static ColumnWriterFn makeNonArrColumnwriter(ColumnType type, bool nullable) { ColumnWriterFn columnwriter; - visitNonArr(type, [&](auto traits) { - columnwriter = [](ValueVariant* value, clickhouse::Column& column) { - using ColumnType = clickhouse::ColumnNullableT; - using ValueType = std::invoke_result_t< - decltype(decltype(traits)::GETTER), - Nemea::UnirecRecordView&, - ur_field_type_t>; - auto* col = dynamic_cast(&column); - if (!value) { - col->Append(std::nullopt); - } else { - col->Append(std::get(*value)); - } - }; - }); + if (nullable) { + visitNonArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = clickhouse::ColumnNullableT; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (!value) { + col->Append(std::nullopt); + } else { + col->Append(std::get(*value)); + } + }; + }); + } else { + visitNonArr(type, [&](auto traits) { + columnwriter = [](ValueVariant* value, clickhouse::Column& column) { + using ColumnType = typename decltype(traits)::ColumnType; + using ValueType = std::invoke_result_t< + decltype(decltype(traits)::GETTER), + Nemea::UnirecRecordView&, + ur_field_type_t>; + auto* col = dynamic_cast(&column); + if (!value) { + throw std::runtime_error("NULL value for non-nullable column"); + } + // Cast to avoid sign conversion warnings for DateTime64 + if constexpr (std::is_same_v && std::is_same_v>) { + col->Append(static_cast(std::get(*value))); + } else { + col->Append(std::get(*value)); + } + }; + }); + } return columnwriter; } -ColumnWriterFn makeColumnwriter(ColumnType type) +ColumnWriterFn makeColumnwriter(ColumnType type, bool nullable) { if (isArr(type)) { return makeArrColumnwriter(type); } - return makeNonArrColumnwriter(type); + return makeNonArrColumnwriter(type, nullable); } -std::string typeToClickhouse(ColumnType type) +std::string typeToClickhouse(ColumnType type, bool nullable) { std::string result; visit(type, [&](auto traits) { result = traits.CLICKHOUSE_TYPE_NAME; }); + + // Don't wrap arrays in Nullable + if (nullable && !isArr(type)) { + result = "Nullable(" + result + ")"; + } + return result; } diff --git a/modules/clickhouse/src/datatype.hpp b/modules/clickhouse/src/datatype.hpp index 7ec39f71..0bbd2074 100644 --- a/modules/clickhouse/src/datatype.hpp +++ b/modules/clickhouse/src/datatype.hpp @@ -82,6 +82,7 @@ struct ColumnCtx { std::string name; ///< Column name ColumnType type; ///< Column type ur_field_id_t fieldID; ///< unirec template field id + bool nullable = true; ///< whether the column is nullable ColumnFactoryFn columnFactory = nullptr; ///< lambda for creating columns GetterFn getter = nullptr; ///< lambda for converting unirec data to clickhouse column @@ -121,9 +122,10 @@ const int g_TIME_PRECISION = 9; * @brief Make a ClickHouse column that is able to store values of the supplied data type * * @param type The data type + * @param nullable Whether the column should be nullable * @return The ClickHouse column object */ -std::shared_ptr makeColumn(ColumnType type); +std::shared_ptr makeColumn(ColumnType type, bool nullable = true); /** * @brief Makes a function (lambda) which converts unirec column data into clickhouse column @@ -137,14 +139,16 @@ GetterFn makeGetter(ColumnType type); * @brief Converts Columntype into clickhouse string specification of column * * @param type The data type + * @param nullable Whether the column is nullable * @return The ClickHouse column name */ -ColumnWriterFn makeColumnwriter(ColumnType type); +ColumnWriterFn makeColumnwriter(ColumnType type, bool nullable = true); /** * @brief Converts Columntype into clickhouse string specification of column * * @param type The data type + * @param nullable Whether the column is nullable * @return The ClickHouse column name */ -std::string typeToClickhouse(ColumnType type); +std::string typeToClickhouse(ColumnType type, bool nullable = true); diff --git a/modules/clickhouse/src/inserter.cpp b/modules/clickhouse/src/inserter.cpp index 0a67ac5a..e931a8de 100644 --- a/modules/clickhouse/src/inserter.cpp +++ b/modules/clickhouse/src/inserter.cpp @@ -124,7 +124,7 @@ static void ensureSchema( sstream << "CREATE TABLE " << table << "(\n"; size_t columnIndex = 0; for (const auto& column : columns) { - const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type); + const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type, columns[columnIndex].nullable); sstream << " \"" << column.name << "\" " << clickhouseType << (columnIndex < columns.size() - 1 ? "," : "") << '\n'; columnIndex++; @@ -143,18 +143,9 @@ static void ensureSchema( for (size_t i = 0; i < dbColumns.size(); i++) { const auto& expectedName = columns[i].name; - const auto& expectedType = typeToClickhouse(columns[i].type); + const auto& expectedType = typeToClickhouse(columns[i].type, columns[i].nullable); const auto& [actual_name, actual_type] = dbColumns[i]; - // strip Nullable(...) wrapper for comparison - std::string actualBaseType = actual_type; - static const std::string nullablePrefix = "Nullable("; - if (actual_type.rfind(nullablePrefix, 0) == 0 && actual_type.back() == ')') { - actualBaseType = actual_type.substr( - nullablePrefix.size(), - actual_type.size() - nullablePrefix.size() - 1); - } - if (expectedName != actual_name) { std::stringstream sstream; sstream << "Expected column #" << i << " in table \"" << table << "\" to be named \"" @@ -163,8 +154,8 @@ static void ensureSchema( throw std::runtime_error(sstream.str()); } - // compare expected to stripped actual type - if (expectedType != actualBaseType) { + // Compare expected type with actual type (exact match required for nullable vs non-nullable) + if (expectedType != actual_type) { std::stringstream sstream; sstream << "Expected column #" << i << " in table \"" << table << "\" to be of type \"" << expectedType << "\" but it is \"" << actual_type << "\"\n" diff --git a/modules/clickhouse/src/manager.cpp b/modules/clickhouse/src/manager.cpp index fb4037aa..ecbb3555 100644 --- a/modules/clickhouse/src/manager.cpp +++ b/modules/clickhouse/src/manager.cpp @@ -31,10 +31,11 @@ static std::vector prepareColumns(const std::vector& column.name = columnCfg.name; column.type = columnCfg.type; column.fieldID = columnCfg.type; + column.nullable = columnCfg.nullable; column.getter = makeGetter(columnCfg.type); - column.columnWriter = makeColumnwriter(columnCfg.type); - column.columnFactory = [=]() { return makeColumn(columnCfg.type); }; + column.columnWriter = makeColumnwriter(columnCfg.type, columnCfg.nullable); + column.columnFactory = [=]() { return makeColumn(columnCfg.type, columnCfg.nullable); }; columns.emplace_back(std::move(column)); }