Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions modules/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ add_subdirectory(sampler)
add_subdirectory(telemetry)
add_subdirectory(deduplicator)
add_subdirectory(clickhouse)
add_subdirectory(flowGrouper)
add_subdirectory(flowScatter)
add_subdirectory(fieldClassifier)
2 changes: 1 addition & 1 deletion modules/deduplicator/src/timeoutHashMap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ class TimeoutHashMap {
}

private:
std::function<size_t(const FlowKey&)> m_hasher;
std::function<size_t(const Key&)> m_hasher;
typename HashMapTimeoutBucket::TimeoutBucketCallables m_timeoutBucketCallables;
std::vector<HashMapTimeoutBucket> m_buckets;
const uint64_t M_BUCKET_MASK;
Expand Down
1 change: 1 addition & 0 deletions modules/flowGrouper/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Descend into the directory holding the flowGrouper sources and their build rules.
add_subdirectory(src)
70 changes: 70 additions & 0 deletions modules/flowGrouper/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# FlowGrouper module - README

## Description
FlowGrouper groups Unirec flow records that share the same 5-tuple (source IP,
destination IP, source port, destination port and protocol) within a configurable
time window and assigns a stable `FLOW_GROUP_KEY` to all records that belong to
the same group.

This module is useful when aggregating flow records that may be
received multiple times (e.g., from multiple exporters).


## Interfaces
- Input: 1
- Output: 1

## Required Unirec Fields
The module expects the input Unirec template to contain the following fields:
- `SRC_IP` (ipaddr)
- `DST_IP` (ipaddr)
- `SRC_PORT` (uint16)
- `DST_PORT` (uint16)
- `PROTOCOL` (uint8)

FlowGrouper will extend the template by adding `uint64 FLOW_GROUP_KEY` to the output records.

## Parameters
Command-line parameters follow the TRAP / Unirec conventions. The main module
parameters are:

- `-s, --size <int>` Exponent N for the hash map size (2^N entries). Default value is 15
- `-t, --timeout <int>` Time window in milliseconds during which records with the same 5-tuple are assigned to the same group. Default value is 5000 (5 s)

- `-m, --appfs-mountpoint <path>` Path where the appFs directory will be mounted

### Common TRAP / Unirec parameters
- `-h` : print help and module-specific parameters
- `-v`, `-vv`, `-vvv` : verbosity levels

## How Flow Grouping Works
- Records are grouped when they arrive within the configured `--timeout`
interval and share the same `SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT` and
`PROTOCOL` values.
- When a record arrives and no existing group matches, a new `FLOW_GROUP_KEY`
is created and stored in an internal timeout hash map keyed by the 5-tuple.
- Subsequent records that match the tuple within the timeout receive the same `FLOW_GROUP_KEY`.

Note: `FLOW_GROUP_KEY` is not a globally unique identifier. It identifies records that belong to the same group only in the context of the 5-tuple (`SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT`, `PROTOCOL`).

## Telemetry data format

```
├─ input/
│  └─ stats
└─ flowGrouper/
   └─ statistics
```

Telemetry counters include:
- **Inserted groups:** number of newly created flow groups
- **Replaced groups:** number of times an existing bucket entry was replaced with a new group
- **Found groups:** number of times a matching group was found for an input record


## Usage Examples
Process Unirec records from a TRAP input and forward them with an added
`FLOW_GROUP_KEY`. The example sets the hash map exponent to `15` (2^15 entries)
and timeout to `1000` ms:

```
$ flowGrouper -i "u:in,u:out" -s 15 -t 1000
```
18 changes: 18 additions & 0 deletions modules/flowGrouper/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Build the flowGrouper module executable from its two translation units.
add_executable(flowGrouper
main.cpp
flowGrouper.cpp
)

# Libraries linked privately into the executable (not propagated to dependents).
# The order below is kept as-is; link order may matter for static libraries.
target_link_libraries(flowGrouper PRIVATE
telemetry::telemetry
telemetry::appFs
common
rapidcsv
unirec::unirec++
unirec::unirec
trap::trap
argparse
xxhash
)

# Install the resulting binary into the directory given by INSTALL_DIR_BIN.
install(TARGETS flowGrouper DESTINATION ${INSTALL_DIR_BIN})
100 changes: 100 additions & 0 deletions modules/flowGrouper/src/flowGrouper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@

#include "flowGrouper.hpp"

#include <cstring>
#include <stdexcept>
#include <type_traits>
#include <xxhash.h>

