Proactive Threat Identification through Security Log Analysis

A Python-based solution that analyzes security logs to identify potential threats and generate actionable reports for security teams.

Role

Lead Developer

Timeline

3 Months

Technologies

Python, Pandas, Machine Learning, Elasticsearch

The Challenge

Modern security environments generate massive volumes of log data from many sources, making it difficult for security teams to identify potential threats in a timely manner. The organization faced several challenges with its existing security monitoring approach:

Key Issues

  • Overwhelming volume of security logs (10+ million events daily)
  • Manual review processes that couldn't scale
  • Delayed detection of potential security incidents
  • High rate of false positives in existing alerting
  • Difficulty correlating events across multiple systems

Goals

  • Automate the analysis of security logs
  • Identify potential threats with high accuracy
  • Reduce false positives by at least 50%
  • Generate actionable reports for security teams
  • Enable proactive threat hunting

The Solution

I developed a Python-based solution that collects, analyzes, and correlates security logs from multiple sources to identify potential threats. It combines rule-based detection of known attack patterns with machine learning models that flag anomalies and unusual patterns indicative of security incidents.

Figure: Security Log Analysis architecture, showing the components and data flow of the solution.

Key Components

Implementation Process

  1. Log Source Integration: Developed connectors for various log sources to collect security events
  2. Data Preprocessing: Created pipelines to clean, normalize, and enrich log data
  3. Rule Development: Implemented detection rules based on known attack patterns and security best practices
  4. Machine Learning Model: Trained models to detect anomalies and unusual patterns in user and system behavior
  5. Alert Generation: Developed a system to create prioritized, actionable alerts with relevant context
  6. Visualization & Reporting: Created dashboards and reports to help security teams quickly understand and respond to threats
  7. Continuous Improvement: Implemented feedback mechanisms to refine detection capabilities over time
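
Step 4 above mentions anomaly detection without naming a model, so the following is only a stand-in to make the idea concrete: a minimal statistical baseline that flags hours whose failed-login count sits far above the rest. The function name `flag_anomalous_hours`, the hourly-count input, and the threshold of 3 standard deviations are all assumptions of this sketch, not details of the deployed system.

```python
from statistics import mean, stdev


def flag_anomalous_hours(hourly_failures, z_threshold=3.0):
    """Return indices of hours whose count is far above the other hours.

    hourly_failures: failed-login counts, one per hour (hypothetical input).
    z_threshold: standard deviations above the baseline that count as
                 anomalous (assumed value).
    """
    if len(hourly_failures) < 3:
        return []  # too little history for a meaningful baseline

    flagged = []
    for i, count in enumerate(hourly_failures):
        # Leave-one-out baseline, so a spike does not inflate its own baseline
        others = hourly_failures[:i] + hourly_failures[i + 1:]
        mu = mean(others)
        sigma = stdev(others)
        if sigma == 0:
            # Perfectly flat baseline: flag anything above it
            if count > mu:
                flagged.append(i)
        elif (count - mu) / sigma > z_threshold:
            flagged.append(i)
    return flagged


counts = [4, 6, 5, 7, 5, 4, 6, 48, 5, 6]
print(flag_anomalous_hours(counts))  # -> [7]
```

A trained model would replace the z-score test, but the surrounding shape (build a baseline of normal behavior, score each observation against it, flag the outliers) stays the same.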

Alert Demonstration

Below is an example of the type of security alert generated by the system when it detects a potential threat:

Potential Brute Force Attack Detected
Severity: HIGH
Source IP: 198.51.100.73
Target System: auth-server-prod-03
Timestamp: 2025-05-15 03:42:17 UTC
Failed Attempts: 27 in 3 minutes
Description: Multiple failed authentication attempts detected from a single source IP targeting multiple user accounts. Pattern consistent with brute force attack.
Recommended Action: Block source IP address and investigate potentially compromised accounts. Review authentication logs for additional suspicious activity.
Log Samples:
May 15 03:42:15 auth-server-prod-03 sshd[12345]: Failed password for user admin from 198.51.100.73 port 49812 ssh2
May 15 03:42:16 auth-server-prod-03 sshd[12346]: Failed password for user root from 198.51.100.73 port 49813 ssh2
May 15 03:42:17 auth-server-prod-03 sshd[12347]: Failed password for user admin from 198.51.100.73 port 49814 ssh2
May 15 03:42:18 auth-server-prod-03 sshd[12348]: Failed password for user sysadmin from 198.51.100.73 port 49815 ssh2
May 15 03:42:19 auth-server-prod-03 sshd[12349]: Failed password for user administrator from 198.51.100.73 port 49816 ssh2
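
Before detection can run, raw lines like the ones above have to be normalized into structured records. The sketch below shows one way that preprocessing step might look, assuming the exact message format of these sample lines and producing the timestamp, source_ip, username, success, and system fields. The name `parse_failed_login` and the `year` parameter are hypothetical; syslog timestamps carry no year, so the caller has to supply one.

```python
import re
from datetime import datetime

# Matches only the "Failed password for user ..." format of the sample lines;
# other sshd message variants would need their own patterns.
FAILED_PASSWORD = re.compile(
    r"^(?P<ts>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<system>\S+) sshd\[\d+\]: Failed password for user "
    r"(?P<username>\S+) from (?P<source_ip>\S+) port \d+"
)


def parse_failed_login(line, year):
    """Turn one raw sshd line into a normalized record, or None if no match."""
    match = FAILED_PASSWORD.match(line)
    if match is None:
        return None
    # %b expects English month abbreviations under the default C locale
    timestamp = datetime.strptime(f"{year} {match['ts']}", "%Y %b %d %H:%M:%S")
    return {
        "timestamp": timestamp,
        "source_ip": match["source_ip"],
        "username": match["username"],
        "success": False,
        "system": match["system"],
    }


line = ("May 15 03:42:15 auth-server-prod-03 sshd[12345]: "
        "Failed password for user admin from 198.51.100.73 port 49812 ssh2")
record = parse_failed_login(line, year=2025)
print(record["username"], record["source_ip"])  # -> admin 198.51.100.73
```

Returning None for unmatched lines lets a caller skip unrelated log entries without raising.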

Detection Algorithm

The system uses a combination of rule-based detection and machine learning to identify potential threats. Here's a simplified example of the Python code used for brute force detection:

import pandas as pd
from datetime import timedelta

class BruteForceDetector:
    def __init__(self, threshold=5, time_window_minutes=5):
        """
        Initialize the brute force detector with configurable parameters.
        
        Args:
            threshold: Number of failed attempts that triggers an alert
            time_window_minutes: Time window to consider for failed attempts
        """
        self.threshold = threshold
        self.time_window = timedelta(minutes=time_window_minutes)
        
    def detect(self, auth_logs_df):
        """
        Detect potential brute force attacks in authentication logs.
        
        Args:
            auth_logs_df: DataFrame containing authentication logs with columns:
                          timestamp, source_ip, username, success, system
                          
        Returns:
            DataFrame containing potential brute force attacks
        """
        # Filter for failed authentication attempts; the copy keeps the
        # caller's DataFrame unmodified
        failed_auths = auth_logs_df[auth_logs_df['success'] == False].copy()
        
        # Convert timestamp to datetime if it's not already (also accepts
        # timezone-aware datetime columns)
        if not pd.api.types.is_datetime64_any_dtype(failed_auths['timestamp']):
            failed_auths['timestamp'] = pd.to_datetime(failed_auths['timestamp'])
        
        # Group by source IP and system
        grouped = failed_auths.groupby(['source_ip', 'system'])
        
        alerts = []
        
        # Analyze each group
        for (source_ip, system), group in grouped:
            # Sort by timestamp
            group = group.sort_values('timestamp')
            
            # Check for threshold violations within the time window
            for i in range(len(group)):
                start_time = group.iloc[i]['timestamp']
                end_time = start_time + self.time_window
                
                # Count failed attempts within the time window
                window_attempts = group[(group['timestamp'] >= start_time) & 
                                       (group['timestamp'] <= end_time)]
                
                if len(window_attempts) >= self.threshold:
                    # Check if attempts are for different usernames (more suspicious)
                    unique_users = window_attempts['username'].nunique()
                    
                    # Calculate severity based on number of attempts and unique users
                    severity = self._calculate_severity(len(window_attempts), unique_users)
                    
                    # Create alert
                    alert = {
                        'source_ip': source_ip,
                        'target_system': system,
                        'timestamp': start_time,
                        'end_timestamp': window_attempts.iloc[-1]['timestamp'],
                        'failed_attempts': len(window_attempts),
                        'unique_users': unique_users,
                        'severity': severity,
                        'alert_type': 'Potential Brute Force Attack',
                        'log_samples': window_attempts.head(5).to_dict('records')
                    }
                    
                    alerts.append(alert)
                    
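                    # Stop scanning this group: only the first window that
                    # crosses the threshold is reported, which avoids
                    # duplicate alerts for the same source IP and system
                    break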
        
        return pd.DataFrame(alerts) if alerts else pd.DataFrame()
    
    def _calculate_severity(self, attempt_count, unique_users):
        """Calculate alert severity based on attempt count and unique users."""
        if attempt_count > 20 and unique_users > 3:
            return 'HIGH'
        elif attempt_count > 10 or unique_users > 2:
            return 'MEDIUM'
        else:
            return 'LOW'


# Example usage
if __name__ == "__main__":
    # Sample authentication logs
    logs = [
        {'timestamp': '2025-05-15 03:42:15', 'source_ip': '198.51.100.73', 
         'username': 'admin', 'success': False, 'system': 'auth-server-prod-03'},
        {'timestamp': '2025-05-15 03:42:16', 'source_ip': '198.51.100.73', 
         'username': 'root', 'success': False, 'system': 'auth-server-prod-03'},
        # ... more log entries
    ]
    
    # Convert to DataFrame
    logs_df = pd.DataFrame(logs)
    
    # Initialize detector
    detector = BruteForceDetector(threshold=5, time_window_minutes=5)
    
    # Detect potential brute force attacks
    alerts = detector.detect(logs_df)
    
    # Print alerts
    if not alerts.empty:
        for _, alert in alerts.iterrows():
            print(f"ALERT: {alert['alert_type']} - Severity: {alert['severity']}")
            print(f"Source IP: {alert['source_ip']}")
            print(f"Target System: {alert['target_system']}")
            print(f"Failed Attempts: {alert['failed_attempts']} ({alert['unique_users']} unique users)")
            print(f"Timestamp: {alert['timestamp']} to {alert['end_timestamp']}")
            print("---")
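
The detector above rescans the whole group for every candidate start time, which becomes expensive for noisy sources. As a possible alternative, not the code used in the project, the sketch below keeps a sliding window in a deque so each event is added and removed at most once. The name `first_burst` is hypothetical. Its semantics also differ slightly: it reports as soon as the threshold is reached, so the returned count equals the threshold rather than the total number of attempts in the full window.

```python
from collections import deque
from datetime import datetime, timedelta


def first_burst(timestamps, threshold=5, window=timedelta(minutes=5)):
    """Return (start, end, count) for the first window with >= threshold events.

    timestamps: failure times for one (source_ip, system) pair, sorted ascending.
    Returns None when no window reaches the threshold.
    """
    recent = deque()
    for ts in timestamps:
        recent.append(ts)
        # Drop events more than `window` older than the newest one
        while ts - recent[0] > window:
            recent.popleft()
        if len(recent) >= threshold:
            return recent[0], ts, len(recent)
    return None


base = datetime(2025, 5, 15, 3, 42, 15)
attempts = [base + timedelta(seconds=i) for i in range(5)]
print(first_burst(attempts))
```

Because it needs only the events inside the current window, this form also suits streaming input, where logs arrive one event at a time.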

Results & Impact

The Security Log Analysis solution significantly improved the organization's ability to detect and respond to potential security threats:

  • 85% reduction in false positives
  • 67% faster threat detection
  • 12 previously undetected threats identified

Business Impact

Security Team Feedback

"This solution has transformed our security operations. We're now able to detect potential threats that would have previously gone unnoticed, and the reduction in false positives means our team can focus on real security issues instead of chasing noise." — Security Operations Manager

Lessons Learned

This project provided valuable insights into effective security log analysis and threat detection:

Key Takeaways

Future Improvements

Potential enhancements for future iterations of the solution include:

Interested in a Similar Solution?

I'd love to discuss how my experience with security log analysis could benefit your organization.
