Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions modules/clickhouse/src/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,22 @@ static void parseColumns(const YAML::Node& columnsNode, Config& config)
Config::Column column;
size_t const spacePos = colValue.find(' ');

std::string const type = colValue.substr(0, spacePos);
std::string type = colValue.substr(0, spacePos);
std::string name = colValue.substr(spacePos + 1);

// Check for ! suffix indicating non-nullable column
column.nullable = true;
if (!type.empty() && type.back() == '!') {
column.nullable = false;
type.pop_back(); // Remove the '!' suffix
}

try {
column.type = g_string_to_columntype.at(type);

} catch (std::out_of_range& ex) {
std::stringstream sstream;
sstream << "Incorrect column type: " << colValue.substr(0, spacePos);
sstream << "Incorrect column type: " << type;
throw std::runtime_error(sstream.str());
}

Expand Down
1 change: 1 addition & 0 deletions modules/clickhouse/src/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ struct Config {
std::string name; ///< column name
ColumnType type; ///< column type
ur_field_id_t fieldID; ///< column unirec id
bool nullable = true; ///< whether the column accepts NULL values (default: true)
};

/**
Expand Down
88 changes: 62 additions & 26 deletions modules/clickhouse/src/datatype.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,23 +593,31 @@ static std::shared_ptr<clickhouse::Column> makeArrColumn(ColumnType type)
return column;
}

static std::shared_ptr<clickhouse::Column> makeNonArrColumn(ColumnType type)
static std::shared_ptr<clickhouse::Column> makeNonArrColumn(ColumnType type, bool nullable)
{
std::shared_ptr<clickhouse::Column> column;
visit(type, [&](auto traits) {
using ColType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
column = std::make_shared<ColType>();
});

if (nullable) {
visit(type, [&](auto traits) {
using ColType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
column = std::make_shared<ColType>();
});
} else {
visit(type, [&](auto traits) {
using ColType = typename decltype(traits)::ColumnType;
column = std::make_shared<ColType>();
});
}

return column;
}

std::shared_ptr<clickhouse::Column> makeColumn(ColumnType type)
std::shared_ptr<clickhouse::Column> makeColumn(ColumnType type, bool nullable)
{
if (isArr(type)) {
return makeArrColumn(type);
}
return makeNonArrColumn(type);
return makeNonArrColumn(type, nullable);
}

GetterFn makeGetter(ColumnType type)
Expand Down Expand Up @@ -644,40 +652,68 @@ static ColumnWriterFn makeArrColumnwriter(ColumnType type)
return columnwriter;
}

static ColumnWriterFn makeNonArrColumnwriter(ColumnType type)
static ColumnWriterFn makeNonArrColumnwriter(ColumnType type, bool nullable)
{
ColumnWriterFn columnwriter;

visitNonArr(type, [&](auto traits) {
columnwriter = [](ValueVariant* value, clickhouse::Column& column) {
using ColumnType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
using ValueType = std::invoke_result_t<
decltype(decltype(traits)::GETTER),
Nemea::UnirecRecordView&,
ur_field_type_t>;
auto* col = dynamic_cast<ColumnType*>(&column);
if (!value) {
col->Append(std::nullopt);
} else {
col->Append(std::get<ValueType>(*value));
}
};
});
if (nullable) {
visitNonArr(type, [&](auto traits) {
columnwriter = [](ValueVariant* value, clickhouse::Column& column) {
using ColumnType = clickhouse::ColumnNullableT<typename decltype(traits)::ColumnType>;
using ValueType = std::invoke_result_t<
decltype(decltype(traits)::GETTER),
Nemea::UnirecRecordView&,
ur_field_type_t>;
auto* col = dynamic_cast<ColumnType*>(&column);
if (!value) {
col->Append(std::nullopt);
} else {
col->Append(std::get<ValueType>(*value));
}
};
});
} else {
visitNonArr(type, [&](auto traits) {
columnwriter = [](ValueVariant* value, clickhouse::Column& column) {
using ColumnType = typename decltype(traits)::ColumnType;
using ValueType = std::invoke_result_t<
decltype(decltype(traits)::GETTER),
Nemea::UnirecRecordView&,
ur_field_type_t>;
auto* col = dynamic_cast<ColumnType*>(&column);
if (!value) {
throw std::runtime_error("NULL value for non-nullable column");
}
// Cast to avoid sign conversion warnings for DateTime64
if constexpr (std::is_same_v<ValueType, uint64_t> && std::is_same_v<ColumnType, ColumnDateTime64<g_TIME_PRECISION>>) {
col->Append(static_cast<int64_t>(std::get<ValueType>(*value)));
} else {
col->Append(std::get<ValueType>(*value));
}
};
});
}

return columnwriter;
}

ColumnWriterFn makeColumnwriter(ColumnType type)
ColumnWriterFn makeColumnwriter(ColumnType type, bool nullable)
{
if (isArr(type)) {
return makeArrColumnwriter(type);
}
return makeNonArrColumnwriter(type);
return makeNonArrColumnwriter(type, nullable);
}

std::string typeToClickhouse(ColumnType type)
std::string typeToClickhouse(ColumnType type, bool nullable)
{
std::string result;
visit(type, [&](auto traits) { result = traits.CLICKHOUSE_TYPE_NAME; });

// Don't wrap arrays in Nullable
if (nullable && !isArr(type)) {
result = "Nullable(" + result + ")";
}

return result;
}
10 changes: 7 additions & 3 deletions modules/clickhouse/src/datatype.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ struct ColumnCtx {
std::string name; ///< Column name
ColumnType type; ///< Column type
ur_field_id_t fieldID; ///< unirec template field id
bool nullable = true; ///< whether the column is nullable

ColumnFactoryFn columnFactory = nullptr; ///< lambda for creating columns
GetterFn getter = nullptr; ///< lambda for converting unirec data to clickhouse column
Expand Down Expand Up @@ -121,9 +122,10 @@ const int g_TIME_PRECISION = 9;
* @brief Make a ClickHouse column that is able to store values of the supplied data type
*
* @param type The data type
* @param nullable Whether the column should be nullable
* @return The ClickHouse column object
*/
std::shared_ptr<clickhouse::Column> makeColumn(ColumnType type);
std::shared_ptr<clickhouse::Column> makeColumn(ColumnType type, bool nullable = true);

