From 04bae718d3306398cc1f3e0237a2a319ffb74c02 Mon Sep 17 00:00:00 2001 From: GalaxP Date: Fri, 24 Oct 2025 16:16:22 +0200 Subject: [PATCH] Flowscatter - RSS mode --- modules/flowScatter/src/flowScatter.cpp | 39 +++++++++++++++++++++++++ modules/flowScatter/src/flowScatter.hpp | 4 +++ 2 files changed, 43 insertions(+) diff --git a/modules/flowScatter/src/flowScatter.cpp b/modules/flowScatter/src/flowScatter.cpp index 5f9c2a8..3d44b24 100644 --- a/modules/flowScatter/src/flowScatter.cpp +++ b/modules/flowScatter/src/flowScatter.cpp @@ -266,11 +266,19 @@ void FlowScatter::ruleParse(const std::string& rule) { m_rules.branches.clear(); m_cachedBranches.clear(); + m_rssMode = false; + m_rss_src_id = 0; + m_rss_dst_id = 0; if (rule.empty()) { return; // No rules defined, nothing to parse } + if (rule == "rss") { + m_rssMode = true; + return; + } + std::istringstream ruleStream(rule); std::string branchStr; @@ -306,6 +314,18 @@ void FlowScatter::changeTemplate() { m_cachedBranches.clear(); + if (m_rssMode) { + int const srcId = ur_get_id_by_name("SRC_IP"); + int const dstId = ur_get_id_by_name("DST_IP"); + if (srcId < 0 || dstId < 0) { + throw std::runtime_error("RSS mode requires SRC_IP and DST_IP fields in template"); + } + m_rss_src_id = static_cast(srcId); + m_rss_dst_id = static_cast(dstId); + + return; + } + for (const auto& branch : m_rules.branches) { CachedBranch cachedBranch; @@ -344,6 +364,25 @@ size_t FlowScatter::outputIndex(const UnirecRecordView& record) { m_totalRecords++; + if (m_rssMode) { + auto srcIp = record.getFieldAsType(m_rss_src_id); + auto dstIp = record.getFieldAsType(m_rss_dst_id); + + IpAddress orResult; + const uint8_t* srcBytes = reinterpret_cast(&srcIp); + const uint8_t* dstBytes = reinterpret_cast(&dstIp); + uint8_t* resultBytes = reinterpret_cast(&orResult); + + for (size_t i = 0; i < sizeof(IpAddress); ++i) { + resultBytes[i] = srcBytes[i] | dstBytes[i]; + } + + auto hashValue = XXH64(&orResult, sizeof(IpAddress), 0xdeadd00de); + auto index = static_cast(hashValue % M_NUM_OUTPUTS); + m_sentRecords[index]++; + return index; + } + // If no rules are defined, distribute records round-robin if (m_rules.branches.empty()) { size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS; diff --git a/modules/flowScatter/src/flowScatter.hpp b/modules/flowScatter/src/flowScatter.hpp index 65fdf50..e570a39 100644 --- a/modules/flowScatter/src/flowScatter.hpp +++ b/modules/flowScatter/src/flowScatter.hpp @@ -124,6 +124,10 @@ class FlowScatter { }; std::vector m_cachedBranches; + + bool m_rssMode = false; + ur_field_id_t m_rss_src_id = 0; + ur_field_id_t m_rss_dst_id = 0; }; } // namespace Fs