A Python-based solution that analyzes security logs to identify potential threats and generate actionable reports for security teams.
View Alert DemoModern security environments generate massive volumes of log data from various sources, making it difficult for security teams to identify potential threats in a timely manner. The organization faced several challenges with their existing security monitoring approach:
I developed a comprehensive Python-based solution that collects, analyzes, and correlates security logs from multiple sources to identify potential threats. The solution uses advanced analytics techniques, including machine learning algorithms, to detect anomalies and patterns indicative of security incidents.
Below is an example of the type of security alert generated by the system when it detects a potential threat:
The system uses a combination of rule-based detection and machine learning to identify potential threats. Here's a simplified example of the Python code used for brute force detection:
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
class BruteForceDetector:
def __init__(self, threshold=5, time_window_minutes=5):
"""
Initialize the brute force detector with configurable parameters.
Args:
threshold: Number of failed attempts that triggers an alert
time_window_minutes: Time window to consider for failed attempts
"""
self.threshold = threshold
self.time_window = timedelta(minutes=time_window_minutes)
def detect(self, auth_logs_df):
"""
Detect potential brute force attacks in authentication logs.
Args:
auth_logs_df: DataFrame containing authentication logs with columns:
timestamp, source_ip, username, success, system
Returns:
DataFrame containing potential brute force attacks
"""
# Convert timestamp to datetime if it's not already
if not pd.api.types.is_datetime64_dtype(auth_logs_df['timestamp']):
auth_logs_df['timestamp'] = pd.to_datetime(auth_logs_df['timestamp'])
# Filter for failed authentication attempts
failed_auths = auth_logs_df[auth_logs_df['success'] == False].copy()
# Group by source IP and system
grouped = failed_auths.groupby(['source_ip', 'system'])
alerts = []
# Analyze each group
for (source_ip, system), group in grouped:
# Sort by timestamp
group = group.sort_values('timestamp')
# Check for threshold violations within the time window
for i in range(len(group)):
start_time = group.iloc[i]['timestamp']
end_time = start_time + self.time_window
# Count failed attempts within the time window
window_attempts = group[(group['timestamp'] >= start_time) &
(group['timestamp'] <= end_time)]
if len(window_attempts) >= self.threshold:
# Check if attempts are for different usernames (more suspicious)
unique_users = window_attempts['username'].nunique()
# Calculate severity based on number of attempts and unique users
severity = self._calculate_severity(len(window_attempts), unique_users)
# Create alert
alert = {
'source_ip': source_ip,
'target_system': system,
'timestamp': start_time,
'end_timestamp': window_attempts.iloc[-1]['timestamp'],
'failed_attempts': len(window_attempts),
'unique_users': unique_users,
'severity': severity,
'alert_type': 'Potential Brute Force Attack',
'log_samples': window_attempts.head(5).to_dict('records')
}
alerts.append(alert)
# Skip ahead to avoid duplicate alerts
break
return pd.DataFrame(alerts) if alerts else pd.DataFrame()
def _calculate_severity(self, attempt_count, unique_users):
"""Calculate alert severity based on attempt count and unique users."""
if attempt_count > 20 and unique_users > 3:
return 'HIGH'
elif attempt_count > 10 or unique_users > 2:
return 'MEDIUM'
else:
return 'LOW'
# Example usage
if __name__ == "__main__":
# Sample authentication logs
logs = [
{'timestamp': '2025-05-15 03:42:15', 'source_ip': '198.51.100.73',
'username': 'admin', 'success': False, 'system': 'auth-server-prod-03'},
{'timestamp': '2025-05-15 03:42:16', 'source_ip': '198.51.100.73',
'username': 'root', 'success': False, 'system': 'auth-server-prod-03'},
# ... more log entries
]
# Convert to DataFrame
logs_df = pd.DataFrame(logs)
# Initialize detector
detector = BruteForceDetector(threshold=5, time_window_minutes=5)
# Detect potential brute force attacks
alerts = detector.detect(logs_df)
# Print alerts
if not alerts.empty:
for _, alert in alerts.iterrows():
print(f"ALERT: {alert['alert_type']} - Severity: {alert['severity']}")
print(f"Source IP: {alert['source_ip']}")
print(f"Target System: {alert['target_system']}")
print(f"Failed Attempts: {alert['failed_attempts']} ({alert['unique_users']} unique users)")
print(f"Timestamp: {alert['timestamp']} to {alert['end_timestamp']}")
print("---")
The Security Log Analysis solution significantly improved the organization's ability to detect and respond to potential security threats:
Reduction in false positives
Faster threat detection
Previously undetected threats identified
"This solution has transformed our security operations. We're now able to detect potential threats that would have previously gone unnoticed, and the reduction in false positives means our team can focus on real security issues instead of chasing noise." — Security Operations Manager
This project provided valuable insights into effective security log analysis and threat detection:
Potential enhancements for future iterations of the solution include:
I'd love to discuss how my experience with security log analysis could benefit your organization.
Get In Touch