Security & Privacy

📖 10 min read 📄 Part 9 of 10

Payment Service - Security and Privacy

PCI DSS Level 1 Compliance

The 12 PCI DSS Requirements

Requirement 1: Install and maintain a firewall configuration

# Network security configuration
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: payment-processing-isolation
spec:
  podSelector:
    matchLabels:
      app: payment-processor
      pci-scope: "true"
  policyTypes:
  - Ingress
  - Egress
  ingress:
  - from:
    - podSelector:
        matchLabels:
          app: api-gateway
          pci-authorized: "true"
    ports:
    - protocol: TCP
      port: 8080
  egress:
  - to:
    - podSelector:
        matchLabels:
          app: payment-gateway
    ports:
    - protocol: TCP
      port: 443
  - to: []  # DNS only
    ports:
    - protocol: UDP
      port: 53

Requirement 3: Protect stored cardholder data

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
import base64
import os

class PCICompliantEncryption:
    def __init__(self, master_key: str):
        self.master_key = master_key.encode()
        
    def derive_encryption_key(self, context: str, salt: bytes = None) -> bytes:
        """Derive encryption key using PBKDF2"""
        if salt is None:
            salt = os.urandom(16)
            
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt + context.encode(),
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(self.master_key))
        return key
    
    def encrypt_cardholder_data(self, card_data: dict) -> dict:
        """Encrypt sensitive cardholder data"""
        
        # Never store full PAN - use tokenization instead
        if 'number' in card_data:
            # Store only last 4 digits
            last_four = card_data['number'][-4:]
            # Hash full number for fingerprinting
            card_hash = hashlib.sha256(card_data['number'].encode()).hexdigest()
            
            # Remove full PAN from data
            encrypted_data = {k: v for k, v in card_data.items() if k != 'number'}
            encrypted_data['last_four'] = last_four
            encrypted_data['fingerprint'] = card_hash[:16]
        else:
            encrypted_data = card_data.copy()
        
        # Encrypt remaining sensitive data
        key = self.derive_encryption_key('cardholder_data')
        cipher = Fernet(key)
        
        sensitive_fields = ['exp_month', 'exp_year', 'cardholder_name']
        for field in sensitive_fields:
            if field in encrypted_data:
                encrypted_value = cipher.encrypt(str(encrypted_data[field]).encode())
                encrypted_data[field] = base64.urlsafe_b64encode(encrypted_value).decode()
        
        return encrypted_data

class CardDataTokenizer:
    def __init__(self, vault_service):
        self.vault = vault_service
        self.token_prefix = "tok_"
        
    async def tokenize_card(self, card_data: dict, merchant_id: str) -> str:
        """Replace card data with secure token"""
        
        # Generate cryptographically secure token
        token = self.token_prefix + secrets.token_urlsafe(32)
        
        # Store encrypted card data in separate, secured vault
        await self.vault.store_encrypted_card_data(
            token=token,
            card_data=card_data,
            merchant_id=merchant_id,
            encryption_context={'purpose': 'payment_processing'}
        )
        
        # Return token - no sensitive data leaves this function
        return token
    
    async def process_payment_with_token(
        self, 
        token: str, 
        amount: int, 
        merchant_id: str
    ) -> dict:
        """Process payment using token without exposing card data"""
        
        # Retrieve card data from vault (restricted access)
        card_data = await self.vault.retrieve_card_data(
            token=token,
            merchant_id=merchant_id,
            purpose='payment_processing'
        )
        
        # Process payment with gateway
        payment_result = await self.payment_gateway.charge(
            card_data=card_data,
            amount=amount,
            merchant_id=merchant_id
        )
        
        # Clear card data from memory immediately
        del card_data
        
        return {
            'success': payment_result.success,
            'transaction_id': payment_result.transaction_id,
            'status': payment_result.status
            # No card data in response
        }

Requirement 4: Encrypt transmission of cardholder data

import ssl
import asyncio
from aiohttp import ClientSession, TCPConnector

