A Python-based solution that automates the generation of compliance reports from multiple systems, reducing manual effort and improving accuracy.
Compliance reporting is a critical but often time-consuming process that requires gathering data from multiple systems, formatting it according to specific requirements, and ensuring accuracy. The organization faced several challenges with their manual compliance reporting process:
I developed a comprehensive Python-based automation solution that extracts data from multiple systems, transforms it into a standardized format, and generates compliance reports automatically. The solution includes data validation checks to ensure accuracy and can be scheduled to run at regular intervals.
Below is a simplified example of the Python code used to extract data from different systems and generate a compliance report:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import requests
import json
import logging
from datetime import datetime

# Configure logging: INFO-level messages go to a rolling log file for audit trails.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='compliance_automation.log'
)
logger = logging.getLogger('compliance_automation')


class ComplianceReportGenerator:
    """Generate compliance reports from multiple data sources.

    Extracts data from SQL databases and REST APIs (as declared in a JSON
    configuration file), standardizes and merges it, adds calculated fields
    such as a weighted risk score, and writes a timestamped report file in
    Excel, CSV, or JSON format.
    """

    def __init__(self, config_file):
        """Initialize the report generator with configuration settings.

        Args:
            config_file: Path to a JSON file providing at least
                'database_connection_string', 'api_base_url', 'api_key',
                'column_mapping', 'required_columns', 'column_dtypes',
                'database_queries', and 'api_endpoints'.
        """
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.db_engine = create_engine(self.config['database_connection_string'])
        self.api_base_url = self.config['api_base_url']
        self.api_key = self.config['api_key']
        logger.info("Compliance Report Generator initialized")

    def extract_database_data(self, query):
        """Extract data from database using SQL query.

        Args:
            query: SQL query string to execute against the configured engine.

        Returns:
            pandas.DataFrame with the query results.

        Raises:
            Exception: re-raised after logging if the extraction fails.
        """
        try:
            logger.info("Extracting data from database")
            df = pd.read_sql(query, self.db_engine)
            # Lazy %-formatting so the message is only built when emitted.
            logger.info("Successfully extracted %d records from database", len(df))
            return df
        except Exception as e:
            logger.error("Error extracting data from database: %s", e)
            raise

    def extract_api_data(self, endpoint, params=None):
        """Extract data from a REST API endpoint.

        The API is expected to return a JSON object with a 'results' list
        (flattened via pandas.json_normalize).

        Args:
            endpoint: Path appended to the configured API base URL.
            params: Optional dict of query-string parameters.

        Returns:
            pandas.DataFrame with one row per result record.

        Raises:
            requests.HTTPError: on non-2xx responses (via raise_for_status).
            Exception: re-raised after logging on any other failure.
        """
        try:
            logger.info("Extracting data from API endpoint: %s", endpoint)
            headers = {'Authorization': f'Bearer {self.api_key}'}
            # FIX: explicit timeout so a hung endpoint cannot stall a
            # scheduled run indefinitely (requests has no default timeout).
            response = requests.get(
                f"{self.api_base_url}/{endpoint}",
                headers=headers,
                params=params,
                timeout=60,
            )
            response.raise_for_status()
            data = response.json()
            df = pd.json_normalize(data['results'])
            logger.info("Successfully extracted %d records from API", len(df))
            return df
        except Exception as e:
            logger.error("Error extracting data from API: %s", e)
            raise

    def transform_data(self, dfs):
        """Transform and merge data from different sources.

        Args:
            dfs: List of DataFrames from the extract_* methods.

        Returns:
            A single cleaned, merged DataFrame with calculated fields added.

        Raises:
            Exception: re-raised after logging if any transformation fails.
        """
        try:
            logger.info("Transforming and merging data")
            # FIX: build a new list instead of mutating the caller's `dfs`
            # in place — the original reassigned dfs[i], a surprising side
            # effect for the caller.
            standardized = [self.standardize_columns(df) for df in dfs]
            # Merge dataframes (row-wise; columns already share names after
            # standardization).
            merged_df = pd.concat(standardized, ignore_index=True)
            # Clean data
            merged_df = self.clean_data(merged_df)
            # Add calculated fields
            merged_df = self.add_calculated_fields(merged_df)
            logger.info("Data transformation complete. Final dataset has %d records", len(merged_df))
            return merged_df
        except Exception as e:
            logger.error("Error during data transformation: %s", e)
            raise

    def standardize_columns(self, df):
        """Standardize column names across different data sources.

        Uses the 'column_mapping' dict from the configuration; columns not
        present in the mapping are left unchanged.
        """
        column_mapping = self.config['column_mapping']
        df = df.rename(columns=column_mapping)
        return df

    def clean_data(self, df):
        """Clean and validate the data.

        Drops duplicate rows, logs (but does not fill) missing values in
        required columns, and applies configured dtype conversions.
        """
        # Remove duplicates
        df = df.drop_duplicates()
        # Handle missing values: required columns are only *reported* as
        # missing here — filling/dropping is deliberately left to reviewers.
        for col in self.config['required_columns']:
            missing_count = df[col].isna().sum()
            if missing_count > 0:
                logger.warning("Found %d missing values in required column %s", missing_count, col)
        # Apply data type conversions
        for col, dtype in self.config['column_dtypes'].items():
            if col in df.columns:
                try:
                    df[col] = df[col].astype(dtype)
                # FIX: narrow the bare `except:` — only conversion errors are
                # expected here; a bare except would also swallow
                # KeyboardInterrupt/SystemExit.
                except (ValueError, TypeError):
                    logger.warning("Could not convert column %s to %s", col, dtype)
        return df

    def add_calculated_fields(self, df):
        """Add calculated fields required for compliance reporting.

        Adds a 'report_timestamp' column, and a weighted 'risk_score' column
        when 'risk_score_factors' is configured (missing factor columns are
        silently skipped).
        """
        # Add timestamp
        df['report_timestamp'] = datetime.now()
        # Add risk score based on configuration
        if 'risk_score_factors' in self.config:
            df['risk_score'] = 0
            for factor in self.config['risk_score_factors']:
                col = factor['column']
                weight = factor['weight']
                if col in df.columns:
                    df['risk_score'] += df[col] * weight
        return df

    def generate_report(self, output_format='excel'):
        """Generate the compliance report.

        Args:
            output_format: One of 'excel', 'csv', or 'json'.

        Returns:
            The name of the report file that was written.

        Raises:
            ValueError: if output_format is not supported.
            Exception: re-raised after logging on any extraction/write failure.
        """
        try:
            logger.info("Generating compliance report in %s format", output_format)
            # Extract data from all configured sources
            dfs = []
            # Extract from databases
            for query_config in self.config['database_queries']:
                df = self.extract_database_data(query_config['query'])
                dfs.append(df)
            # Extract from APIs
            for api_config in self.config['api_endpoints']:
                df = self.extract_api_data(api_config['endpoint'], api_config.get('params'))
                dfs.append(df)
            # Transform and merge data
            final_df = self.transform_data(dfs)
            # Generate report. FIX: the output filename must interpolate the
            # computed `filename` — the original wrote to a literal
            # "(unknown).xlsx"/".csv"/".json", leaving `filename` unused and
            # every run overwriting the same file.
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"compliance_report_{timestamp}"
            if output_format == 'excel':
                output_file = f"{filename}.xlsx"
                final_df.to_excel(output_file, index=False)
            elif output_format == 'csv':
                output_file = f"{filename}.csv"
                final_df.to_csv(output_file, index=False)
            elif output_format == 'json':
                output_file = f"{filename}.json"
                final_df.to_json(output_file, orient='records')
            else:
                raise ValueError(f"Unsupported output format: {output_format}")
            logger.info("Compliance report generated successfully: %s", output_file)
            return output_file
        except Exception as e:
            logger.error("Error generating compliance report: %s", e)
            raise