using namespace Nemea;

namespace FlowGrouper {

/**
 * @brief Hashes the raw bytes of a key with the 64-bit XXH3 function.
 *
 * The hash covers all `sizeof(Key)` bytes starting at the address of @p key.
 *
 * @tparam Key Type of the hashed key.
 * @param key Key whose bytes are hashed.
 * @return 64-bit XXH3 digest of the key bytes.
 */
template <typename Key>
static uint64_t xxHasher(const Key& key)
{
	const void* const rawBytes = static_cast<const void*>(&key);
	const size_t byteCount = sizeof(Key);
	return XXH3_64bits(rawBytes, byteCount);
}

/**
 * @brief Shifts a timestamp forward by a timeout given in milliseconds.
 *
 * @param value Base timestamp.
 * @param timeout Number of milliseconds to add.
 * @return Timestamp equal to @p value plus @p timeout milliseconds.
 */
static FlowGrouper::Timestamp timeSum(const FlowGrouper::Timestamp& value, uint64_t timeout)
{
	const std::chrono::milliseconds offset(timeout);
	return value + offset;
}

/**
 * @brief Resolves a Unirec field name to its field ID.
 *
 * @param str Null-terminated Unirec field name.
 * @return ID of the field as reported by `ur_get_id_by_name`.
 * @throws std::runtime_error if the lookup yields `UR_E_INVALID_NAME`.
 */
static ur_field_id_t getUnirecIdByName(const char* str)
{
	const auto lookupResult = ur_get_id_by_name(str);
	if (lookupResult != UR_E_INVALID_NAME) {
		return static_cast<ur_field_id_t>(lookupResult);
	}
	throw std::runtime_error(std::string("Invalid Unirec name:") + str);
}

/**
 * @brief Builds the grouper and its timeout hash map.
 *
 * The hash map is constructed with the XXH3 based key hasher, `std::less<>` and
 * `timeSum` as its callables. A compile-time check pins the size of one hash
 * map bucket to 256 bytes.
 *
 * @param parameters Parameters forwarded to the timeout hash map.
 */
FlowGrouper::FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters)
	: m_hashMap(parameters, xxHasher<FlowKey>, std::less<>(), timeSum)
{
	using Bucket = FlowGrouperHashMap::HashMapTimeoutBucket;
	constexpr size_t expectedBucketBytes = 256;
	static_assert(sizeof(Bucket) == expectedBucketBytes, "TimeoutBucket size is not 256 bytes");
}

/**
 * @brief Re-resolves the Unirec IDs of every field the grouper reads or writes.
 *
 * Looks up the five key fields and the output field by name and caches the IDs
 * in `m_ids`. A failed lookup propagates the exception thrown by
 * `getUnirecIdByName`.
 */
void FlowGrouper::updateUnirecIds()
{
	const std::string outputField = getOutputFieldName();

	m_ids.srcIpId = getUnirecIdByName("SRC_IP");
	m_ids.dstIpId = getUnirecIdByName("DST_IP");
	m_ids.srcPortId = getUnirecIdByName("SRC_PORT");
	m_ids.dstPortId = getUnirecIdByName("DST_PORT");
	m_ids.protocolId = getUnirecIdByName("PROTOCOL");
	m_ids.flowGroupKeyId = getUnirecIdByName(outputField.c_str());
}

/**
 * @brief Returns the group key for a record, creating a new group when needed.
 *
 * The 5-tuple of @p view is used as the hash map key. A candidate group key is
 * derived from the current wall-clock time in whole seconds and offered to the
 * hash map together with the current steady-clock time:
 *  - when the entry is newly inserted or replaces another entry, the candidate
 *    key is returned and the matching counter is incremented;
 *  - otherwise the key already stored in the hash map is returned.
 *
 * Because the candidate key has one-second resolution, groups created within
 * the same second receive the same numeric key.
 *
 * @param view Record to read the 5-tuple from.
 * @return Group key assigned to the record.
 */