class SecurePaymentGatewayClient:
    def __init__(self):
        # Configure TLS 1.3 with strong cipher suites
        self.ssl_context = ssl.create_default_context()
        self.ssl_context.minimum_version = ssl.TLSVersion.TLSv1_2
        self.ssl_context.set_ciphers('ECDHE+AESGCM:ECDHE+CHACHA20:DHE+AESGCM:DHE+CHACHA20:!aNULL:!MD5:!DSS')
        
        # Certificate pinning for payment gateways
        self.ssl_context.check_hostname = True
        self.ssl_context.verify_mode = ssl.CERT_REQUIRED
        
    async def send_payment_request(self, gateway_url: str, payment_data: dict) -> dict:
        """Send payment request with end-to-end encryption"""
        
        connector = TCPConnector(ssl=self.ssl_context)
        
        async with ClientSession(connector=connector) as session:
            # Add additional encryption layer for sensitive data
            encrypted_payload = await self.encrypt_payment_payload(payment_data)
            
            async with session.post(
                gateway_url,
                json=encrypted_payload,
                headers={
                    'Content-Type': 'application/json',
                    'X-Encryption-Version': '1.0',
                    'User-Agent': 'PaymentService/1.0'
                },
                timeout=30
            ) as response:
                
                if response.status == 200:
                    encrypted_response = await response.json()
                    return await self.decrypt_payment_response(encrypted_response)
                else:
                    raise PaymentGatewayException(f"Gateway error: {response.status}")
    
    async def encrypt_payment_payload(self, payment_data: dict) -> dict:
        """Add application-level encryption on top of TLS"""
        
        # Use gateway's public key for encryption
        gateway_public_key = await self.get_gateway_public_key()
        
        # Encrypt sensitive fields
        sensitive_data = {
            'card_number': payment_data.get('card_number'),
            'cvv': payment_data.get('cvv'),
            'exp_date': payment_data.get('exp_date')
        }
        
        encrypted_sensitive = self.rsa_encrypt(
            json.dumps(sensitive_data),
            gateway_public_key
        )
        
        # Return payload with encrypted sensitive data
        return {
            'amount': payment_data['amount'],
            'currency': payment_data['currency'],
            'merchant_id': payment_data['merchant_id'],
            'encrypted_card_data': encrypted_sensitive,
            'timestamp': int(time.time())
        }

Advanced Fraud Detection

Machine Learning Fraud Models

import numpy as np
from sklearn.ensemble import IsolationForest, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
import joblib

