A Python-based solution that automates the generation of compliance reports from multiple systems, reducing manual effort and improving accuracy.
Compliance reporting is a critical but often time-consuming process that requires gathering data from multiple systems, formatting it according to specific requirements, and ensuring accuracy. The organization faced several challenges with its manual compliance reporting process.
I developed a comprehensive Python-based automation solution that extracts data from multiple systems, transforms it into a standardized format, and generates compliance reports automatically. The solution includes data validation checks to ensure accuracy and can be scheduled to run at regular intervals.
Below is a simplified example of the Python code used to extract data from different systems and generate a compliance report:
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import requests
import json
import logging
from datetime import datetime
# Configure logging: INFO-level (and above) messages go to a dedicated
# log file with timestamped, named, leveled entries.
logging.basicConfig(
    filename='compliance_automation.log',
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('compliance_automation')
class ComplianceReportGenerator:
    """Generate compliance reports from multiple data sources.

    Extracts data from databases and REST APIs defined in a JSON
    configuration file, merges and cleans it, adds calculated fields,
    and writes the result as an Excel, CSV, or JSON report.
    """

    def __init__(self, config_file):
        """Initialize the report generator with configuration settings.

        Args:
            config_file: Path to a JSON file providing
                'database_connection_string', 'api_base_url', 'api_key',
                column mappings, dtypes, and source definitions.
        """
        with open(config_file, 'r') as f:
            self.config = json.load(f)
        self.db_engine = create_engine(self.config['database_connection_string'])
        self.api_base_url = self.config['api_base_url']
        self.api_key = self.config['api_key']
        logger.info("Compliance Report Generator initialized")

    def extract_database_data(self, query):
        """Run a SQL query against the configured database.

        Args:
            query: SQL statement to execute.

        Returns:
            pandas.DataFrame with the query results.

        Raises:
            Exception: Re-raised after logging if the query fails.
        """
        try:
            logger.info("Extracting data from database")
            df = pd.read_sql(query, self.db_engine)
            logger.info(f"Successfully extracted {len(df)} records from database")
            return df
        except Exception as e:
            logger.error(f"Error extracting data from database: {str(e)}")
            raise

    def extract_api_data(self, endpoint, params=None):
        """Fetch records from a REST API endpoint.

        Args:
            endpoint: Path appended to the configured API base URL.
            params: Optional query parameters for the request.

        Returns:
            pandas.DataFrame built from the response's 'results' key.

        Raises:
            Exception: Re-raised after logging on HTTP or parsing errors.
        """
        try:
            logger.info(f"Extracting data from API endpoint: {endpoint}")
            headers = {'Authorization': f'Bearer {self.api_key}'}
            response = requests.get(f"{self.api_base_url}/{endpoint}",
                                    headers=headers,
                                    params=params)
            response.raise_for_status()
            data = response.json()
            # Responses are expected to wrap their records in a 'results' key.
            df = pd.json_normalize(data['results'])
            logger.info(f"Successfully extracted {len(df)} records from API")
            return df
        except Exception as e:
            logger.error(f"Error extracting data from API: {str(e)}")
            raise

    def transform_data(self, dfs):
        """Standardize, merge, clean, and enrich data from all sources.

        Args:
            dfs: List of DataFrames from the various sources. The list is
                not modified (the original mutated it in place).

        Returns:
            A single merged, cleaned, enriched pandas.DataFrame.
        """
        try:
            logger.info("Transforming and merging data")
            # Standardize column names so heterogeneous sources can be
            # concatenated; build a new list rather than mutating the input.
            standardized = [self.standardize_columns(df) for df in dfs]
            merged_df = pd.concat(standardized, ignore_index=True)
            merged_df = self.clean_data(merged_df)
            merged_df = self.add_calculated_fields(merged_df)
            logger.info(f"Data transformation complete. Final dataset has {len(merged_df)} records")
            return merged_df
        except Exception as e:
            logger.error(f"Error during data transformation: {str(e)}")
            raise

    def standardize_columns(self, df):
        """Rename columns per the configured 'column_mapping'."""
        column_mapping = self.config['column_mapping']
        return df.rename(columns=column_mapping)

    def clean_data(self, df):
        """Deduplicate, check required columns, and apply dtype conversions.

        Missing values and failed conversions are logged as warnings
        rather than raised, so a partially-dirty dataset still produces
        a report.
        """
        df = df.drop_duplicates()
        # Warn (don't fail) on absent or partially-missing required columns.
        for col in self.config['required_columns']:
            if col not in df.columns:
                logger.warning(f"Required column {col} is absent from the dataset")
                continue
            missing_count = df[col].isna().sum()
            if missing_count > 0:
                logger.warning(f"Found {missing_count} missing values in required column {col}")
        # Apply configured dtype conversions; narrow except so unrelated
        # errors are not silently swallowed (original used a bare except).
        for col, dtype in self.config['column_dtypes'].items():
            if col in df.columns:
                try:
                    df[col] = df[col].astype(dtype)
                except (TypeError, ValueError):
                    logger.warning(f"Could not convert column {col} to {dtype}")
        return df

    def add_calculated_fields(self, df):
        """Add the report timestamp and (optionally) a weighted risk score."""
        df['report_timestamp'] = datetime.now()
        # Risk score is a weighted sum over configured factor columns;
        # factors whose column is absent are skipped.
        if 'risk_score_factors' in self.config:
            df['risk_score'] = 0
            for factor in self.config['risk_score_factors']:
                col = factor['column']
                weight = factor['weight']
                if col in df.columns:
                    df['risk_score'] += df[col] * weight
        return df

    def generate_report(self, output_format='excel'):
        """Extract from all configured sources and write the report file.

        Args:
            output_format: One of 'excel', 'csv', or 'json'.

        Returns:
            The generated report's filename.

        Raises:
            ValueError: If output_format is not supported (checked before
                any extraction work, so bad arguments fail fast).
        """
        try:
            # Fail fast on an unsupported format instead of discovering it
            # after all the extraction work has been done.
            extensions = {'excel': '.xlsx', 'csv': '.csv', 'json': '.json'}
            if output_format not in extensions:
                raise ValueError(f"Unsupported output format: {output_format}")

            logger.info(f"Generating compliance report in {output_format} format")

            dfs = []
            for query_config in self.config['database_queries']:
                dfs.append(self.extract_database_data(query_config['query']))
            for api_config in self.config['api_endpoints']:
                dfs.append(self.extract_api_data(api_config['endpoint'],
                                                 api_config.get('params')))

            final_df = self.transform_data(dfs)

            # Original built `filename` but then wrote to a broken literal
            # path; the timestamped name is now actually used.
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f"compliance_report_{timestamp}"
            output_file = f"{filename}{extensions[output_format]}"
            if output_format == 'excel':
                final_df.to_excel(output_file, index=False)
            elif output_format == 'csv':
                final_df.to_csv(output_file, index=False)
            else:
                final_df.to_json(output_file, orient='records')

            logger.info(f"Compliance report generated successfully: {output_file}")
            return output_file
        except Exception as e:
            logger.error(f"Error generating compliance report: {str(e)}")
            raise
# Example usage: build a generator from the default config file and
# produce an Excel-format compliance report.
if __name__ == "__main__":
    report_generator = ComplianceReportGenerator('config.json')
    output_path = report_generator.generate_report(output_format='excel')
    print(f"Report generated: {output_path}")
The solution uses a configuration file to define data sources, mappings, and report settings:
{
"database_connection_string": "postgresql://user:password@localhost:5432/compliance_db",
"api_base_url": "https://api.example.com/v1",
"api_key": "API_KEY_HERE",
"column_mapping": {
"user_id": "user_identifier",
"access_level": "permission_level",
"last_login": "last_access_date",
"system_name": "application_name"
},
"required_columns": [
"user_identifier",
"permission_level",
"application_name"
],
"column_dtypes": {
"user_identifier": "str",
"permission_level": "str",
"last_access_date": "datetime64[ns]"
},
"database_queries": [
{
"name": "active_directory_access",
"query": "SELECT user_id, access_level, last_login FROM ad_access WHERE last_login > current_date - interval '90 days'"
},
{
"name": "erp_access",
"query": "SELECT user_id, role_name as access_level, last_login_date as last_login, 'ERP' as system_name FROM erp_users"
}
],
"api_endpoints": [
{
"name": "cloud_access",
"endpoint": "users/access",
"params": {
"days": 90
}
}
],
"risk_score_factors": [
{
"column": "inactive_days",
"weight": 0.5
},
{
"column": "privilege_level",
"weight": 1.2
}
]
}
The Compliance Reporting Automation solution delivered significant benefits to the organization, transforming a manual, time-consuming process into an efficient, reliable system.
Key results included a significant reduction in reporting time, the elimination of manual data collection, and a decrease in reporting errors.
"This automation solution has transformed our compliance reporting process. What used to take weeks of manual effort is now completed in hours with greater accuracy and consistency. It's been a game-changer for our team." — Compliance Manager
This project provided valuable insights into effective compliance automation and data integration. Several enhancements are planned for future iterations of the solution.
I'd love to discuss how my experience with compliance automation could benefit your organization.
Get In Touch