FlowGrouper::FlowGroupKey FlowGrouper::getFlowKey(Nemea::UnirecRecordView& view)
{
	FlowKey flowKey;
	// The key is hashed byte-wise over sizeof(FlowKey). Its members add up to an
	// odd number of bytes while the struct is at least 2-byte aligned, so it
	// always contains padding. Zero the whole object first so that equal
	// 5-tuples always produce identical bytes (and therefore identical hashes).
	std::memset(static_cast<void*>(&flowKey), 0, sizeof(flowKey));
	flowKey.srcIp = view.getFieldAsType<IpAddress>(m_ids.srcIpId);
	flowKey.dstIp = view.getFieldAsType<IpAddress>(m_ids.dstIpId);
	flowKey.srcPort = view.getFieldAsType<uint16_t>(m_ids.srcPortId);
	flowKey.dstPort = view.getFieldAsType<uint16_t>(m_ids.dstPortId);
	flowKey.proto = view.getFieldAsType<uint8_t>(m_ids.protocolId);

	// Candidate key for a new group: seconds since the system clock epoch.
	const auto secondsSinceEpoch = std::chrono::duration_cast<std::chrono::seconds>(
		std::chrono::system_clock::now().time_since_epoch());
	const auto newFlowKey = static_cast<FlowGroupKey>(secondsSinceEpoch.count());

	const auto [it, insertResult]
		= m_hashMap.insert({flowKey, newFlowKey}, std::chrono::steady_clock::now());

	using InsertResult = FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult;
	if (insertResult == InsertResult::INSERTED) {
		m_newInserted++;
		return newFlowKey;
	}
	if (insertResult == InsertResult::REPLACED) {
		m_replaced++;
		return newFlowKey;
	}

	// Any other result: hand out the key stored for the existing group.
	m_found++;
	return *it;
}

/**
 * @brief Writes the group key of the input record into the output record.
 *
 * @param inputRecord Record view the group key is determined from.
 * @param outputRecord Record whose group-key field (ID `m_ids.flowGroupKeyId`) is set.
 */
void FlowGrouper::addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord)
{
	const FlowGrouper::FlowGroupKey groupKey = getFlowKey(inputRecord);
	const auto targetFieldId = m_ids.flowGroupKeyId;
	outputRecord.setFieldFromType<uint64_t>(groupKey, targetFieldId);
}

/**
 * @brief Registers the grouper's telemetry under the given directory.
 *
 * Adds a file named "statistics" to @p directory. Reading it yields a
 * dictionary with the replaced, newly inserted and found counters. Both the
 * directory and the created file are stored in `m_holder`.
 *
 * @param directory Telemetry directory that receives the "statistics" file.
 */
void FlowGrouper::setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory)
{
	m_holder.add(directory);

	// Snapshot of the counters, produced each time the file is read.
	auto readStatistics = [this]() {
		telemetry::Dict counters;
		counters["replacedCount"] = telemetry::Scalar(static_cast<long unsigned int>(m_replaced));
		counters["newInsertedCount"]
			= telemetry::Scalar(static_cast<long unsigned int>(m_newInserted));
		counters["foundCount"] = telemetry::Scalar(static_cast<long unsigned int>(m_found));
		return counters;
	};

	const telemetry::FileOps statisticsOps = {readStatistics, nullptr};
	auto statisticsFile = directory->addFile("statistics", statisticsOps);
	m_holder.add(statisticsFile);
}

} // namespace FlowGrouper
133 changes: 133 additions & 0 deletions modules/flowGrouper/src/flowGrouper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#pragma once

#include "../../deduplicator/src/timeoutHashMap.hpp"

#include <atomic>
#include <memory>
#include <telemetry.hpp>
#include <thread>
#include <unirec++/unirecRecordView.hpp>
#include <unirec++/unirecRecord.hpp>
#include <unirec++/urTime.hpp>
#include <vector>

namespace FlowGrouper {

/**
 * @brief Assigns a common FLOW_GROUP_KEY to Unirec records that share a 5-tuple.
 *
 * Groups are tracked in a timeout hash map keyed by FlowKey; the stored value
 * is the FlowGroupKey handed out to every record of the group.
 */
class FlowGrouper {
public:
	/**
	 * @brief Timestamp type used by flowGrouper.
	 */
	using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;

	/**
	 * @brief Field type representing Flow ID (FLOW_GROUP_KEY).
	 */
	using FlowGroupKey = uint64_t;

	/**
	 * @brief Represents key fields of flow that belong to the same group.
	 */
	struct FlowKey {
		Nemea::IpAddress srcIp; ///< Source IP address.
		Nemea::IpAddress dstIp; ///< Destination IP address.
		uint16_t srcPort; ///< Source port.
		uint16_t dstPort; ///< Destination port.
		uint8_t proto; ///< Protocol ID.
	};