class AdvancedFraudDetection:
    def __init__(self):
        # Load pre-trained models
        self.isolation_forest = joblib.load('models/anomaly_detection_model.pkl')
        self.random_forest = joblib.load('models/fraud_classification_model.pkl')
        self.feature_scaler = joblib.load('models/feature_scaler.pkl')
        
        # Risk thresholds
        self.risk_thresholds = {
            'low': 0.2,
            'medium': 0.5,
            'high': 0.8,
            'critical': 0.95
        }
        
    async def comprehensive_fraud_analysis(
        self, 
        transaction: dict,
        merchant_context: dict,
        customer_history: dict
    ) -> dict:
        """Multi-layered fraud detection analysis"""
        
        # Extract comprehensive feature set
        features = await self.extract_fraud_features(
            transaction, merchant_context, customer_history
        )
        
        # Normalize features
        normalized_features = self.feature_scaler.transform([features])
        
        # Anomaly detection (unsupervised)
        anomaly_score = self.isolation_forest.decision_function(normalized_features)[0]
        is_anomaly = self.isolation_forest.predict(normalized_features)[0] == -1
        
        # Fraud classification (supervised)
        fraud_probability = self.random_forest.predict_proba(normalized_features)[0][1]
        
        # Velocity checks
        velocity_violations = await self.check_velocity_patterns(transaction)
        
        # Device fingerprinting
        device_risk = await self.analyze_device_fingerprint(transaction.get('device_info'))
        
        # Geolocation analysis
        geo_risk = await self.analyze_geolocation_risk(
            transaction.get('ip_address'),
            customer_history.get('usual_locations', [])
        )
        
        # Combine all risk signals
        combined_risk_score = self.calculate_combined_risk_score(
            fraud_probability=fraud_probability,
            anomaly_score=anomaly_score,
            velocity_violations=velocity_violations,
            device_risk=device_risk,
            geo_risk=geo_risk
        )
        
        return {
            'risk_score': combined_risk_score,
            'risk_level': self.get_risk_level(combined_risk_score),
            'fraud_probability': fraud_probability,
            'is_anomaly': is_anomaly,
            'risk_factors': {
                'velocity_violations': velocity_violations,
                'device_risk': device_risk,
                'geo_risk': geo_risk,
                'anomaly_detected': is_anomaly
            },
            'recommendation': self.get_fraud_recommendation(combined_risk_score),
            'required_actions': self.get_required_actions(combined_risk_score)
        }
    
    async def extract_fraud_features(
        self, 
        transaction: dict,
        merchant_context: dict,
        customer_history: dict
    ) -> list:
        """Extract comprehensive feature set for fraud detection"""
        
        features = []
        
        # Transaction features
        features.extend([
            float(transaction['amount']),
            len(transaction.get('description', '')),
            transaction.get('hour_of_day', 0),
            transaction.get('day_of_week', 0),
            1 if transaction.get('is_weekend') else 0,
            1 if transaction.get('is_holiday') else 0
        ])
        
        # Customer behavioral features
        if customer_history:
            features.extend([
                customer_history.get('avg_transaction_amount', 0),
                customer_history.get('transaction_count', 0),
                customer_history.get('days_since_first_transaction', 0),
                customer_history.get('failed_payment_rate', 0),
                customer_history.get('chargeback_rate', 0),
                len(customer_history.get('used_payment_methods', [])),
                len(customer_history.get('shipping_addresses', []))
            ])
        else:
            features.extend([0] * 7)  # New customer
        
        # Merchant risk features
        features.extend([
            merchant_context.get('avg_transaction_amount', 0),
            merchant_context.get('chargeback_rate', 0),
            merchant_context.get('refund_rate', 0),
            merchant_context.get('days_since_registration', 0),
            merchant_context.get('risk_score', 0.5)
        ])
        
        # Payment method features
        payment_method = transaction.get('payment_method', {})
        features.extend([
            1 if payment_method.get('type') == 'credit' else 0,
            1 if payment_method.get('type') == 'debit' else 0,
            1 if payment_method.get('is_prepaid') else 0,
            payment_method.get('issuer_risk_score', 0.5),
            1 if payment_method.get('country') != merchant_context.get('country') else 0
        ])
        
        return features
    
    async def check_velocity_patterns(self, transaction: dict) -> list:
        """Check for suspicious velocity patterns"""
        
        violations = []
        customer_id = transaction.get('customer_id')
        
        if not customer_id:
            return violations
        
        # Check transaction frequency
        recent_transactions = await self.get_recent_transactions(
            customer_id, 
            hours=1
        )
        
        if len(recent_transactions) > 5:
            violations.append('high_frequency_transactions')
        
        # Check amount velocity
        total_amount_1h = sum(t['amount'] for t in recent_transactions)
        if total_amount_1h > 10000:  # $100 in 1 hour
            violations.append('high_amount_velocity')
        
        # Check failed attempts
        failed_attempts = [t for t in recent_transactions if t['status'] == 'failed']
        if len(failed_attempts) > 3:
            violations.append('multiple_failed_attempts')
        
        # Check geographic velocity
        locations = [t.get('location') for t in recent_transactions if t.get('location')]
        if len(set(locations)) > 2:
            violations.append('impossible_travel')
        
        return violations
    
    def calculate_combined_risk_score(
        self,
        fraud_probability: float,
        anomaly_score: float,
        velocity_violations: list,
        device_risk: dict,
        geo_risk: dict
    ) -> float:
        """Combine multiple risk signals into final score"""
        
        # Base score from ML models (60% weight)
        base_score = fraud_probability * 0.6
        
        # Anomaly detection (20% weight)
        anomaly_contribution = (1 - (anomaly_score + 1) / 2) * 0.2
        
        # Velocity violations (10% weight)
        velocity_score = min(len(velocity_violations) * 0.25, 1.0) * 0.1
        
        # Device risk (5% weight)
        device_score = device_risk.get('risk_score', 0) * 0.05
        
        # Geographic risk (5% weight)
        geo_score = geo_risk.get('risk_score', 0) * 0.05
        
        combined_score = (
            base_score + 
            anomaly_contribution + 
            velocity_score + 
            device_score + 
            geo_score
        )
        
        return min(combined_score, 1.0)

Real-time Risk Scoring

import asyncio
from dataclasses import dataclass
from typing import Dict, List