# Example usage
if __name__ == "__main__":
    generator = ComplianceReportGenerator('config.json')
    report_file = generator.generate_report(output_format='excel')
    print(f"Report generated: {report_file}")
The solution uses a configuration file to define data sources, mappings, and report settings:
{ "database_connection_string": "postgresql://user:password@localhost:5432/compliance_db", "api_base_url": "https://api.example.com/v1", "api_key": "API_KEY_HERE", "column_mapping": { "user_id": "user_identifier", "access_level": "permission_level", "last_login": "last_access_date", "system_name": "application_name" }, "required_columns": [ "user_identifier", "permission_level", "application_name" ], "column_dtypes": { "user_identifier": "str", "permission_level": "str", "last_access_date": "datetime64[ns]" }, "database_queries": [ { "name": "active_directory_access", "query": "SELECT user_id, access_level, last_login FROM ad_access WHERE last_login > current_date - interval '90 days'" }, { "name": "erp_access", "query": "SELECT user_id, role_name as access_level, last_login_date as last_login, 'ERP' as system_name FROM erp_users" } ], "api_endpoints": [ { "name": "cloud_access", "endpoint": "users/access", "params": { "days": 90 } } ], "risk_score_factors": [ { "column": "inactive_days", "weight": 0.5 }, { "column": "privilege_level", "weight": 1.2 } ] }
The Compliance Reporting Automation solution delivered significant benefits to the organization, transforming a manual, time-consuming process into an efficient, reliable system.
Reduction in reporting time
Elimination of manual data collection
Decrease in reporting errors
"This automation solution has transformed our compliance reporting process. What used to take weeks of manual effort is now completed in hours with greater accuracy and consistency. It's been a game-changer for our team." — Compliance Manager
This project provided valuable insights into effective compliance automation and data integration:
Potential enhancements for future iterations of the solution include:
I'd love to discuss how my experience with compliance automation could benefit your organization.
Get In Touch