Log Sentinel es un mini-SIEM educativo escrito en Python.
Procesa logs, detecta fuerza bruta, eventos fuera de horario, genera alertas,
mantiene historial histórico, correlaciona eventos y presenta un dashboard de seguridad en consola.
Log_Sentinel/
├── README.md
├── main.py # Punto de entrada del SIEM
├── rules.json # Reglas configurables
├── logs/
│ └── sample.log # Log de ejemplo
├── reports/ # Reportes completos (CSV)
├── alerts/ # Reportes de alertas (CSV)
├── data/
│ └── alert_history.csv # Historial persistente de alertas
└── core/
├── parser.py # Extracción de IP / usuario / timestamp / evento
├── analyzer.py # Detección (offhours + brute force)
├── reporter.py # Consola + guardado de reportes
├── alert_storage.py # Persistencia de alertas en historial
├── history_reader.py # Carga + estadísticas del historial
└── correlator.py # Correlación local + histórica de eventos
git clone https://github.com/gasparswidzinski/Log_Sentinel.git
cd Log_Sentinel
pip install -r requirements.txt
python main.py💡 Asegurate de tener Python 3.11 y
pipinstalados.
Esta sección explica, paso a paso, qué hace el programa cuando ejecutás:
python main.py- Carga las reglas desde
rules.json. - Lee y parsea el archivo de logs con
LogParser. - Normaliza todo en un DataFrame de pandas.
- Detecta:
- eventos fuera de horario laboral (
offhours); - posibles ataques de fuerza bruta (
bruteforce).
- eventos fuera de horario laboral (
- Guarda las alertas en un histórico (
data/alert_history.csv). - Calcula estadísticas históricas sobre todas las corridas.
- Correlaciona eventos:
- fallidos → exitosos (correlación local);
- brute force histórico → login exitoso actual (correlación histórica).
- Genera reportes en CSV y muestra un dashboard en consola usando Rich.
Archivo de configuración principal:
{
"working_hours": { "start": 9, "end": 18 },
"failed_login": {
"threshold": 3,
"window_minutes": 10,
"message": "posible ataque de fuerza bruta"
},
"sudo_command": {
"outside_working_hours": true,
"message": "uso de comando sudo fuera del horario laboral"
}
}🔧 Para qué se usa cada campo:
working_hours.start/end: horario laboral permitido (en horas).failed_login.threshold: cantidad mínima de intentos fallidos para considerar brute force.failed_login.window_minutes: ventana de tiempo (en minutos) para contar fallos.failed_login.message/sudo_command.message: texto que se usa en la descripción de la regla.
La clase LogParser se encarga de tomar una línea de log cruda y devolver un diccionario normalizado con la info clave.
- IP → usando un regex IPv4 en
extract_ip(). - Usuario →
extract_user()intenta, en orden, patrones típicos:"invalid user (\w+)""for (\w+)""sudo:\s+(\w+)""\((\w+)\)"(caso CRON)
- Evento (
event) →classify_event()mapea:"Failed password"→failed_login"Accepted password"→successful_login"sudo:"→sudo_command"CRON"→cron_job"GET"/"POST"→web_request- cualquier otro →
other
- Timestamp →
extract_timestamp()soporta:- formato syslog:
Oct 30 10:10:10 ... - formato Apache:
[30/Oct/2025:13:20:01 +0000]
- formato syslog:
{
"raw_line": "Oct 30 10:10:10 server1 sshd[1111]: Failed password for root from 45.23.11.90 port 54421 ssh2",
"ip": "45.23.11.90",
"user": "root",
"event": "failed_login",
"timestamp": datetime(...)
}Módulo: core/analyzer.py
LogAnalyzer usa el parser para leer el archivo de log completo y armar un DataFrame de pandas con todas las líneas relevantes:
from core.parser import LogParser
from core.analyzer import LogAnalyzer
parser = LogParser()
analyzer = LogAnalyzer(parser, rules=rules)
df = analyzer.read_log_file("logs/sample.log")El DataFrame resultante tiene, al menos, las columnas:
timestampipusereventraw_line
Ejemplo conceptual:
| timestamp | ip | user | event | raw_line |
|---|---|---|---|---|
| 2025-10-30 10:10:10 | 45.23.11.90 | root | failed_login | ... |
| 2025-10-30 10:10:30 | 45.23.11.90 | root | successful_login | ... |
- Lee
working_hoursdesderules.json. - Filtra filas con
timestampno nulo. - Calcula
hour = df["timestamp"].dt.hour. - Devuelve solo aquellas filas donde la hora está fuera del rango
[start, end].
Uso típico:
fuera = analyzer.detect_offhour(df)- Filtra
event == "failed_login". - Lee
thresholdywindow_minutesdesderules.json. - Si no hay
window_minutes:- cuenta intentos fallidos totales por IP.
- Si hay
window_minutes:- usa una ventana móvil de tiempo por IP para ver si se superan los N intentos en X minutos.
La función devuelve:
brute_df, threshold, window_minutes = analyzer.detect_bruteforce(df)El módulo alert_storage.py se encarga de persistir las alertas de cada corrida en un archivo histórico CSV.
ensure_history_dir()→ creadata/si no existe.append_alert(offhours_df, bruteforce_df, rules=None):- normaliza
offhours_dfybruteforce_dfa un formato común; - agrega:
run_timestamp(momento de la corrida);alert_type("offhours"o"bruteforce");rule(mensaje baseado enrules.json);
- guarda o appendea en:
- normaliza
data/alert_history.csv
| run_timestamp | event_timestamp | alert_type | ip | user | rule | raw_line |
|---|---|---|---|---|---|---|
| 2025-10-30 10:11:00 | 2025-10-30 10:10:10 | bruteforce | 45.23.11.90 | root | posible fuerza bruta: ≥3 en 10 minutos | ... |
Este módulo permite mirar el historial completo de alertas.
-
load_history()- carga el CSV
data/alert_history.csv; - convierte
run_timestampyevent_timestampadatetime.
- carga el CSV
-
history_stats(df)devuelve un diccionario con:
{
"total": <total_alertas>,
"by_type": {"offhours": N1, "bruteforce": N2, ...},
"top_ips": {"45.23.11.90": X, "200.55.22.100": Y, ...},
"top_users": {"root": A, "admin": B, ...}
}Estas estadísticas se utilizan para:
- mostrar el total de alertas;
- listar el top de IPs y usuarios;
- armar paneles del dashboard.
LogCorrelator compara:
- eventos de la corrida actual (
events_df); - historial de alertas (
history_df);
para detectar patrones más avanzados.
from core.correlator import LogCorrelator
correlator = LogCorrelator(events_df=df, history_df=history_df)
local_corr = correlator.correlate_local(window_minutes=20, min_fails=1)
historical_corr = correlator.correlate_with_history(history_window_hours=24, min_alerts=1)
correlator.save_correlations(local_corr, historical_corr)Busca secuencias:
- Uno o varios
failed_login - Seguidos de un
successful_login - Misma
(ip, user) - Dentro de una ventana de X minutos
Es útil para casos del tipo:
“Hubo varios intentos fallidos y luego un login exitoso para la misma IP/usuario.”
Cruza:
- alertas históricas de tipo
bruteforce - logins exitosos actuales (
successful_login)
Si una IP/usuario tuvo fuerza bruta en las últimas N horas y ahora tiene un login exitoso, se genera una correlación sospechosa.
Las correlaciones se guardan como CSV en:
alerts/correlated_local.csv
alerts/correlated_history.csv
LogReporter se encarga de:
- guardar DataFrames en CSV;
- mostrar resultados formateados en consola con Rich.
Por defecto se crean (o se pueden crear) archivos como:
reports/full_log_report.csv→ todos los eventos parseadosalerts/offhours_report.csv→ eventos fuera de horarioalerts/bruteforce_report.csv→ intentos de fuerza brutaalerts/correlated_local.csv→ correlaciones localesalerts/correlated_history.csv→ correlaciones históricas
Funciones como:
show_offhours(df)show_bruteforce(df, threshold, window_minutes)
usan Rich para mostrar:
- tablas con headers;
- colores;
- mensajes claros sobre si se detectaron o no eventos sospechosos.
En main.py se define una función show_dashboard_summary(...) que muestra un mini dashboard tipo SIEM, con paneles Rich:
-
Resumen general:
- líneas procesadas;
- total de alertas;
- correlaciones locales e históricas.
-
Alertas por tipo:
Tabla con cantidad por tipo de alerta. -
Top IPs y Top usuarios:
Paneles con las IPs y usuarios con más alertas. -
Resumen de correlaciones:
Cantidad de correlaciones locales e históricas detectadas.
Es el panel central que da una vista rápida del estado de seguridad según los logs analizados.
┌───────────────┐
│ rules.json │
└───────┬───────┘
▼
┌────────────────────┐
│ LogParser │
│ (parse_line) │
└─────────┬──────────┘
▼
┌─────────────────────────────┐
│ DataFrame (df) │
└─────────┬──────────┬────────┘
│ │
▼ ▼
┌──────────────────┐ ┌───────────────────┐
│ detect_offhour │ │ detect_bruteforce │
└─────────┬────────┘ └──────────┬────────┘
▼ ▼
┌────────────────┐ ┌────────────────┐
│ offhours_df │ │ bruteforce_df │
└─────────┬────────┘ └──────────┬────┘
▼ ▼
┌────────────────────┐ ┌─────────────────────┐
│ append_alert() │ │ alert_history.csv │
└─────────┬──────────┘ └──────────┬──────────┘
▼ ▼
┌──────────────────┐ ┌─────────────────────────┐
│ history_stats │ │ LogCorrelator │
└─────────┬────────┘ └──────────┬─────────────┘
▼ ▼
┌─────────────────┐ ┌────────────────────────┐
│ Dashboard (Rich) │ │ save_correlations() │
└─────────────────┘ └────────────────────────┘
Log Sentinel está pensado para funcionar con:
- Logs de sistema tipo syslog:
sshd,sudo,CRON, etc.
- Logs de Apache/Nginx en formato access log.
- Cualquier log de texto que contenga:
- timestamps reconocibles;
- mensajes con IPs;
- información de usuario.
Con modificaciones mínimas al parser, se puede extender a otros formatos.
- Reglas ampliadas para distintos tipos de eventos (web, base de datos, etc.).
- Integración con API / webhooks para enviar alertas a otros sistemas.
- Dashboard web con Streamlit u otra herramienta.
- Soporte para múltiples archivos de log en una sola corrida.
- CLI con argumentos para elegir archivo de input, reglas, path de salida, etc.
Gaspar Swidzinski
Proyecto personal de aprendizaje en:
- 🛡️ Ciberseguridad
- 🐍 Desarrollo en Python
- 📊 Análisis y correlación de logs
Distribuido bajo la licencia MIT License.
Podés usarlo, modificarlo y compartirlo libremente citando al autor original.
Log Sentinel is an educational mini-SIEM written in Python.
It processes logs, detects brute-force attempts and off-hours activity,
keeps a historical alert store, correlates events, and shows a security dashboard in the console.
Log_Sentinel/
├── README.md
├── main.py # SIEM entry point
├── rules.json # Detection rules
├── logs/
│ └── sample.log # Sample log
├── reports/ # Full reports (CSV)
├── alerts/ # Alert reports (CSV)
├── data/
│ └── alert_history.csv # Persistent alert history
└── core/
├── parser.py # Extract IP / user / timestamp / event
├── analyzer.py # Detection (offhours + brute force)
├── reporter.py # Console output + report saving
├── alert_storage.py # Alert history persistence
├── history_reader.py # History loading + statistics
└── correlator.py # Local + historical event correlation
git clone https://github.com/gasparswidzinski/Log_Sentinel.git
cd Log_Sentinel
pip install -r requirements.txt
python main.py💡 Make sure you have Python 3.11 and
pipinstalled.
This section explains, step by step, what the program does when you run:
python main.py- Loads rules from
rules.json. - Reads and parses the log file using
LogParser. - Normalizes everything into a pandas DataFrame.
- Detects:
- off-hours activity (
offhours); - possible brute-force attacks (
bruteforce).
- off-hours activity (
- Appends alerts to a persistent history (
data/alert_history.csv). - Computes historical statistics across all runs.
- Correlates events:
- failed → successful logins (local correlation);
- past brute-force → current successful login (historical correlation).
- Generates CSV reports and shows a Rich-powered console dashboard.
Main configuration file:
{
"working_hours": { "start": 9, "end": 18 },
"failed_login": {
"threshold": 3,
"window_minutes": 10,
"message": "possible brute force attack"
},
"sudo_command": {
"outside_working_hours": true,
"message": "sudo command used outside working hours"
}
}🔧 What each field is used for:
working_hours.start/end: allowed working hour range (in hours).failed_login.threshold: minimum number of failed attempts to flag brute force.failed_login.window_minutes: time window (minutes) to count failures.failed_login.message/sudo_command.message: human-readable rule descriptions.
LogParser takes a raw log line and returns a normalized dictionary with the key information.
- IP → regular IPv4 regex in
extract_ip(). - User →
extract_user()tries several patterns:"invalid user (\w+)""for (\w+)""sudo:\s+(\w+)""\((\w+)\)"(CRON case)
- Event (
event) →classify_event()maps:"Failed password"→failed_login"Accepted password"→successful_login"sudo:"→sudo_command"CRON"→cron_job"GET"/"POST"→web_request- anything else →
other
- Timestamp →
extract_timestamp()supports:- syslog style:
Oct 30 10:10:10 ... - Apache style:
[30/Oct/2025:13:20:01 +0000]
- syslog style:
{
"raw_line": "Oct 30 10:10:10 server1 sshd[1111]: Failed password for root from 45.23.11.90 port 54421 ssh2",
"ip": "45.23.11.90",
"user": "root",
"event": "failed_login",
"timestamp": datetime(...)
}Module: core/analyzer.py
LogAnalyzer uses the parser to read the full log file and build a pandas DataFrame with all relevant lines:
from core.parser import LogParser
from core.analyzer import LogAnalyzer
parser = LogParser()
analyzer = LogAnalyzer(parser, rules=rules)
df = analyzer.read_log_file("logs/sample.log")The resulting DataFrame contains at least:
timestampipusereventraw_line
Conceptual example:
| timestamp | ip | user | event | raw_line |
|---|---|---|---|---|
| 2025-10-30 10:10:10 | 45.23.11.90 | root | failed_login | ... |
| 2025-10-30 10:10:30 | 45.23.11.90 | root | successful_login | ... |
- Reads
working_hoursfromrules.json. - Filters rows with a non-null
timestamp. - Computes
hour = df["timestamp"].dt.hour. - Returns only rows where the hour is outside
[start, end].
Typical usage:
offhours_df = analyzer.detect_offhour(df)- Filters
event == "failed_login". - Reads
thresholdandwindow_minutesfromrules.json. - If
window_minutesis missing:- counts total failed attempts per IP.
- If
window_minutesis set:- uses a sliding time window per IP to check if N attempts occur within X minutes.
The function returns:
brute_df, threshold, window_minutes = analyzer.detect_bruteforce(df)alert_storage.py is responsible for persisting alerts from each run into a historical CSV.
ensure_history_dir()→ createsdata/if it does not exist.append_alert(offhours_df, bruteforce_df, rules=None):- normalizes
offhours_dfandbruteforce_dfinto a common schema; - adds:
run_timestamp(when the run occurred);alert_type("offhours"or"bruteforce");rule(message based onrules.json);
- saves/appends to:
- normalizes
data/alert_history.csv
| run_timestamp | event_timestamp | alert_type | ip | user | rule | raw_line |
|---|---|---|---|---|---|---|
| 2025-10-30 10:11:00 | 2025-10-30 10:10:10 | bruteforce | 45.23.11.90 | root | possible brute force: ≥3 in 10 minutes | ... |
This module lets you inspect the full alert history.
-
load_history()- loads
data/alert_history.csv; - converts
run_timestampandevent_timestamptodatetime.
- loads
-
history_stats(df)returns a dictionary like:
{
"total": <total_alerts>,
"by_type": {"offhours": N1, "bruteforce": N2, ...},
"top_ips": {"45.23.11.90": X, "200.55.22.100": Y, ...},
"top_users": {"root": A, "admin": B, ...}
}These stats are used to:
- show total alerts;
- list top IPs and users;
- build dashboard panels.
LogCorrelator compares:
- current run events (
events_df); - historical alerts (
history_df);
to detect more advanced patterns.
from core.correlator import LogCorrelator
correlator = LogCorrelator(events_df=df, history_df=history_df)
local_corr = correlator.correlate_local(window_minutes=20, min_fails=1)
historical_corr = correlator.correlate_with_history(history_window_hours=24, min_alerts=1)
correlator.save_correlations(local_corr, historical_corr)Looks for sequences:
- One or more
failed_loginevents - Followed by a
successful_login - Same
(ip, user) - Within a given time window
Useful for situations like:
“There were several failed attempts and then a successful login for the same IP/user.”
Crosses:
- historical
bruteforcealerts - current
successful_loginevents
If an IP/user had brute-force alerts in the last N hours and now has a successful login, it generates a suspicious correlation.
Correlations are saved as CSV in:
alerts/correlated_local.csv
alerts/correlated_history.csv
LogReporter handles:
- saving DataFrames to CSV;
- showing formatted results in the console using Rich.
By default (or optionally), it creates files such as:
reports/full_log_report.csv→ all parsed eventsalerts/offhours_report.csv→ off-hours eventsalerts/bruteforce_report.csv→ brute-force attemptsalerts/correlated_local.csv→ local correlationsalerts/correlated_history.csv→ historical correlations
Functions like:
show_offhours(df)show_bruteforce(df, threshold, window_minutes)
use Rich to display:
- tables with headers;
- colored output;
- clear messages about whether suspicious activity was detected or not.
In main.py, the show_dashboard_summary(...) function renders a mini-SIEM dashboard with Rich panels:
-
General summary:
- processed lines;
- total alerts;
- local & historical correlations.
-
Alerts by type:
Table with counts per alert type. -
Top IPs & top users:
Panels with the most active IPs and users. -
Correlation summary:
Number of local and historical correlations detected.
This is the central panel that gives a quick view of the security posture based on the analyzed logs.
┌───────────────┐
│ rules.json │
└───────┬───────┘
▼
┌────────────────────┐
│ LogParser │
│ (parse_line) │
└─────────┬──────────┘
▼
┌─────────────────────────────┐
│ DataFrame (df) │
└─────────┬──────────┬────────┘
│ │
▼ ▼
┌──────────────────┐ ┌───────────────────┐
│ detect_offhour │ │ detect_bruteforce │
└─────────┬────────┘ └──────────┬────────┘
▼ ▼
┌────────────────┐ ┌────────────────┐
│ offhours_df │ │ bruteforce_df │
└─────────┬────────┘ └──────────┬────┘
▼ ▼
┌────────────────────┐ ┌─────────────────────┐
│ append_alert() │ │ alert_history.csv │
└─────────┬──────────┘ └──────────┬──────────┘
▼ ▼
┌──────────────────┐ ┌─────────────────────────┐
│ history_stats │ │ LogCorrelator │
└─────────┬────────┘ └──────────┬─────────────┘
▼ ▼
┌─────────────────┐ ┌────────────────────────┐
│ Dashboard (Rich) │ │ save_correlations() │
└─────────────────┘ └────────────────────────┘
Log Sentinel is meant to work with:
- syslog-style system logs:
sshd,sudo,CRON, etc.
- Apache/Nginx access logs.
- Any textual log containing:
- recognizable timestamps;
- messages with IPs;
- user information.
With small changes to the parser, it can be extended to additional formats.
- Extended rules for different event classes (web, database, etc.).
- Integration with APIs / webhooks to push alerts elsewhere.
- Web dashboard using Streamlit or similar.
- Support for multiple log files in a single run.
- CLI flags for input log path, rules file, output directory, etc.
Gaspar Swidzinski
Personal learning project around:
- 🛡️ Cybersecurity
- 🐍 Python development
- 📊 Log analysis and event correlation
Distributed under the MIT License.
You are free to use, modify, and share it, with proper attribution to the original author.

