@dataclass
class RiskSignal:
    signal_type: str
    risk_score: float
    confidence: float
    details: dict

class RealTimeFraudScoring:
    def __init__(self):
        self.risk_engines = {
            'ml_model': MLFraudEngine(),
            'rule_engine': RuleBasedEngine(),
            'velocity_engine': VelocityEngine(),
            'device_engine': DeviceFingerprintEngine(),
            'network_engine': NetworkAnalysisEngine()
        }
        
    async def score_transaction_realtime(
        self, 
        transaction: dict,
        timeout_ms: int = 100
    ) -> dict:
        """Score transaction with sub-100ms latency requirement"""
        
        start_time = asyncio.get_event_loop().time()
        
        # Run all risk engines in parallel
        tasks = []
        for engine_name, engine in self.risk_engines.items():
            task = asyncio.create_task(
                self.run_risk_engine_with_timeout(
                    engine, transaction, timeout_ms // len(self.risk_engines)
                )
            )
            tasks.append((engine_name, task))
        
        # Collect results as they complete
        risk_signals = []
        for engine_name, task in tasks:
            try:
                signal = await task
                if signal:
                    risk_signals.append(signal)
            except asyncio.TimeoutError:
                # Log timeout but continue with other signals
                logger.warning(f"Risk engine {engine_name} timed out")
                continue
        
        # Calculate final risk score
        final_score = self.aggregate_risk_signals(risk_signals)
        
        processing_time = (asyncio.get_event_loop().time() - start_time) * 1000
        
        return {
            'risk_score': final_score['score'],
            'risk_level': final_score['level'],
            'processing_time_ms': processing_time,
            'signals_processed': len(risk_signals),
            'recommendation': final_score['recommendation']
        }
    
    async def run_risk_engine_with_timeout(
        self, 
        engine, 
        transaction: dict, 
        timeout_ms: int
    ) -> RiskSignal:
        """Run individual risk engine with timeout"""
        
        try:
            return await asyncio.wait_for(
                engine.analyze(transaction),
                timeout=timeout_ms / 1000
            )
        except asyncio.TimeoutError:
            return None
    
    def aggregate_risk_signals(self, signals: List[RiskSignal]) -> dict:
        """Aggregate multiple risk signals into final score"""
        
        if not signals:
            return {'score': 0.0, 'level': 'low', 'recommendation': 'approve'}
        
        # Weighted average based on confidence
        total_weight = sum(signal.confidence for signal in signals)
        weighted_score = sum(
            signal.risk_score * signal.confidence 
            for signal in signals
        ) / total_weight if total_weight > 0 else 0
        
        # Determine risk level and recommendation
        if weighted_score >= 0.8:
            level = 'critical'
            recommendation = 'block'
        elif weighted_score >= 0.6:
            level = 'high'
            recommendation = 'review'
        elif weighted_score >= 0.3:
            level = 'medium'
            recommendation = 'monitor'
        else:
            level = 'low'
            recommendation = 'approve'
        
        return {
            'score': weighted_score,
            'level': level,
            'recommendation': recommendation,
            'signal_count': len(signals)
        }

Data Privacy and GDPR Compliance

Personal Data Management

from enum import Enum
from dataclasses import dataclass
from typing import List, Optional

class DataCategory(Enum):
    PAYMENT_DATA = "payment_data"
    PERSONAL_INFO = "personal_info"
    BEHAVIORAL_DATA = "behavioral_data"
    TECHNICAL_DATA = "technical_data"

@dataclass
class DataRetentionPolicy:
    category: DataCategory
    retention_days: int
    legal_basis: str
    can_be_deleted: bool

