A Python-based solution that analyzes security logs to identify potential threats and generate actionable reports for security teams.
Modern security environments generate massive volumes of log data from various sources, making it difficult for security teams to identify potential threats in a timely manner. The organization faced several challenges with their existing security monitoring approach:
I developed a comprehensive Python-based solution that collects, analyzes, and correlates security logs from multiple sources to identify potential threats. The solution uses advanced analytics techniques, including machine learning algorithms, to detect anomalies and patterns indicative of security incidents.
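As a rough illustration of what "detecting anomalies" can mean in practice, the sketch below flags sources whose event count is an outlier compared with their peers, using a modified z-score built on the median and the median absolute deviation. This is a simple statistical stand-in, not the machine learning model the project used. The function name `flag_anomalous_sources`, the 3.5 cutoff, and the floor of 1 on the deviation are all assumptions made for this example.

```python
from statistics import median


def flag_anomalous_sources(event_counts, threshold=3.5):
    """Return the sources whose event count is unusually high.

    event_counts: dict mapping a source identifier (for example an IP
    address) to the number of events seen from it. Hypothetical input
    shape, chosen for this sketch.
    """
    counts = list(event_counts.values())
    if len(counts) < 3:
        # Too few sources to say what "normal" looks like
        return []

    med = median(counts)
    mad = median(abs(c - med) for c in counts)
    # Assumption: counts are integers, so floor the deviation at 1 to
    # avoid dividing by zero when most sources have identical counts.
    mad = max(mad, 1)

    flagged = []
    for source, count in event_counts.items():
        # 0.6745 scales the MAD so the score is comparable to a z-score
        score = 0.6745 * (count - med) / mad
        if score > threshold:
            flagged.append(source)
    return sorted(flagged)


if __name__ == "__main__":
    failed_logins = {
        "10.0.0.1": 4,
        "10.0.0.2": 5,
        "10.0.0.3": 3,
        "10.0.0.4": 6,
        "198.51.100.73": 120,
    }
    print(flag_anomalous_sources(failed_logins))
```

A median-based score is used here because a single very noisy source would otherwise inflate a mean and standard deviation and hide itself.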
Below is an example of the type of security alert generated by the system when it detects a potential threat:
The system uses a combination of rule-based detection and machine learning to identify potential threats. Here's a simplified example of the Python code used for brute force detection:

    import pandas as pd
    from datetime import timedelta


    class BruteForceDetector:
        def __init__(self, threshold=5, time_window_minutes=5):
            """
            Initialize the brute force detector with configurable parameters.

            Args:
                threshold: Number of failed attempts that triggers an alert
                time_window_minutes: Time window to consider for failed attempts
            """
            self.threshold = threshold
            self.time_window = timedelta(minutes=time_window_minutes)

        def detect(self, auth_logs_df):
            """
            Detect potential brute force attacks in authentication logs.

            Args:
                auth_logs_df: DataFrame containing authentication logs with columns:
                    timestamp, source_ip, username, success, system

            Returns:
                DataFrame containing potential brute force attacks
            """
            # Work on a copy so the caller's DataFrame is not modified
            auth_logs_df = auth_logs_df.copy()

            # Convert timestamp to datetime if it's not already
            if not pd.api.types.is_datetime64_dtype(auth_logs_df['timestamp']):
                auth_logs_df['timestamp'] = pd.to_datetime(auth_logs_df['timestamp'])

            # Filter for failed authentication attempts
            failed_auths = auth_logs_df[auth_logs_df['success'] == False].copy()

            # Group by source IP and system
            grouped = failed_auths.groupby(['source_ip', 'system'])

            alerts = []

            # Analyze each group
            for (source_ip, system), group in grouped:
                # Sort by timestamp
                group = group.sort_values('timestamp')

                # Check for threshold violations within the time window
                for i in range(len(group)):
                    start_time = group.iloc[i]['timestamp']
                    end_time = start_time + self.time_window

                    # Count failed attempts within the time window
                    window_attempts = group[(group['timestamp'] >= start_time) &
                                            (group['timestamp'] <= end_time)]

                    if len(window_attempts) >= self.threshold:
                        # Check if attempts are for different usernames (more suspicious)
                        unique_users = window_attempts['username'].nunique()

                        # Calculate severity based on number of attempts and unique users
                        severity = self._calculate_severity(len(window_attempts), unique_users)

                        # Create alert
                        alert = {
                            'source_ip': source_ip,
                            'target_system': system,
                            'timestamp': start_time,
                            'end_timestamp': window_attempts.iloc[-1]['timestamp'],
                            'failed_attempts': len(window_attempts),
                            'unique_users': unique_users,
                            'severity': severity,
                            'alert_type': 'Potential Brute Force Attack',
                            'log_samples': window_attempts.head(5).to_dict('records')
                        }
                        alerts.append(alert)

                        # Stop scanning this group after its first alert
                        # to avoid duplicate alerts for the same source and system
                        break

            return pd.DataFrame(alerts) if alerts else pd.DataFrame()

        def _calculate_severity(self, attempt_count, unique_users):
            """Calculate alert severity based on attempt count and unique users."""
            if attempt_count > 20 and unique_users > 3:
                return 'HIGH'
            elif attempt_count > 10 or unique_users > 2:
                return 'MEDIUM'
            else:
                return 'LOW'


    # Example usage
    if __name__ == "__main__":
        # Sample authentication logs
        logs = [
            {'timestamp': '2025-05-15 03:42:15', 'source_ip': '198.51.100.73',
             'username': 'admin', 'success': False, 'system': 'auth-server-prod-03'},
            {'timestamp': '2025-05-15 03:42:16', 'source_ip': '198.51.100.73',
             'username': 'root', 'success': False, 'system': 'auth-server-prod-03'},
            # ... more log entries
        ]

        # Convert to DataFrame
        logs_df = pd.DataFrame(logs)

        # Initialize detector
        detector = BruteForceDetector(threshold=5, time_window_minutes=5)

        # Detect potential brute force attacks
        alerts = detector.detect(logs_df)

        # Print alerts
        if not alerts.empty:
            for _, alert in alerts.iterrows():
                print(f"ALERT: {alert['alert_type']} - Severity: {alert['severity']}")
                print(f"Source IP: {alert['source_ip']}")
                print(f"Target System: {alert['target_system']}")
                print(f"Failed Attempts: {alert['failed_attempts']} ({alert['unique_users']} unique users)")
                print(f"Timestamp: {alert['timestamp']} to {alert['end_timestamp']}")
                print("---")

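The detector above re-filters the whole group for every candidate start time, which is easy to read but quadratic in the number of failed attempts per source. For larger log volumes, the same "first window that reaches the threshold" rule can be sketched as a single pass with two indices over sorted timestamps. The helper below is a hypothetical, dependency-free illustration of that idea, not part of the original solution; the name `first_burst` and its return shape are assumptions.

```python
from datetime import datetime, timedelta


def first_burst(timestamps, threshold=5, window=timedelta(minutes=5)):
    """Find the first time window holding at least `threshold` events.

    Returns (start, end, count), where `start` is the first event of the
    window, `end` is the last event inside it, and `count` is the number
    of events from start through start + window (inclusive). Returns
    None when no window reaches the threshold.
    """
    ts = sorted(timestamps)
    j = 0  # index one past the last event inside the current window
    for i, start in enumerate(ts):
        # Advance the right edge; j never moves backwards, so the whole
        # scan is linear after sorting.
        while j < len(ts) and ts[j] <= start + window:
            j += 1
        if j - i >= threshold:
            return start, ts[j - 1], j - i
    return None


if __name__ == "__main__":
    base = datetime(2025, 5, 15, 3, 42, 15)
    # Five failures one second apart, then one much later
    failures = [base + timedelta(seconds=s) for s in (0, 1, 2, 3, 4, 400)]
    print(first_burst(failures))
```

In a pandas pipeline this would be applied to the failed-attempt timestamps of each (source_ip, system) group, with the alert fields built from the returned window as before.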
The Security Log Analysis solution significantly improved the organization's ability to detect and respond to potential security threats:
Reduction in false positives
Faster threat detection
Previously undetected threats identified
"This solution has transformed our security operations. We're now able to detect potential threats that would have previously gone unnoticed, and the reduction in false positives means our team can focus on real security issues instead of chasing noise." — Security Operations Manager
This project provided valuable insights into effective security log analysis and threat detection:
Potential enhancements for future iterations of the solution include:
I'd love to discuss how my experience with security log analysis could benefit your organization.