diff --git a/blacklistfilter/evaluator/evaluator.py b/blacklistfilter/evaluator/evaluator.py new file mode 100644 index 00000000..9486b01a --- /dev/null +++ b/blacklistfilter/evaluator/evaluator.py @@ -0,0 +1,316 @@ +import csv +import json +from queue import Queue +import os +import sys +import signal +import time +from threading import Thread, Event +import requests +from datetime import datetime +import pytrap +import shutil +from optparse import OptionParser +from collections import defaultdict + +#Enter an API key to obtain informations from nerd +NERD_API_KEY = None + +#Constant defined in Adaptive Filter +ADAPTIVE_BLACKLIST_ID = 999 + +parser = OptionParser(add_help_option=True) +parser.add_option("-i", "--ifcspec", dest="ifcspec", + help="TRAP IFC specifier", metavar="IFCSPEC") +parser.add_option("-c", "--csv-path", dest="csv", help="Path to the csv files generated by Split Evidence") +parser.add_option("-e","--evidence-path", dest="evidence", help="Path to the folder for storing data of unreported detections") + +#Signal handler +def SignalHandler(signal, frame): + global signal_recieved, trap_ctx + if signal_recieved: + print('Caught another SIGINT, exiting forcefully..') + os._exit(1) + trap_ctx.terminate() + signal_recieved = True + +class Alert: + def __init__(self, client, nerd_info): + self.ip = client.ip_addr; + self.statistics = client.statistics + self.nerd_info = nerd_info + +class MonitoredClient: + + def __init__(self, ip_addr, detection): + self.ip_addr = ip_addr + self.detection = detection + self.ports = set() + self.duration_in = 0.0 + self.duration_out = 0.0 + self.start = datetime.now() + self.end = datetime.utcfromtimestamp(0) + self.ips_contacted = set() + self.statistics = defaultdict(int) + + def AddFlow(self, flow, src): + first = datetime.strptime(flow["time TIME_FIRST"], '%Y-%m-%dT%H:%M:%S.%f') + last = datetime.strptime(flow["time TIME_LAST"], '%Y-%m-%dT%H:%M:%S.%f') + + if first < self.start: + self.start = first + if last > 
self.end: + self.end = last + + if src: + self.duration_out += (last - first).total_seconds(); + self.statistics["bytes_sent"] += int(flow["uint64 BYTES"]) + self.statistics["packets_sent"] += int(flow["uint32 PACKETS"]) + self.statistics["flows_sent"] += 1 + self.ips_contacted.add(flow["ipaddr DST_IP"]) + self.ports.add(flow["uint16 DST_PORT"]) + else: + self.duration_in += (last - first).total_seconds(); + self.statistics["bytes_recv"] += int(flow["uint64 BYTES"]) + self.statistics["packets_recv"] += int(flow["uint32 PACKETS"]) + self.statistics["flows_recv"] += 1 + + #Determine other statistics + def process(self): + self.statistics['floats_per_sec_recv'] = self.statistics["flows_recv"] / (self.end - self.start).total_seconds() if (self.end - self.start).total_seconds() else 0.0 + self.statistics['floats_per_sec_sent'] = self.statistics["flows_sent"] / (self.end - self.start).total_seconds() if (self.end - self.start).total_seconds() else 0.0 + self.statistics['pkts_per_sec_recv'] = self.statistics["packets_recv"] / self.duration_in if self.duration_in else 0.0 + self.statistics['pkts_per_sec_sent'] = self.statistics["packets_sent"] / self.duration_out if self.duration_out else 0.0 + self.statistics['bytes_per_sec_recv'] = self.statistics["bytes_recv"] / self.duration_in if self.duration_in else 0.0 + self.statistics['bytes_per_sec_sent'] = self.statistics["bytes_sent"] / self.duration_out if self.duration_out else 0.0 + self.statistics['bytes_per_pkt_recv'] = self.statistics["bytes_recv"] / self.statistics["packets_recv"] if self.statistics["packets_recv"] else 0.0 + self.statistics['bytes_per_pkt_sent'] = self.statistics["bytes_sent"] / self.statistics["packets_sent"] if self.statistics["packets_sent"] else 0.0 + self.statistics['ips_contacted'] = len(self.ips_contacted) + self.statistics['ports'] = len(self.ports) + +#Main class managing threads +class Controller: + def __init__(self, receiver_cnt = 1, worker_cnt = 2, sender_cnt = 1): + self.reciever_cnt = 
receiver_cnt + self.worker_cnt = worker_cnt + self.recv_queue = Queue() + self.send_queue = Queue() + self.receivers = [] + self.workers = [] + self.sender = Sender(self.send_queue) + + for i in range(receiver_cnt): + self.receivers.append(Receiver(0, self.recv_queue)) + + for i in range(worker_cnt): + self.workers.append(Worker(self.recv_queue, self.send_queue)) + + def Start(self): + print("Starting threads...") + + for r in self.receivers: + r.start() + + for w in self.workers: + w.start() + + self.sender.start() + + while not signal_recieved: + time.sleep(2) + + def Join(self): + print("Joining threads...") + for r in self.receivers: + r.join() + + for i in range(len(self.workers)): + self.recv_queue.put(None) + + for w in self.workers: + w.join() + + self.send_queue.put(None) + self.sender.join() + +#Sending thread +class Sender(Thread): + def __init__(self, queue): + self.queue = queue + Thread.__init__(self, name = "SendingThread") + + #Request IP information from NERD + def nerd_query(self, api_key, ip_addr): + if api_key == None: + return None + base_url = 'https://nerd.cesnet.cz/nerd/api/v1' + headers = { + "Authorization": api_key + } + try: + req = requests.get(base_url + '/ip/{}/full'.format(ip_addr), headers=headers, timeout=3) + if req.status_code == 200: + return json.loads(req.text) + except requests.RequestException as e: + print('Failed to get info from NERD\n', e) + return None + + #Send report to the output interface + def SendReport(self, client): + nerd_record = self.nerd_query(NERD_API_KEY, client.ip_addr) + alert = Alert(client, nerd_record) + trap_ctx.send(bytearray(json.dumps(alert.__dict__), "utf-8")) + + #Move files related to unreported events for later use + def MoveToEvidence(self, client ): + os.mkdir(options.evidence + "/" + client["detection"]["id"]) + shutil.move(options.csv+"/"+id+".csv", options.evidence + "/" + client["detection"]["id"] + "/" + client["ip_addr"].replace(".","_") + ".csv") + + def run(self): + while True: + 
clients = self.queue.get() + + for key in clients: + if (int(clients[key].statistics['bytes_sent']) == 0 or int(clients[key].statistics['flows_sent']) == 1): + try: + os.remove(options.csv+"/"+clients[key].detection["id"]+".csv") + except FileNotFoundError: + continue + continue + thresholds_reached = 0; + clients[key].process() + if (float(clients[key].statistics['bytes_per_pkt_sent']) > 187.1207): + thresholds_reached += 1 + if (float(clients[key].statistics['bytes_per_pkt_recv']) > 227.5888): + thresholds_reached += 1 + if (float(clients[key].statistics['bytes_recv']) / float(clients[key].statistics['bytes_sent']) < 2.5198): + thresholds_reached += 1 + if (float(clients[key].statistics['packets_recv']) / float(clients[key].statistics['packets_sent']) < 1.4844): + thresholds_reached += 1 + if (float(clients[key].statistics['flows_recv']) / float(clients[key].statistics['flows_sent']) < 1.9401): + thresholds_reached += 1 + if (float(clients[key].statistics['flows_recv']) != 0): + if (float(clients[key].statistics['packets_recv']) / float(clients[key].statistics['flows_recv']) > 5.0973): + thresholds_reached += 1 + else: + thresholds_reached += 1 + if (float(clients[key].statistics['pps_recv']) < 3.7455): + thresholds_reached += 1 + if (float(clients[key].statistics['pps_sent']) < 227.5888): + thresholds_reached += 1 + if( float(cleints[key].statistics['ips_contacted'] > 124.7984 + thresholds_reached += 1 + if (thresholds_reached > 4): + self.SendReport(clients[key]) + else: + self.MoveToEvidence(clients[key]) + + + +class Worker(Thread): + + def __init__(self, recv_queue, send_queue): + Thread.__init__(self, name = "ConsumerThread" ) + + self.recv_queue = recv_queue + self.send_queue = send_queue + + def join(self): + Thread.join(self, None) + + def run(self): + while True: + detection = self.recv_queue.get() + if detection == None: + self.send_queue.put(None) + break; + + id = detection["id"] + + if not os.path.isfile(options.csv+"/"+id+".csv"): + print("Could 
not locate the file related to the event {}".format(id)) + continue + + clients = dict() + + for event in detection["grouped_events"]: + for target in event["targets"]: + clients[target] = MonitoredClient(target, detection) + + with open(options.csv+"/"+id+".csv", 'r') as f: + reader = csv.DictReader(f) + for line in reader: + flow = dict(line) + try: + if (int(flow["uint16 SRC_PORT"]) == 53 or int(flow["uint16 DST_PORT"]) == 53): + continue + except ValueError: + continue + if int(flow["uint64 SRC_BLACKLIST"]) == ADAPTIVE_BLACKLIST_ID: + try: + clients[flow["ipaddr SRC_IP"]].AddFlow(flow, True) + except KeyError: + clients[flow["ipaddr SRC_IP"]] = MonitoredClient(flow["ipaddr SRC_IP"],detection) + clients[flow["ipaddr SRC_IP"]].AddFlow(flow, True) + else: + try: + clients[flow["ipaddr DST_IP"]].AddFlow(flow, False) + except KeyError: + clients[flow["ipaddr DST_IP"]] = MonitoredClient(flow["ipaddr DST_IP"],detection) + clients[flow["ipaddr DST_IP"]].AddFlow(flow, False) + + self.send_queue.put(clients.copy()) + self.recv_queue.task_done() + +class Receiver(Thread): + def __init__(self, ifc_in, queue): + self.ifc_in = ifc_in + trap_ctx.setRequiredFmt(self.ifc_in, pytrap.FMT_JSON, "blacklist_evidence") + self.queue = queue + + self.stop = Event() + + Thread.__init__(self, name = "ReceiverThread" ) + + def join(self): + print(("{}: Joining...").format(self.name)) + self.stop.set() + Thread.join(self, None) + + def run(self): + while not self.stop.is_set(): + try: + data = trap_ctx.recv(0) + except pytrap.FormatMismatch: + print("Pytrap Format Mismatch") + break + except pytrap.TrapError: + print("Pytrap Error") + break + except pytrap.Terminated: + print("TRAP terminated") + break + if len(data) <= 1: + break + + recv = json.loads(data.decode()) + if recv["event_type"] != "BotnetDetection": + continue + + self.queue.put(recv) + +signal_recieved = False + +if __name__ == '__main__': + options, args = parser.parse_args() + options.evidence.rstrip('/') + 
options.csv.rstrip('/') + signal.signal(signal.SIGINT, SignalHandler) + + trap_ctx = pytrap.TrapCtx() + trap_ctx.init(sys.argv, 1, 1) + + controller = Controller() + controller.Start() + controller.Join() + diff --git a/blacklistfilter/evaluator/split_evidence/FileHandler.cpp b/blacklistfilter/evaluator/split_evidence/FileHandler.cpp new file mode 100644 index 00000000..2ab8a95e --- /dev/null +++ b/blacklistfilter/evaluator/split_evidence/FileHandler.cpp @@ -0,0 +1,131 @@ +#include "FileHandler.h" +#include +#include +#include +#include + +FileHandler::TFileWrap::TFileWrap(const std::string& filename, urcsv_t* csv ) +: m_filename( filename ), +m_buffer_len( 0 ) { + + struct stat stat_struct; + if( stat(filename.c_str(), &stat_struct) != 0 ) { + char* hdr = urcsv_header(csv); + m_buffer_len = std::snprintf( &m_buffer[m_buffer_len], BUFF_SIZE, "%s\n", hdr ); + free(hdr); + } + + m_last_write = std::chrono::system_clock::now(); + +} + +FileHandler::TFileWrap::~TFileWrap() { + + if( m_buffer_len ) { + std::ofstream ofs( m_filename, std::ios::app ); + ofs << m_buffer; + } + +} + +void FileHandler::TFileWrap::write( const void* in_rec, urcsv_t* csv) { + + char *rec = urcsv_record(csv, in_rec); + size_t rec_len = std::strlen( rec ); + + //Append the record to the buffer if there is still free capacity, flush to the file otherwise + if( m_buffer_len + rec_len + 1 < BUFF_SIZE ) { + m_buffer_len += std::snprintf( &m_buffer[m_buffer_len], BUFF_SIZE - m_buffer_len, "%s\n", rec ); + } else { + std::ofstream ofs( m_filename, std::ios::app ); + ofs << m_buffer << rec << '\n'; + m_buffer_len = 0; + } + free( rec ); + + //Update the timestamp + m_last_write = std::chrono::system_clock::now(); + +} + +FileHandler::FileHandler(const std::string &path, const std::chrono::milliseconds &check_interval, + const std::chrono::milliseconds &close_timeout) : + m_path( path ), + m_check_interval( check_interval ), + m_close_timeout( close_timeout ), + m_stop( true ){ + if( m_path.back() != 
'/' ) + m_path.append( 1, '/' ); + +} + +FileHandler::~FileHandler() { + stop_handler(); + for (auto it = m_files.begin(); it != m_files.end(); ++it) { + delete it->second; + } +} +void FileHandler::closed_unused_files() { + + while( ! m_stop ) { + std::unique_lock lock(m_mtx); + + //Wait for check_interval to expire or termination + m_cond.wait_for(lock, m_check_interval, [this]() { return m_stop; }); + + for (auto it = m_files.begin(); it != m_files.end();) { + + //Get the time since the last write + auto diff = std::chrono::duration_cast( + std::chrono::system_clock::now() - it->second->m_last_write); + + if ( diff > m_close_timeout ) { + delete it->second; + it = m_files.erase(it); + } else { + ++it; + } + } + + lock.unlock(); + } + +} + +void FileHandler::start_handler() { + + m_stop = false; + m_watcher = std::thread( &FileHandler::closed_unused_files, this ); + +} + +void FileHandler::stop_handler() { + + { + std::unique_lock lock( m_mtx ); + m_stop = true; + } + + m_cond.notify_one(); + m_watcher.join(); + +} + +void FileHandler::write_to_file( const std::string& filename, const void* in_rec, urcsv_t* csv ) { + + std::unique_lock lock( m_mtx ); + + auto it = m_files.find( filename ); + if( it != m_files.end() ) { + it->second->write( in_rec, csv ); + } else { + try { + auto file_it = m_files.insert(std::make_pair(filename, new FileHandler::TFileWrap( m_path + filename + ".csv", csv ))); + file_it.first->second->write( in_rec, csv ); + } catch ( const std::bad_alloc &e ) { + //Memory allocation failed + std::cerr << e.what() << '\n'; + } + } +} + diff --git a/blacklistfilter/evaluator/split_evidence/FileHandler.h b/blacklistfilter/evaluator/split_evidence/FileHandler.h new file mode 100644 index 00000000..aefda77a --- /dev/null +++ b/blacklistfilter/evaluator/split_evidence/FileHandler.h @@ -0,0 +1,61 @@ +#ifndef UNTITLED1_FILEHANDLER_H +#define UNTITLED1_FILEHANDLER_H + + +#include +#include +#include +#include +#include + +#include + +#define BUFF_SIZE 
131072 + +class FileHandler { + + //Structure holding buffer + struct TFileWrap { + std::string m_filename; + char m_buffer[BUFF_SIZE]; + size_t m_buffer_len; + std::chrono::time_point m_last_write; + + TFileWrap( const std::string& file, urcsv_t* csv ); + ~TFileWrap(); + void write( const void* in_rec, urcsv_t* csv ); + }; + //Map with files held in memory + std::map m_files; + + //Path to a folder where files will be created + std::string m_path; + + //Interval in which the the module check m_last_write variable + std::chrono::milliseconds m_check_interval; + + //If no write occured during this time, the buffer is flushed + std::chrono::milliseconds m_close_timeout; + + //Indicates the termination of the module + bool m_stop; + + std::thread m_watcher; + std::condition_variable m_cond; + std::mutex m_mtx; + + void closed_unused_files(); + void stop_handler(); + +public: + + FileHandler( const std::string& path, const std::chrono::milliseconds& check_interval, const std::chrono::milliseconds& close_timeout ); + ~FileHandler(); + void start_handler(); + + void write_to_file( const std::string& filename, const void* in_rec, urcsv_t* csv ); + +}; + + +#endif //UNTITLED1_FILEHANDLER_H diff --git a/blacklistfilter/evaluator/split_evidence/SplitEvidence.cpp b/blacklistfilter/evaluator/split_evidence/SplitEvidence.cpp new file mode 100644 index 00000000..7e6e90fb --- /dev/null +++ b/blacklistfilter/evaluator/split_evidence/SplitEvidence.cpp @@ -0,0 +1,207 @@ +#ifdef HAVE_CONFIG_H +#include +#endif + +#include +#include +#include +#include +#include +#include +#include +#include +#include "fields.h" + +#include "FileHandler.h" + +trap_module_info_t *module_info = NULL; + +/** + * Definition of basic module information - module name, module description, number of input and output interfaces + */ +#define MODULE_BASIC_INFO(BASIC) \ + BASIC("split_evidence", \ + "The module saves flows into files with the name given by value of UniRec field." 
\
        "", 1, 0)

/**
 * Definition of module parameters - every parameter has short_opt, long_opt, description,
 * flag whether an argument is required or it is optional and argument type which is NULL
 * in case the parameter does not need argument.
 * Module parameter argument types: int8, int16, int32, int64, uint8, uint16, uint32, uint64, float, string
 */
#define MODULE_PARAMS(PARAM) \
  PARAM('f', "field", "UniRec field used for splitting.", required_argument, "string") \
  PARAM('p', "path", "Path to directory where to store files.", required_argument, "string")
//PARAM(char, char *, char *, no_argument or required_argument, char *)

// Watcher timings for FileHandler (milliseconds): how often idle files are
// checked, and how long a file may stay idle before it is flushed and closed.
#define CHECK_INTERVAL 5000
#define CLOSE_TIMEOUT 10000

// Maximum rendered length of the splitter-field value.
#define MAX_KEYSTR_SIZE 400
static const char KEYSTR_DELIM[] = ",";
static int stop = 0;

TRAP_DEFAULT_SIGNAL_HANDLER(stop = 1)
// UniRec-to-CSV converter; (re)built whenever the input format changes.
urcsv_t *csv = NULL;

ur_template_t *in_tmplt = NULL;

int main(int argc, char **argv) {
   int ret;
   signed char opt;
   const char *field_name = NULL;
   const char *output_path = NULL;

   /* **** TRAP initialization **** */

   INIT_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS)

   /**
    * Let TRAP library parse program arguments, extract its parameters and initialize module interfaces
    */
   TRAP_DEFAULT_INITIALIZATION(argc, argv, *module_info);

   /**
    * Register signal handler. 
*/
   TRAP_REGISTER_DEFAULT_SIGNAL_HANDLER();

   /**
    * Parse program arguments defined by MODULE_PARAMS macro with getopt() function (getopt_long() if available)
    * This macro is defined in config.h file generated by configure script
    */
   while ((opt = TRAP_GETOPT(argc, argv, module_getopt_string, long_options)) != -1) {
      switch (opt) {
      case 'f':
         field_name = optarg;
         break;
      case 'p':
         output_path = optarg;
         break;
      default:
         fprintf(stderr, "Invalid arguments.\n");
         FREE_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS);
         TRAP_DEFAULT_FINALIZATION();
         return -1;
      }
   }

   // Both -f and -p are mandatory.
   if (field_name == NULL || output_path == NULL) {
      fprintf(stderr, "Error: missing required parameter -f or -p.\n");

      // Do all necessary cleanup in libtrap before exiting
      TRAP_DEFAULT_FINALIZATION();

      // Release allocated memory for module_info structure
      FREE_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS)

      // Free unirec template (in_tmplt is still NULL here; ur_free_template
      // tolerates that)
      ur_free_template(in_tmplt);
      ur_finalize();

      return 1;
   }

   /* **** Create UniRec templates **** */

   // Accept any UniRec format; the actual template arrives with the data.
   trap_set_required_fmt(0, TRAP_FMT_UNIREC, "");

   /* **** Main processing loop **** */

   int field_id = UR_E_INVALID_NAME;

   FileHandler file_handler(output_path, std::chrono::milliseconds(CHECK_INTERVAL), std::chrono::milliseconds(CLOSE_TIMEOUT));
   file_handler.start_handler();
   // Read data from input, process them and write to output
   while (!stop) {
      const void *in_rec;
      uint16_t in_rec_size;

      // Receive data from input interface 0. 
+ // Block if data are not available immediately (unless a timeout is set using trap_ifcctl) + ret = TRAP_RECEIVE(0, in_rec, in_rec_size, in_tmplt); + if (ret == TRAP_E_FORMAT_CHANGED) { + free(csv); + csv = urcsv_init(in_tmplt, ','); + if (csv == NULL) { + fprintf(stderr, "Failed to initialize UniRec2CSV converter.\n"); + break; + } + } + // Handle possible errors + TRAP_DEFAULT_RECV_ERROR_HANDLING(ret, continue, break); + + if (field_id == UR_E_INVALID_NAME) { + /* first initialization of splitter key */ + field_id = ur_get_id_by_name(field_name); + if (field_id == UR_E_INVALID_NAME) { + fprintf(stderr, "Error: field %s was not found in the input template.\n", field_name); + break; + } + } + + // Check size of received data + if (in_rec_size < ur_rec_fixlen_size(in_tmplt)) { + if (in_rec_size <= 1) { + break; // End of data (used for testing purposes) + } else { + fprintf(stderr, "Error: data with wrong size received (expected size: >= %hu, received size: %hu)\n", + ur_rec_fixlen_size(in_tmplt), in_rec_size); + break; + } + } + + char keystr[MAX_KEYSTR_SIZE]; + + uint32_t keysize = urcsv_field(keystr, MAX_KEYSTR_SIZE - 1, in_rec, static_cast(field_id), in_tmplt); + keystr[keysize] = 0; + + char *r; + while ((r = strchr(keystr, '/')) != NULL) { + *r = '_'; + } + + if ( keysize == 0 ) { + strcpy(keystr, "UNKNOWN"); + keysize = strlen("UNKNOWN"); + } + + char idstr[MAX_KEYSTR_SIZE]; + char *keystr_ptr = keystr; + + // Remove quotes from the key string + if (keystr[0] == '"') { + keystr_ptr++; + keystr[keysize - 1] = 0; + keysize -= 2; + } + + strcpy(idstr, keystr_ptr); + char *token = strtok(idstr, KEYSTR_DELIM); + + // Handle multiple IDs inside the 'field_id' + if (token == NULL) { + file_handler.write_to_file(keystr_ptr, in_rec, csv); + } else { + file_handler.write_to_file(token, in_rec, csv); + + while ( (token = strtok(NULL, ",")) != NULL ) + file_handler.write_to_file(token, in_rec, csv); + } + } + + // Do all necessary cleanup in libtrap before exiting + 
TRAP_DEFAULT_FINALIZATION(); + + // Release allocated memory for module_info structure + FREE_MODULE_INFO_STRUCT(MODULE_BASIC_INFO, MODULE_PARAMS) + + // Free unirec template + urcsv_free(&csv); + ur_free_template(in_tmplt); + ur_finalize(); + + return 0; +} + diff --git a/blacklistfilter/evaluator/split_evidence/fields.c b/blacklistfilter/evaluator/split_evidence/fields.c new file mode 100644 index 00000000..8c2c8d1d --- /dev/null +++ b/blacklistfilter/evaluator/split_evidence/fields.c @@ -0,0 +1,12 @@ +/************* THIS IS AUTOMATICALLY GENERATED FILE, DO NOT EDIT *************/ +// Tables are indexed by field ID +#include "fields.h" + +char *ur_field_names_static[] = { +}; +short ur_field_sizes_static[] = { +}; +ur_field_type_t ur_field_types_static[] = { +}; +ur_static_field_specs_t UR_FIELD_SPECS_STATIC = {ur_field_names_static, ur_field_sizes_static, ur_field_types_static, 0}; +ur_field_specs_t ur_field_specs = {ur_field_names_static, ur_field_sizes_static, ur_field_types_static, 0, 0, 0, NULL, UR_UNINITIALIZED}; diff --git a/blacklistfilter/evaluator/split_evidence/fields.h b/blacklistfilter/evaluator/split_evidence/fields.h new file mode 100644 index 00000000..faebc1f6 --- /dev/null +++ b/blacklistfilter/evaluator/split_evidence/fields.h @@ -0,0 +1,13 @@ +#ifndef _UR_FIELDS_H_ +#define _UR_FIELDS_H_ + +/************* THIS IS AUTOMATICALLY GENERATED FILE, DO NOT EDIT *************/ +#include + + +extern uint16_t ur_last_id; +extern ur_static_field_specs_t UR_FIELD_SPECS_STATIC; +extern ur_field_specs_t ur_field_specs; + +#endif +