class GDPRComplianceManager:
    def __init__(self):
        self.retention_policies = {
            DataCategory.PAYMENT_DATA: DataRetentionPolicy(
                category=DataCategory.PAYMENT_DATA,
                retention_days=2555,  # 7 years for financial records
                legal_basis="legal_obligation",
                can_be_deleted=False
            ),
            DataCategory.PERSONAL_INFO: DataRetentionPolicy(
                category=DataCategory.PERSONAL_INFO,
                retention_days=1095,  # 3 years
                legal_basis="legitimate_interest",
                can_be_deleted=True
            ),
            DataCategory.BEHAVIORAL_DATA: DataRetentionPolicy(
                category=DataCategory.BEHAVIORAL_DATA,
                retention_days=730,  # 2 years
                legal_basis="consent",
                can_be_deleted=True
            )
        }
    
    async def handle_data_subject_request(
        self,
        request_type: str,
        customer_id: str,
        merchant_id: str
    ) -> dict:
        """Handle GDPR data subject rights requests"""
        
        if request_type == "access":
            return await self.export_customer_data(customer_id, merchant_id)
        elif request_type == "deletion":
            return await self.delete_customer_data(customer_id, merchant_id)
        elif request_type == "portability":
            return await self.export_portable_data(customer_id, merchant_id)
        elif request_type == "rectification":
            return await self.update_customer_data(customer_id, merchant_id)
        else:
            raise ValueError(f"Unsupported request type: {request_type}")
    
    async def export_customer_data(self, customer_id: str, merchant_id: str) -> dict:
        """Export all customer data for access request"""
        
        customer_data = {}
        
        # Personal information
        personal_info = await self.get_customer_personal_info(customer_id, merchant_id)
        customer_data['personal_information'] = personal_info
        
        # Payment methods (tokenized data only)
        payment_methods = await self.get_customer_payment_methods(customer_id, merchant_id)
        customer_data['payment_methods'] = [
            {
                'id': pm['token'],
                'type': pm['type'],
                'last_four': pm['last_four'],
                'brand': pm['brand'],
                'created_at': pm['created_at']
            }
            for pm in payment_methods
        ]
        
        # Transaction history
        transactions = await self.get_customer_transactions(customer_id, merchant_id)
        customer_data['transactions'] = [
            {
                'id': tx['id'],
                'amount': tx['amount'],
                'currency': tx['currency'],
                'status': tx['status'],
                'created_at': tx['created_at'],
                'description': tx['description']
            }
            for tx in transactions
        ]
        
        # Behavioral data (if consent given)
        if await self.has_behavioral_data_consent(customer_id):
            behavioral_data = await self.get_customer_behavioral_data(customer_id)
            customer_data['behavioral_data'] = behavioral_data
        
        # Create secure download link
        download_url = await self.create_secure_data_export(customer_data, customer_id)
        
        return {
            'status': 'completed',
            'download_url': download_url,
            'expires_at': datetime.utcnow() + timedelta(days=30),
            'data_categories': list(customer_data.keys())
        }
    
    async def delete_customer_data(self, customer_id: str, merchant_id: str) -> dict:
        """Handle right to be forgotten request"""
        
        deletion_summary = {
            'deleted_categories': [],
            'retained_categories': [],
            'deletion_date': datetime.utcnow()
        }
        
        # Check what can be deleted based on retention policies
        for category, policy in self.retention_policies.items():
            if policy.can_be_deleted:
                await self.delete_data_category(customer_id, merchant_id, category)
                deletion_summary['deleted_categories'].append(category.value)
            else:
                # Anonymize instead of delete for legal obligations
                await self.anonymize_data_category(customer_id, merchant_id, category)
                deletion_summary['retained_categories'].append({
                    'category': category.value,
                    'reason': policy.legal_basis,
                    'retention_until': datetime.utcnow() + timedelta(days=policy.retention_days)
                })
        
        return {
            'status': 'completed',
            'summary': deletion_summary
        }
    
    async def anonymize_data_category(
        self, 
        customer_id: str, 
        merchant_id: str, 
        category: DataCategory
    ):
        """Anonymize data that cannot be deleted due to legal obligations"""
        
        if category == DataCategory.PAYMENT_DATA:
            # Anonymize transaction records while preserving financial data
            await self.database.execute("""
                UPDATE transactions 
                SET 
                    customer_id = NULL,
                    metadata = jsonb_build_object(
                        'anonymized', true,
                        'anonymized_at', NOW()
                    )
                WHERE customer_id = $1 AND merchant_id = $2
            """, customer_id, merchant_id)
            
            # Anonymize payment methods
            await self.database.execute("""
                UPDATE payment_methods_vault
                SET 
                    customer_id = NULL,
                    encrypted_data = 'ANONYMIZED'::bytea
                WHERE customer_id = $1 AND merchant_id = $2
            """, customer_id, merchant_id)

Reading Time: ~20 minutes