diff --git a/Makefile.am b/Makefile.am index 2b95a9fd..54c07634 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1,10 +1,10 @@ ACLOCAL_AMFLAGS = -I m4 +# Biflow aggregator depends on logger and logreplay SUBDIRS= \ aggregator \ anonymizer \ backscatter \ -biflow_aggregator \ debug_sender \ device_classifier \ email_reporter \ @@ -31,6 +31,7 @@ topn \ traffic_repeater \ unirec2json \ endiverter \ +biflow_aggregator \ googletest_example EXTRA_DIST = AUTHORS COPYING ChangeLog INSTALL NEWS README.md nfreader \ diff --git a/biflow_aggregator/Makefile.am b/biflow_aggregator/Makefile.am index 5b5f699c..365329f0 100644 --- a/biflow_aggregator/Makefile.am +++ b/biflow_aggregator/Makefile.am @@ -5,3 +5,10 @@ biflow_aggregator_SOURCES=main.cpp fields.c fields.h configuration.cpp configura rapidxml.hpp biflow_aggregator_LDADD=-lunirec -ltrap include ../aminclude.am + +TESTS = tests/test.sh + +EXTRA_DIST = tests/test.sh \ + tests/references \ + tests/inputs \ + tests/config.xml diff --git a/biflow_aggregator/aggregator.cpp b/biflow_aggregator/aggregator.cpp index ddef2c2d..917c2108 100644 --- a/biflow_aggregator/aggregator.cpp +++ b/biflow_aggregator/aggregator.cpp @@ -184,6 +184,18 @@ int Field_template::assign_append() noexcept return 0; } +template +int Field_template::assign_unique_count() noexcept +{ + typename_size = sizeof(T); + ag_fnc = unique_count; + post_proc_fnc = Unique_count_data::postprocessing; + init_fnc = Unique_count_data::init; + deinit_fnc = Unique_count_data::deinit; + ag_data_size = sizeof(Unique_count_data); + return 0; +} + template int Field_template::assign_min_max() noexcept { @@ -431,6 +443,23 @@ int Field_template::set_templates(const Field_type ag_type, const ur_field_type_ std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to APPEND function." << std::endl; return 1; } + case UNIQUE_COUNT: + switch (ur_f_type) { + case UR_TYPE_UINT8: return assign_unique_count(); + case UR_TYPE_INT8: return assign_unique_count(); + case UR_TYPE_UINT16: return assign_unique_count(); + case UR_TYPE_INT16: return assign_unique_count(); + case UR_TYPE_UINT32: return assign_unique_count(); + case UR_TYPE_INT32: return assign_unique_count(); + case UR_TYPE_UINT64: return assign_unique_count(); + case UR_TYPE_INT64: return assign_unique_count(); + case UR_TYPE_MAC: return assign_unique_count(); + case UR_TYPE_IP: return assign_unique_count(); + case UR_TYPE_STRING: return assign_unique_count(); + default: + std::cerr << "Only string and int, uint, float, double, mac, time, and IP array can be used to UNIQUE_COUNT function." << std::endl; + return 1; + } default: assert("Invalid case option.\n"); return 1; @@ -732,8 +761,10 @@ int Field_template::set_templates_dir(const ur_field_type_t ur_f_type, const ur_ } } -Field::Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid) : - ur_fid(ur_fid), ur_r_fid(ur_r_fid), ur_sort_key_id(0), ur_sort_key_type() +Field::Field(const Field_config cfg, const ur_field_id_t ur_fid_in, const ur_field_id_t ur_r_fid_in, + const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out) + : ur_fid(ur_fid_in), ur_r_fid(ur_r_fid_in), ur_fid_out(ur_fid_out), + ur_r_fid_out(ur_r_fid_out), ur_sort_key_id(0), ur_sort_key_type() { ur_field_type_t ur_field_type = ur_get_type(ur_fid); @@ -813,6 +844,11 @@ void Fields::init(uint8_t *memory) data.first.init(memory, &cfg); break; } + case UNIQUE_COUNT: { + struct Config_unique_count cfg = {data.first.limit}; + data.first.init(memory, &cfg); + break; + } case SORTED_MERGE: case SORTED_MERGE_DIR: { struct Config_sorted_merge cfg = {data.first.limit, data.first.delimiter, data.first.sort_type}; diff --git a/biflow_aggregator/aggregator.h b/biflow_aggregator/aggregator.h index 57a22959..1e962b52 100644 --- a/biflow_aggregator/aggregator.h +++ b/biflow_aggregator/aggregator.h @@ -40,6 +40,7 @@ enum Field_type { LAST, LAST_NON_EMPTY, APPEND, + UNIQUE_COUNT, SORTED_MERGE, SORTED_MERGE_DIR, INVALID_TYPE, @@ -95,6 +96,9 @@ class Field_template { template int assign_append() noexcept; + template + int assign_unique_count() noexcept; + template int assign_min_max() noexcept; @@ -275,7 +279,7 @@ struct Field_config { char delimiter; /** - * @brief Max size of append and sortd merge data + * @brief Max size of data */ std::size_t limit; @@ -301,6 +305,16 @@ class Field : public Field_config, public Field_template { */ ur_field_id_t ur_r_fid; + /** + * @brief ID of output unirec field + */ + ur_field_id_t ur_fid_out; + + /** + * @brief Reverse ID of output unirec field + */ + ur_field_id_t ur_r_fid_out; + /** * @brief ID of sort key unirec field * @@ -321,8 +335,11 @@ class Field : public Field_config, public Field_template { * @param cfg Field configuration * @param ur_fid Field ID * @param ur_r_fid Reverse field ID + * @param ur_fid_out Output field ID + * @param ur_r_fid_out Reverse output field ID */ - Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid); + Field(const Field_config cfg, const ur_field_id_t ur_fid, const ur_field_id_t ur_r_fid, + const ur_field_id_t ur_fid_out, const ur_field_id_t ur_r_fid_out); /** * @brief Call field init function. diff --git a/biflow_aggregator/aggregator_functions.h b/biflow_aggregator/aggregator_functions.h index af0bca47..e0c190c4 100644 --- a/biflow_aggregator/aggregator_functions.h +++ b/biflow_aggregator/aggregator_functions.h @@ -15,6 +15,7 @@ #include #include +#include #include @@ -174,6 +175,50 @@ struct Append_data : Config_append { } }; +/** + * @brief Configuration for unique count function + */ +struct Config_unique_count { + std::size_t filter_size; +}; + +/** + * @brief Structure used to store data for unique count function. + */ +template +struct Unique_count_data : Config_unique_count { + bloom_filter filter; + std::size_t unique_count; + + Unique_count_data(const bloom_parameters& parameters) + : filter(parameters) + {} + + static inline void init(void* memory, const void* config) + { + const auto* config_unique_count = static_cast(config); + bloom_parameters parameters; + parameters.projected_element_count = config_unique_count->filter_size; + parameters.false_positive_probability = 0.01; + parameters.compute_optimal_parameters(); + new(memory) Unique_count_data(parameters); + } + + static inline void deinit(void* memory) + { + Unique_count_data* unique_count_data = static_cast*>(memory); + unique_count_data->unique_count = 0; + unique_count_data->~Unique_count_data(); + } + + static inline const void* postprocessing(void* memory, std::size_t& elem_cnt) noexcept + { + Unique_count_data* unique_count_data = static_cast*>(memory); + elem_cnt = 1; + return static_cast(&unique_count_data->unique_count); + } +}; + /** * @brief Configuration to sorted merge function */ @@ -519,6 +564,32 @@ inline void append(const void *src, void *dst) noexcept static_cast(src_data->ptr_first) + src_data->cnt_elements); } +/** + * @brief Inserts element from src pointer into bloom filter. + * @tparam T template type variable. + * @param [in] src pointer to source of new data. + * @param [in,out] dst pointer to already stored data which will be updated (modified). + */ +template +inline void unique_count(const void* src, void* dst) noexcept +{ + Unique_count_data* unique_count_data = static_cast*>(dst); + + if (std::is_same::value) { + const ur_array_data* src_data = (static_cast(src)); + if (src_data->cnt_elements != 0 && !unique_count_data->filter.contains( + static_cast(src_data->ptr_first), src_data->cnt_elements)) { + unique_count_data->filter.insert(static_cast(src_data->ptr_first), src_data->cnt_elements); + unique_count_data->unique_count++; + } + return; + } + if (!unique_count_data->filter.contains(static_cast(src), sizeof(T))) { + unique_count_data->filter.insert(static_cast(src), sizeof(T)); + unique_count_data->unique_count++; + } +} + template inline void sorted_merge(const void *src, void *dst) noexcept { diff --git a/biflow_aggregator/configuration.cpp b/biflow_aggregator/configuration.cpp index 0df48889..6f9e8c23 100644 --- a/biflow_aggregator/configuration.cpp +++ b/biflow_aggregator/configuration.cpp @@ -71,6 +71,27 @@ bool Configuration::get_eof_termination() noexcept return _eof_terminate; } +void Configuration::set_global_flush_configuration(const char *input) +{ + std::size_t mode_start_index; + _global_flush_configuration.interval = std::stoul(input, &mode_start_index); + if (std::strcmp(input + mode_start_index, "a") == 0 || + std::strcmp(input + mode_start_index, "absolute") == 0) { + _global_flush_configuration.type = Global_flush_configuration::Type::ABSOLUTE; + } else if (std::strcmp(input + mode_start_index, "r") == 0 || + std::strcmp(input + mode_start_index, "relative") == 0 || + std::strcmp(input + mode_start_index, "") == 0) { + _global_flush_configuration.type = Global_flush_configuration::Type::RELATIVE; + } else { + throw std::invalid_argument("Invalid flush timeout format. Expected: [a|absolute|r|relative|]."); + } +} + +Configuration::Global_flush_configuration Configuration::get_global_flush_configuration() noexcept +{ + return _global_flush_configuration; +} + void Configuration::print() noexcept { std::cout << "***** Configuration *****" << std::endl; @@ -107,6 +128,7 @@ agg::Field_type Configuration::get_field_type(const char *input) if (!std::strcmp(input, "BITAND")) return agg::BIT_AND; if (!std::strcmp(input, "BITOR")) return agg::BIT_OR; if (!std::strcmp(input, "APPEND")) return agg::APPEND; + if (!std::strcmp(input, "UNIQUE_COUNT")) return agg::UNIQUE_COUNT; if (!std::strcmp(input, "SORTED_MERGE")) return agg::SORTED_MERGE; if (!std::strcmp(input, "SORTED_MERGE_DIR")) return agg::SORTED_MERGE_DIR; std::cerr << "Invalid type field. Given: " << input << ", Expected: KEY|SUM|MIN|MAX|AVG|FIRST|FIRST_NON_EMPTY|LAST|LAST_NON_EMPTY|BITAND|BITOR|APPEND|SORTED_MERGE|SORTED_MERGE_DIR." << std::endl; diff --git a/biflow_aggregator/configuration.h b/biflow_aggregator/configuration.h index b2b53a92..7b218d2d 100644 --- a/biflow_aggregator/configuration.h +++ b/biflow_aggregator/configuration.h @@ -23,6 +23,35 @@ * @brief Class thas holds module configuration */ class Configuration { +public: + + /** + * @brief Global flush configuration + * + * Flush interval is used to flush all records in flow cache to output interface once per given amount of seconds. + * If not set, no flush is performed. + */ + struct Global_flush_configuration { + enum class Type { + ABSOLUTE, ///< Flows must be flushed every interval seconds starting from epoch + RELATIVE, ///< Flows must be flushed every interval seconds starting from module start + } type; + time_t interval = 0; ///< Interval in seconds + + /** + * @brief Check if flush interval is set + * + * @return true Flush interval is set + * @return false Flush interval is not set + */ + [[nodiscard]] inline + bool is_set() const noexcept + { + return interval > 0; + } + }; + +private: /** * @brief Configuration of fields from config file. @@ -45,6 +74,14 @@ class Configuration { */ time_t _t_passive; + /** + * @brief Periodic flush configuration + * + * If set, module flush all records in flow cache to output interface once per given amount of seconds. + * If flush interval is set to 0, no flush is performed. + */ + Global_flush_configuration _global_flush_configuration; + /** * @brief active timeout * @@ -155,6 +192,20 @@ class Configuration { */ time_t get_active_timeout() noexcept; + /** + * @brief Set the flush timeout + * + * See _periodic_flush_configuration for more info. + * + * @param input Timeout in text format. + */ + void set_global_flush_configuration(const char *input); + + /** + * @brief Get the flush timeout object + */ + Global_flush_configuration get_global_flush_configuration() noexcept; + /** * @brief Set the passive timeout * diff --git a/biflow_aggregator/main.cpp b/biflow_aggregator/main.cpp index 146d9109..c46fcc51 100644 --- a/biflow_aggregator/main.cpp +++ b/biflow_aggregator/main.cpp @@ -79,7 +79,8 @@ UR_FIELDS ( PARAM('e', "eof", "End when receive EOF.", no_argument, "flag") \ PARAM('s', "size", "Max number of elements in flow cache.", required_argument, "number") \ PARAM('a', "active-timeout", "Active timeout.", required_argument, "number") \ - PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") + PARAM('p', "passive-timeout", "Passive timeout.", required_argument, "number") \ + PARAM('g', "global-timeout", "Global timeout.", required_argument, "number") trap_module_info_t *module_info = NULL; static volatile int stop = 0; @@ -197,10 +198,10 @@ proccess_and_send(agg::Aggregator& agg, const agg::FlowKey& key, c agg_data = field->post_processing(&flow_data.ctx->data[agg_field.second], size, elem_cnt); } if (ur_is_array(field->ur_fid)) { - ur_array_allocate(out_tmplt, out_rec, field->ur_fid, elem_cnt); - std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field->ur_fid), agg_data, size * elem_cnt); + ur_array_allocate(out_tmplt, out_rec, field->ur_fid_out, elem_cnt); + std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field->ur_fid_out), agg_data, size * elem_cnt); } else { - field_id = flow_data.reverse ? field->ur_r_fid : field->ur_fid; + field_id = flow_data.reverse ? field->ur_r_fid_out : field->ur_fid_out; std::memcpy(ur_get_ptr_by_id(out_tmplt, out_rec, field_id), agg_data, size); } } @@ -248,8 +249,19 @@ static int process_format_change( if (ur_get_type(ur_fid) == UR_TYPE_STRING) is_string_key = true; agg::Key_template::add(ur_fid, ur_r_fid); + } else if (field_cfg.type == agg::UNIQUE_COUNT) { + const int ur_fid_out = ur_define_field((field_cfg.name + "_UNIQUE_COUNT").c_str(), UR_TYPE_UINT32); + out_template.append("," + field_cfg.name + "_UNIQUE_COUNT"); + int ur_r_fid_out = ur_fid_out; + if (ur_fid != ur_r_fid) { + ur_define_field((field_cfg.reverse_name + "_UNIQUE_COUNT").c_str(), UR_TYPE_UINT32); + out_template.append("," + field_cfg.reverse_name + "_UNIQUE_COUNT"); + } + field_cfg.to_output = false; + agg::Field field(field_cfg, ur_fid, ur_r_fid, ur_fid_out, ur_r_fid_out); + agg.fields.add_field(field); } else { - agg::Field field(field_cfg, ur_fid, ur_r_fid); + agg::Field field(field_cfg, ur_fid, ur_r_fid, ur_fid, ur_r_fid); agg.fields.add_field(field); } if (field_cfg.to_output) @@ -364,6 +376,20 @@ void update_flow( if (pt != t_data->value.passive_timeout) dll.swap(t_data); } + +static void flush_all(agg::Aggregator& aggregator, + ur_template_t* out_template, void* out_record, Dll& dll) +{ + for (auto flow_data : aggregator.flow_cache) { + proccess_and_send(aggregator, flow_data.first, flow_data.second, out_template, out_record); + agg::Flow_key_allocator::release_ptr(static_cast(flow_data.first.get_key().first)); + agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx); + } + dll.clear(); + aggregator.flow_cache.clear(); + trap_send_flush(0); +} + static int do_mainloop(Configuration& config) { @@ -383,8 +409,11 @@ do_mainloop(Configuration& config) time_t time_first; time_t time_last = 0; + time_t last_flush_time = 0; time_t t_passive = config.get_passive_timeout() >> 32; time_t t_active = config.get_active_timeout() >> 32; + const Configuration::Global_flush_configuration& flush_configuration + = config.get_global_flush_configuration(); std::size_t flow_cnt = 0; Dll dll; @@ -401,7 +430,7 @@ do_mainloop(Configuration& config) while (unlikely(stop == false)) { // Check timeouted flows - for (node *t_data = dll.begin(); t_data != NULL; t_data = t_data->next) { + for (node *t_data = dll.begin(); !flush_configuration.is_set() && t_data != NULL; t_data = t_data->next) { if (time_last >= t_data->value.passive_timeout) { // timeouted auto data = agg.flow_cache.find(t_data->value.key); proccess_and_send(agg, data->first, data->second, out_tmplt, out_rec); @@ -432,13 +461,7 @@ do_mainloop(Configuration& config) // clear all memory // flush all flows - for (auto flow_data : agg.flow_cache) { - proccess_and_send(agg, flow_data.first, flow_data.second, out_tmplt, out_rec); - agg::Flow_key_allocator::release_ptr(static_cast(flow_data.first.get_key().first)); - agg::Flow_data_context_allocator::release_ptr(flow_data.second.ctx); - } - - trap_send_flush(0); + flush_all(agg, out_tmplt, out_rec, dll); // Free previous record and temlate ur_free_template(out_tmplt); @@ -474,7 +497,7 @@ do_mainloop(Configuration& config) time_last = ur_time_get_sec(ur_get(in_tmplt, in_data, F_TIME_LAST)); // Check timeouted flows - for (node *t_data = dll.begin(); t_data != NULL; t_data = t_data->next) { + for (node *t_data = dll.begin(); !flush_configuration.is_set() && t_data != NULL; t_data = t_data->next) { if (time_first >= t_data->value.passive_timeout || time_last >= t_data->value.active_timeout) { // timeouted auto data = agg.flow_cache.find(t_data->value.key); proccess_and_send(agg, data->first, data->second, out_tmplt, out_rec); @@ -490,6 +513,14 @@ do_mainloop(Configuration& config) trap_send_flush(0); timeouted = false; } + + if (unlikely(flush_configuration.is_set() && time_last - last_flush_time >= flush_configuration.interval)) { + last_flush_time = time_last; + if (flush_configuration.type == Configuration::Global_flush_configuration::Type::ABSOLUTE) { + last_flush_time = last_flush_time / flush_configuration.interval * flush_configuration.interval; + } + flush_all(agg, out_tmplt, out_rec, dll); + } bool is_key_reversed = key.generate(in_data, in_tmplt, config.is_biflow_key()); @@ -662,6 +693,9 @@ main(int argc, char **argv) case 's': config.set_flow_cache_size(optarg); break; + case 'g': + config.set_global_flush_configuration(optarg); + break; default: std::cerr << "Invalid argument " << opt << ", skipped..." << std::endl; } diff --git a/biflow_aggregator/tests/config.xml b/biflow_aggregator/tests/config.xml new file mode 100644 index 00000000..42ece91e --- /dev/null +++ b/biflow_aggregator/tests/config.xml @@ -0,0 +1,92 @@ + + + + + SUM + PACKETS + + + + + UNIQUE_COUNT + TEST + 1000 + + + + + FLOW_ID + KEY + + + + SRC_PORT + MIN + + + DST_PORT + MIN + + + + + SRC_IP + KEY + + + DST_IP + KEY + + + + SUM + SUM + + + MIN + MIN + + + MAX + MAX + + + FIRST_NON_EMPTY + FIRST_NON_EMPTY + + + LAST_NON_EMPTY + LAST_NON_EMPTY + + + FIRST + FIRST + + + LAST + LAST + + + AVG + AVG + + + BITOR + BITOR + + + STR_APPEND + APPEND + : + 10 + + + SORTED_MERGE_VALUE + SORTED_MERGE + : + SORTED_MERGE_KEY + ASCENDING + 10 + + + diff --git a/biflow_aggregator/tests/inputs/input1_packet_aggregation b/biflow_aggregator/tests/inputs/input1_packet_aggregation new file mode 100644 index 00000000..4ae4e83d --- /dev/null +++ b/biflow_aggregator/tests/inputs/input1_packet_aggregation @@ -0,0 +1,3 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 diff --git a/biflow_aggregator/tests/inputs/input2_packet_aggregation b/biflow_aggregator/tests/inputs/input2_packet_aggregation new file mode 100644 index 00000000..453ba648 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input2_packet_aggregation @@ -0,0 +1,6 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 +192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0 +192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0 +192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:51.0 diff --git a/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports b/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports new file mode 100644 index 00000000..8abaa7d5 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input3_generic_flow_key_min_ports @@ -0,0 +1,6 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 PACKETS,time TIME_FIRST,time TIME_LAST,uint32 FLOW_ID,uint16 SRC_PORT,uint16 DST_PORT +192.168.1.1,192.168.1.2,1,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,1,0,6666 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0,1,6666,0 +192.168.1.8,192.168.1.9,10,2016-10-28T17:00:13.0,2016-10-28T17:00:17.0,2,6666,0 +192.168.1.8,192.168.1.9,17,2016-10-28T17:00:21.0,2016-10-28T17:00:23.0,2,3333,3333 +192.168.1.8,192.168.1.9,53,2016-10-28T17:00:26.0,2016-10-28T17:00:41.0,2,0,6666 diff --git a/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations b/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations new file mode 100644 index 00000000..5798ebe4 --- /dev/null +++ b/biflow_aggregator/tests/inputs/input4_src_dst_ip_all_aggregations @@ -0,0 +1,3 @@ +ipaddr DST_IP,ipaddr SRC_IP,time TIME_FIRST,time TIME_LAST,uint32 SUM,uint32 MIN,uint32 MAX,string FIRST_NON_EMPTY,string LAST_NON_EMPTY,uint32 FIRST,uint32 LAST,double AVG,string STR_APPEND,uint32 BITOR,uint32* SORTED_MERGE_KEY,uint32* SORTED_MERGE_VALUE +192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0,333,5,5,,222,16,32,7,test1,1,[3|2|1],[1|2|3] +192.168.1.1,192.168.1.2,2016-10-28T17:00:1.0,2016-10-28T17:00:11.0,333,1,4,test,,555,33,9,test2,3,[8|7|6],[6|5|4] diff --git a/biflow_aggregator/tests/inputs/input5_unique_count b/biflow_aggregator/tests/inputs/input5_unique_count new file mode 100644 index 00000000..c61b8b5a --- /dev/null +++ b/biflow_aggregator/tests/inputs/input5_unique_count @@ -0,0 +1,5 @@ +ipaddr DST_IP,ipaddr SRC_IP,uint32 TEST,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,7,2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 +192.168.1.5,192.168.1.6,666,2016-10-28T17:00:3.0,2016-10-28T17:00:18.0 +192.168.1.5,192.168.1.6,6,2016-10-28T17:00:3.0,2016-10-28T17:00:19.0 diff --git a/biflow_aggregator/tests/inputs/input6_unique_count b/biflow_aggregator/tests/inputs/input6_unique_count new file mode 100644 index 00000000..7aef544c --- /dev/null +++ b/biflow_aggregator/tests/inputs/input6_unique_count @@ -0,0 +1,5 @@ +ipaddr DST_IP,ipaddr SRC_IP,string TEST,time TIME_FIRST,time TIME_LAST +192.168.1.1,192.168.1.2,"x",2016-10-28T17:00:1.0,2016-10-28T17:00:7.0 +192.168.1.5,192.168.1.6,,2016-10-28T17:00:3.0,2016-10-28T17:00:11.0 +192.168.1.5,192.168.1.6,"y",2016-10-28T17:00:3.0,2016-10-28T17:00:18.0 +192.168.1.5,192.168.1.6,"y",2016-10-28T17:00:3.0,2016-10-28T17:00:22.0 diff --git a/biflow_aggregator/tests/references/reference1_gt0 b/biflow_aggregator/tests/references/reference1_gt0 new file mode 100644 index 00000000..a62b2f5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt0 @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 diff --git a/biflow_aggregator/tests/references/reference1_gt5a b/biflow_aggregator/tests/references/reference1_gt5a new file mode 100644 index 00000000..a4ad9a5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt5a @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666 diff --git a/biflow_aggregator/tests/references/reference1_gt5r b/biflow_aggregator/tests/references/reference1_gt5r new file mode 100644 index 00000000..a62b2f5e --- /dev/null +++ b/biflow_aggregator/tests/references/reference1_gt5r @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 diff --git a/biflow_aggregator/tests/references/reference2_gt0 b/biflow_aggregator/tests/references/reference2_gt0 new file mode 100644 index 00000000..1c168c6b --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt0 @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:23.000000,4,694 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference2_gt5a b/biflow_aggregator/tests/references/reference2_gt5a new file mode 100644 index 00000000..bfcda95f --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt5a @@ -0,0 +1,5 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,666 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference2_gt5r b/biflow_aggregator/tests/references/reference2_gt5r new file mode 100644 index 00000000..74d4a06d --- /dev/null +++ b/biflow_aggregator/tests/references/reference2_gt5r @@ -0,0 +1,4 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,667 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,10 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,17 +2016-10-28T17:00:26.000000,2016-10-28T17:00:51.000000,1,53 diff --git a/biflow_aggregator/tests/references/reference3_gt0 b/biflow_aggregator/tests/references/reference3_gt0 new file mode 100644 index 00000000..24210f65 --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt0 @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0 +2016-10-28T17:00:13.000000,2016-10-28T17:00:41.000000,3,2,0,0 diff --git a/biflow_aggregator/tests/references/reference3_gt5a b/biflow_aggregator/tests/references/reference3_gt5a new file mode 100644 index 00000000..913131ae --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt5a @@ -0,0 +1,5 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,6666,0 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,1,0,6666 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333 +2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0 diff --git a/biflow_aggregator/tests/references/reference3_gt5r b/biflow_aggregator/tests/references/reference3_gt5r new file mode 100644 index 00000000..4d58ba78 --- /dev/null +++ b/biflow_aggregator/tests/references/reference3_gt5r @@ -0,0 +1,4 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1,0,0 +2016-10-28T17:00:13.000000,2016-10-28T17:00:17.000000,1,2,0,6666 +2016-10-28T17:00:21.000000,2016-10-28T17:00:23.000000,1,2,3333,3333 +2016-10-28T17:00:26.000000,2016-10-28T17:00:41.000000,1,2,6666,0 diff --git a/biflow_aggregator/tests/references/reference4_gt0 b/biflow_aggregator/tests/references/reference4_gt0 new file mode 100644 index 00000000..f79fed90 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt0 @@ -0,0 +1 @@ +192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6] diff --git a/biflow_aggregator/tests/references/reference4_gt5a b/biflow_aggregator/tests/references/reference4_gt5a new file mode 100644 index 00000000..8cd58c84 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt5a @@ -0,0 +1,2 @@ +192.168.1.1,192.168.1.2,7.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1,16,32,5,5,333,"","222","test1:",[3|2|1] +192.168.1.1,192.168.1.2,9.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,1,555,33,4,1,333,"test","","test2:",[4|5|6] diff --git a/biflow_aggregator/tests/references/reference4_gt5r b/biflow_aggregator/tests/references/reference4_gt5r new file mode 100644 index 00000000..f79fed90 --- /dev/null +++ b/biflow_aggregator/tests/references/reference4_gt5r @@ -0,0 +1 @@ +192.168.1.1,192.168.1.2,8.000000,2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,3,2,16,33,5,1,666,"test","222","test1:",[3|2|1|4|5|6] diff --git a/biflow_aggregator/tests/references/reference5_gt0 b/biflow_aggregator/tests/references/reference5_gt0 new file mode 100644 index 00000000..3b9dd95d --- /dev/null +++ b/biflow_aggregator/tests/references/reference5_gt0 @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:19.000000,4,3 diff --git a/biflow_aggregator/tests/references/reference5_gt5a b/biflow_aggregator/tests/references/reference5_gt5a new file mode 100644 index 00000000..9c1604f1 --- /dev/null +++ b/biflow_aggregator/tests/references/reference5_gt5a @@ -0,0 +1,3 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:19.000000,2,2 diff --git a/biflow_aggregator/tests/references/reference5_gt5r b/biflow_aggregator/tests/references/reference5_gt5r new file mode 100644 index 00000000..972254fe --- /dev/null +++ b/biflow_aggregator/tests/references/reference5_gt5r @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,2 +2016-10-28T17:00:03.000000,2016-10-28T17:00:19.000000,2,2 diff --git a/biflow_aggregator/tests/references/reference6_gt0 b/biflow_aggregator/tests/references/reference6_gt0 new file mode 100644 index 00000000..1b9cc529 --- /dev/null +++ b/biflow_aggregator/tests/references/reference6_gt0 @@ -0,0 +1 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:22.000000,4,2 diff --git a/biflow_aggregator/tests/references/reference6_gt5a b/biflow_aggregator/tests/references/reference6_gt5a new file mode 100644 index 00000000..9ceb9264 --- /dev/null +++ b/biflow_aggregator/tests/references/reference6_gt5a @@ -0,0 +1,4 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:07.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:11.000000,1,0 +2016-10-28T17:00:03.000000,2016-10-28T17:00:18.000000,1,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:22.000000,1,1 diff --git a/biflow_aggregator/tests/references/reference6_gt5r b/biflow_aggregator/tests/references/reference6_gt5r new file mode 100644 index 00000000..c814d2ed --- /dev/null +++ b/biflow_aggregator/tests/references/reference6_gt5r @@ -0,0 +1,2 @@ +2016-10-28T17:00:01.000000,2016-10-28T17:00:11.000000,2,1 +2016-10-28T17:00:03.000000,2016-10-28T17:00:22.000000,2,1 diff --git a/biflow_aggregator/tests/test.sh b/biflow_aggregator/tests/test.sh new file mode 100755 index 00000000..d59e9cc7 --- /dev/null +++ b/biflow_aggregator/tests/test.sh @@ -0,0 +1,93 @@ +#!/bin/bash +#set -e + +die_if_not_running() { + local pid=$1 + local error_message=$2 + if not kill -0 "$pid" 2>/dev/null; then + echo $error_message + exit 1 + fi +} + +run_test_with_global_timeout() { + local input=$1 + local output=$2 + local reference=$3 + local section=$4 + local config=$5 + local global_timeout=$6 + (./biflow_aggregator -i "u:lr,u:ba" -e -c $config -n $section -g $global_timeout & ) || true + local AGGREGATOR_PID=$! + sleep 0.5 + die_if_not_running $AGGREGATOR_PID "Failed to start biflow aggregator" + + ./../logger/logger -w $output -i "u:ba" & + local LOGGER_PID=$! + sleep 0.5 + die_if_not_running $LOGGER_PID "Failed to start logger" + + ./../logreplay/logreplay -f $input -i "u:lr" & + local LOGREPLAY_PID=$! + sleep 0.5 + die_if_not_running $LOGREPLAY_PID "Failed to start logreplay" + + sleep 3 + wait $LOGREPLAY_PID + kill $AGGREGATOR_PID 2>/dev/null + sleep 0.2 + kill $LOGGER_PID 2>/dev/null + wait $LOGGER_PID + + if ! diff $output $reference; then + echo $output doesnt match $reference + success="false" + fi +} + pwd >&2 + +script_path="$(cd -- "$(dirname -- "${BASH_SOURCE[0]}")" &>/dev/null && pwd)" + +success="true" + +tmpdir=$(mktemp -d) + +trap "rm -rf \"$tmpdir\"" EXIT + +outputs="$tmpdir/outputs" + +mkdir -p "$outputs" + + +for input in "$script_path"/inputs/input*; do + basename=$(basename "$input") + echo Found $basename + if [[ $basename =~ input([0-9]+)_(.+) ]]; then + index="${BASH_REMATCH[1]}" + section="${BASH_REMATCH[2]}" + output="$outputs/output$index" + config="$script_path/config.xml" + reference="$script_path/references/reference$index" + else + echo $input + echo "Incorrect input name. Must be input_" + exit 1 + fi + + echo "Running test without global timeout..." + run_test_with_global_timeout $input ${output}_gt0 ${reference}_gt0 $section $config "0" + + echo "Running test with relative global timeout..." + run_test_with_global_timeout $input ${output}_gt5r ${reference}_gt5r $section $config "5r" + + echo "Running test with absolute global timeout..." + run_test_with_global_timeout $input ${output}_gt5a ${reference}_gt5a $section $config "5a" + +done + +if [ "$success" = "true" ]; then + echo "All tests passed successfully." +else + echo "Some tests failed." + exit 1 +fi diff --git a/logger/Makefile.am b/logger/Makefile.am index 0771fb24..4e01bdd2 100644 --- a/logger/Makefile.am +++ b/logger/Makefile.am @@ -6,3 +6,4 @@ pkgdocdir=${docdir}/logger pkgdoc_DATA=README.md EXTRA_DIST=README.md csv2nf.sh include ../aminclude.am +check_PROGRAMS = logger diff --git a/logreplay/Makefile.am b/logreplay/Makefile.am index c34832ff..584ed8e2 100644 --- a/logreplay/Makefile.am +++ b/logreplay/Makefile.am @@ -6,3 +6,4 @@ pkgdocdir=${docdir}/logreplay pkgdoc_DATA=README.md EXTRA_DIST=README.md include ../aminclude.am +check_PROGRAMS = logreplay