/**
* @brief Makes a function (lambda) which converts unirec column data into clickhouse column
Expand All @@ -137,14 +139,16 @@ GetterFn makeGetter(ColumnType type);
* @brief Makes a function (lambda) which appends a value to a ClickHouse column
*
* @param type The data type
* @param nullable Whether the column is nullable
* @return The column writer function
*/
ColumnWriterFn makeColumnwriter(ColumnType type);
ColumnWriterFn makeColumnwriter(ColumnType type, bool nullable = true);

/**
* @brief Converts Columntype into clickhouse string specification of column
*
* @param type The data type
* @param nullable Whether the column is nullable
* @return The ClickHouse column name
*/
std::string typeToClickhouse(ColumnType type);
std::string typeToClickhouse(ColumnType type, bool nullable = true);
17 changes: 4 additions & 13 deletions modules/clickhouse/src/inserter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ static void ensureSchema(
sstream << "CREATE TABLE " << table << "(\n";
size_t columnIndex = 0;
for (const auto& column : columns) {
const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type);
const auto& clickhouseType = typeToClickhouse(columns[columnIndex].type, columns[columnIndex].nullable);
sstream << " \"" << column.name << "\" " << clickhouseType
<< (columnIndex < columns.size() - 1 ? "," : "") << '\n';
columnIndex++;
Expand All @@ -143,18 +143,9 @@ static void ensureSchema(

for (size_t i = 0; i < dbColumns.size(); i++) {
const auto& expectedName = columns[i].name;
const auto& expectedType = typeToClickhouse(columns[i].type);
const auto& expectedType = typeToClickhouse(columns[i].type, columns[i].nullable);
const auto& [actual_name, actual_type] = dbColumns[i];

// strip Nullable(...) wrapper for comparison
std::string actualBaseType = actual_type;
static const std::string nullablePrefix = "Nullable(";
if (actual_type.rfind(nullablePrefix, 0) == 0 && actual_type.back() == ')') {
actualBaseType = actual_type.substr(
nullablePrefix.size(),
actual_type.size() - nullablePrefix.size() - 1);
}

if (expectedName != actual_name) {
std::stringstream sstream;
sstream << "Expected column #" << i << " in table \"" << table << "\" to be named \""
Expand All @@ -163,8 +154,8 @@ static void ensureSchema(
throw std::runtime_error(sstream.str());
}

// compare expected to stripped actual type
if (expectedType != actualBaseType) {
// Compare expected type with actual type (exact match required for nullable vs non-nullable)
if (expectedType != actual_type) {
std::stringstream sstream;
sstream << "Expected column #" << i << " in table \"" << table << "\" to be of type \""
<< expectedType << "\" but it is \"" << actual_type << "\"\n"
Expand Down
5 changes: 3 additions & 2 deletions modules/clickhouse/src/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,11 @@ static std::vector<ColumnCtx> prepareColumns(const std::vector<Config::Column>&
column.name = columnCfg.name;
column.type = columnCfg.type;
column.fieldID = columnCfg.type;
column.nullable = columnCfg.nullable;

column.getter = makeGetter(columnCfg.type);
column.columnWriter = makeColumnwriter(columnCfg.type);
column.columnFactory = [=]() { return makeColumn(columnCfg.type); };
column.columnWriter = makeColumnwriter(columnCfg.type, columnCfg.nullable);
column.columnFactory = [=]() { return makeColumn(columnCfg.type, columnCfg.nullable); };

columns.emplace_back(std::move(column));
}
Expand Down
2 changes: 1 addition & 1 deletion modules/deduplicator/src/deduplicator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Deduplicator {
std::function<bool(const Timestamp&, const Timestamp&)>,
std::function<Timestamp(const Timestamp&, uint64_t)>>;

static inline const int DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s
static inline const uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s

/**
* @brief Deduplicator constructor
Expand Down
11 changes: 6 additions & 5 deletions modules/deduplicator/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,16 @@ int main(int argc, char** argv)
try {
program.add_argument("-s", "--size")
.required()
.help("Size of the hash map. Default value is 2^20 for 1'048'576 records.")
.help("Exponent N for the hash map size (2^N entries). Default: 20 (~1 048 576).")
.default_value(Deduplicator::Deduplicator::DeduplicatorHashMap::
TimeoutHashMapParameters::DEFAULT_HASHMAP_EXPONENT)
.scan<'i', int>();
.scan<'u', uint32_t>();
program.add_argument("-t", "--timeout")
.required()
.help(
"Count of millisecond to consider flows as duplicates. Default value is 5000(5s).")
"Count of millisecond to consider flows as duplicates. Default value is 5000 (5s).")
.default_value(Deduplicator::Deduplicator::DEFAULT_HASHMAP_TIMEOUT)
.scan<'i', int>();
.scan<'u', uint64_t>();
program.add_argument("-m", "--appfs-mountpoint")
.required()
.help("path where the appFs directory will be mounted")
Expand Down Expand Up @@ -161,7 +161,7 @@ int main(int argc, char** argv)
std::cerr << "Table size must be at least 8.\n";
return EXIT_FAILURE;
}
const auto timeout = program.get<uint32_t>("--timeout");
const auto timeout = program.get<uint64_t>("--timeout");
if (timeout <= 0) {
std::cerr << "Timeout must be higher than zero.\n";
return EXIT_FAILURE;
Expand All @@ -185,6 +185,7 @@ int main(int argc, char** argv)
biInterface.setRequieredFormat(
"uint16 SRC_PORT, uint16 DST_PORT, ipaddr DST_IP,ipaddr SRC_IP, uint64 LINK_BIT_FIELD, "
"uint8 PROTOCOL, time TIME_LAST");
deduplicator.updateUnirecIds();
processUnirecRecords(biInterface, deduplicator);

} catch (std::exception& ex) {
Expand Down
2 changes: 1 addition & 1 deletion modules/deduplicator/src/timeoutHashMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ class TimeoutHashMap {
/**
* @brief Default size for the hash map.
*/
static inline const int DEFAULT_HASHMAP_EXPONENT = 20; // 1'048'576 records
static inline const uint32_t DEFAULT_HASHMAP_EXPONENT = 20; // 1'048'576 records

uint32_t bucketCountExponent; ///< Total amount of records in table
uint64_t timeout; ///< Time interval to consider flow unique
Expand Down
61 changes: 61 additions & 0 deletions modules/deduplicator/tests/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#!/bin/bash

# Terminate every helper process the test may have spawned and abort the
# test run with exit status 1.
#
# Each pkill is guarded with `|| true`: pkill returns non-zero when no process
# matches, and this script enables `set -e`, so without the guard the first
# miss would end the script before the remaining processes are killed.
function exit_with_error {
    pkill logger || true
    pkill logreplay || true
    pkill deduplicator || true
    exit 1
}

# Verify that a background process is still running.
#
# Arguments:
#   $1 - PID of the process to check
#
# Prints "Failed to start process" and aborts the test run through
# exit_with_error when `ps -p` does not report the PID.
function process_started {
    # Keep the PID local so it cannot clobber a caller's variable.
    local pid="$1"
    if ! ps -p "$pid" > /dev/null; then
        echo "Failed to start process"
        exit_with_error
    fi
}

# Usage: test.sh <deduplicator-binary>
#
# For every file in testsData/inputs the script starts a logger that records
# the deduplicator output, starts the deduplicator, replays the input file
# through it and compares the logged output with testsData/results/res<N>.csv.
data_path="$(dirname "$0")/testsData"
deduplicator="$1"

if [ -z "$deduplicator" ]; then
    echo "Usage: $0 <deduplicator-binary>" >&2
    exit 1
fi

set -e
trap 'echo "Command \"$BASH_COMMAND\" failed!"; exit_with_error' ERR
for input_file in "$data_path"/inputs/*; do
    # An unmatched glob stays a literal pattern; fail instead of running on it.
    if [ ! -e "$input_file" ]; then
        echo "No input files found in $data_path/inputs"
        exit_with_error
    fi

    # Derive the test number from the file name only, so digits elsewhere in
    # the path (e.g. in a directory name) cannot end up in the index.
    file_name="$(basename "$input_file")"
    index="${file_name#input}"
    index="${index%.csv}"
    echo "Running test $index"

    res_file="/tmp/res"
    # Drop output left by an earlier test so a stale file is never compared.
    rm -f "$res_file"
    logger -i "u:deduplicator" -w "$res_file" &
    logger_pid=$!
    sleep 0.1

    process_started "$logger_pid"

    # $deduplicator is left unquoted so that a command with extra arguments
    # passed as $1 keeps working.
    $deduplicator \
        -i "u:din,u:deduplicator" &

    dedup_pid=$!
    sleep 0.1
    process_started "$dedup_pid"

    logreplay -i "u:din" -f "$input_file" 2>/dev/null &
    sleep 0.1
    process_started $!

    # Under `set -e` a non-zero exit status of either process fails the test.
    wait "$logger_pid"
    wait "$dedup_pid"

    if [ -f "$res_file" ]; then
        if ! cmp -s "$data_path/results/res$index.csv" "$res_file"; then
            echo "Files results/res$index.csv and $res_file are not equal"
            exit_with_error
        fi
    else
        echo "File $res_file not found"
        exit_with_error
    fi
done

echo "All tests passed"
exit 0
9 changes: 9 additions & 0 deletions modules/deduplicator/tests/testsData/inputs/input1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
ipaddr SRC_IP, ipaddr DST_IP, uint16 SRC_PORT, uint16 DST_PORT, uint8 PROTOCOL, uint64 LINK_BIT_FIELD, time TIME_LAST
154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z
54.175.29.123,54.175.219.24,123,80,6,10,2020-01-01T00:00:02Z
54.175.21.66,54.175.219.77,443,22,6,10,2020-01-01T00:00:03Z
54.175.3.17,54.175.219.99,8080,53,17,10,2020-01-01T00:00:04Z
154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z
154.175.219.8,155.175.219.8,123,123,6,11,2020-01-01T00:00:01Z
154.175.219.8,155.175.219.8,123,123,6,10,2020-01-01T00:00:01Z
154.175.219.8,155.175.219.8,123,123,6,11,2020-01-01T00:00:01Z
6 changes: 6 additions & 0 deletions modules/deduplicator/tests/testsData/results/res1.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6
54.175.219.24,54.175.29.123,10,2020-01-01T00:00:02.000000,80,123,6
54.175.219.77,54.175.21.66,10,2020-01-01T00:00:03.000000,22,443,6
54.175.219.99,54.175.3.17,10,2020-01-01T00:00:04.000000,53,8080,17
155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6
155.175.219.8,154.175.219.8,10,2020-01-01T00:00:01.000000,123,123,6