	/**
	 * @brief Timeout hash map type used by FlowGrouper.
	 */
	using FlowGrouperHashMap = Deduplicator::TimeoutHashMap<
		FlowKey,
		FlowGroupKey,
		Timestamp,
		std::function<size_t(const FlowKey&)>,
		std::function<bool(const Timestamp&, const Timestamp&)>,
		std::function<Timestamp(const Timestamp&, uint64_t)>>;

	/// Default timeout in milliseconds (5 s).
	static constexpr uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000;
	/// Default size exponent - 2^20 entries.
	/// NOTE(review): the module README documents a default of 15 - confirm which value is intended.
	static constexpr uint32_t DEFAULT_HASHMAP_EXPONENT = 20;

	/**
	 * @brief FlowGrouper constructor
	 *
	 * @param parameters Parameters to build hash table of flowGrouper
	 */
	explicit FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters);

	/**
	 * @brief Looks up the group of the given record in the hash map, adding it if absent.
	 * @param view The Unirec record to check.
	 * @return FlowGroupKey of the flow.
	 */
	FlowGroupKey getFlowKey(Nemea::UnirecRecordView& view);

	/**
	 * @brief Sets the FLOW_GROUP_KEY field of the output Unirec record.
	 * @param inputRecord The input Unirec record view to get field values from.
	 * @param outputRecord The output Unirec record where FLOW_GROUP_KEY will be set.
	 */
	void addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord);

	/**
	 * @brief Sets the telemetry directory for the flowGrouper.
	 * @param directory directory for flowGrouper telemetry.
	 */
	void setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory);

	/**
	 * @brief Update Unirec Id of required fields after template format change.
	 */
	void updateUnirecIds();

	/**
	 * @brief Gets the name of the output field added by FlowGrouper.
	 * @return Name of the output field.
	 */
	static std::string getOutputFieldName()
	{
		return "FLOW_GROUP_KEY";
	}

	/**
	 * @brief Gets the output template string after adding FLOW_GROUP_KEY field.
	 *
	 * The input template is returned unchanged when it already contains a field
	 * named exactly FLOW_GROUP_KEY. A field whose name merely starts with that
	 * text (for example FLOW_GROUP_KEY_X) does not count as a match. An empty
	 * input template yields just the output field, without a leading separator.
	 *
	 * @param inputTemplate The input Unirec template string.
	 * @return The output Unirec template string with FLOW_GROUP_KEY field added.
	 */
	static std::string getOutputTemplate(std::string inputTemplate)
	{
		const std::string fieldName = getOutputFieldName();
		// A field name always follows its type, hence the leading space.
		const std::string needle = " " + fieldName;

		for (auto pos = inputTemplate.find(needle); pos != std::string::npos;
			 pos = inputTemplate.find(needle, pos + 1)) {
			const auto end = pos + needle.size();
			// Accept the hit only if the name ends here (whole-name match).
			const bool atNameEnd = end == inputTemplate.size() || inputTemplate[end] == ','
				|| inputTemplate[end] == ' ' || inputTemplate[end] == '\t';
			if (atNameEnd) {
				return inputTemplate;
			}
		}

		if (!inputTemplate.empty()) {
			inputTemplate += ", ";
		}
		inputTemplate += "uint64 " + fieldName;
		return inputTemplate;
	}

private:
	FlowGrouperHashMap m_hashMap; ///< Hash map to keep flows

	// 64-bit counters: a 32-bit counter wraps after 2^32 records.
	uint64_t m_newInserted {0}; ///< Count of new groups
	uint64_t m_replaced {0}; ///< Count of replaced groups
	uint64_t m_found {0}; ///< Count of when groupkey was found

	telemetry::Holder m_holder; ///< Telemetry holder

	struct UnirecIdStorage {
		ur_field_id_t srcIpId; ///< Unirec ID of source ip.
		ur_field_id_t dstIpId; ///< Unirec ID of destination ip.
		ur_field_id_t srcPortId; ///< Unirec ID of source port.
		ur_field_id_t dstPortId; ///< Unirec ID of destination port.
		ur_field_id_t protocolId; ///< Unirec ID of protocol field.

		ur_field_id_t flowGroupKeyId; ///< Unirec ID of FLOW_GROUP_KEY field.
	};

	/// Ids of Unirec fields used by flowGrouper module. Value-initialized so the
	/// IDs are never indeterminate before updateUnirecIds() has run.
	UnirecIdStorage m_ids {};
};

} // namespace FlowGrouper
Loading
Loading