Security & Privacy

📖 11 min read 📄 Part 9 of 10

Security and Privacy - Live Comment System

Authentication and Authorization

Multi-Factor Authentication System

JWT-Based Authentication with MFA:

import functools
import hashlib
import re
import time
import uuid
from datetime import datetime, timedelta

import jwt
import pyotp
import qrcode
from cryptography.fernet import Fernet

class SecureAuthenticationSystem:
    """Password login with optional MFA that issues HS256-signed JWTs.

    Several collaborators used below (``validate_credentials``,
    ``perform_security_checks``, ``verify_mfa_token``,
    ``generate_temp_token``, ``generate_refresh_token``,
    ``store_mfa_secret``, ``generate_backup_codes``, the logging and
    session helpers, ...) as well as the exception types raised here are
    not defined in this class; they must be provided elsewhere.
    """

    def __init__(self, secret_key, encryption_key):
        """Store the signing key and build the cipher used for MFA secrets.

        Args:
            secret_key: Key used to sign and verify JWTs (HS256).
            encryption_key: Fernet key used to encrypt stored MFA secrets.
        """
        self.secret_key = secret_key
        self.cipher_suite = Fernet(encryption_key)
        self.token_expiry = timedelta(hours=1)
        self.refresh_token_expiry = timedelta(days=30)

    async def authenticate_user(self, username, password, mfa_token=None):
        """Authenticate a user with password and, when enabled, MFA.

        Args:
            username: Login name to validate.
            password: Password to validate.
            mfa_token: Second-factor token; may be omitted on the first
                call, in which case an MFA challenge is returned.

        Returns:
            ``{'status': 'mfa_required', ...}`` when the user has MFA
            enabled and no ``mfa_token`` was supplied, otherwise
            ``{'status': 'authenticated', ...}`` with the access and
            refresh tokens, the access-token lifetime in seconds and the
            sanitized user profile.

        Raises:
            AuthenticationError: Credentials or MFA token are invalid.
            SecurityError: The account failed the security checks.
        """
        # Step 1: Validate username and password
        user = await self.validate_credentials(username, password)
        if not user:
            await self.log_failed_attempt(username, 'invalid_credentials')
            raise AuthenticationError("Invalid credentials")

        # Step 2: Check account status and security flags
        security_check = await self.perform_security_checks(user)
        if not security_check.is_valid:
            raise SecurityError(security_check.reason)

        # Step 3: MFA verification if enabled
        if user.mfa_enabled:
            if not mfa_token:
                # No tokens are issued yet; the caller must come back
                # with a second factor.
                return {
                    'status': 'mfa_required',
                    'mfa_methods': user.mfa_methods,
                    'temp_token': self.generate_temp_token(user.user_id)
                }

            mfa_valid = await self.verify_mfa_token(user.user_id, mfa_token)
            if not mfa_valid:
                await self.log_failed_attempt(username, 'invalid_mfa')
                raise AuthenticationError("Invalid MFA token")

        # Step 4: Generate secure tokens
        access_token = self.generate_access_token(user)
        refresh_token = self.generate_refresh_token(user)

        # Step 5: Update user session and security logs
        await self.create_user_session(user.user_id, access_token)
        await self.log_successful_login(user.user_id)

        return {
            'status': 'authenticated',
            'access_token': access_token,
            'refresh_token': refresh_token,
            'expires_in': int(self.token_expiry.total_seconds()),
            'user_profile': self.sanitize_user_profile(user)
        }

    async def setup_mfa(self, user_id, mfa_method='totp'):
        """Enroll a user in multi-factor authentication.

        Only ``'totp'`` is implemented. The freshly generated secret is
        stored Fernet-encrypted and also returned in plaintext so the
        client can enroll an authenticator app.

        Args:
            user_id: Identifier of the user being enrolled.
            mfa_method: MFA method name; must be ``'totp'``.

        Returns:
            Dict with the plaintext ``secret``, the provisioning URI under
            ``qr_code_uri`` and the generated ``backup_codes``.

        Raises:
            ValueError: ``mfa_method`` is not supported.
        """
        if mfa_method != 'totp':
            # Fail loudly instead of implicitly returning None, which
            # callers would only trip over later.
            raise ValueError(f"Unsupported MFA method: {mfa_method}")

        # Generate TOTP secret
        secret = pyotp.random_base32()

        # Provisioning URI; the client renders it as a QR code. (No QR
        # image is built here because only the URI is returned.)
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=f"user_{user_id}",
            issuer_name="LiveComments"
        )

        # Store encrypted secret
        encrypted_secret = self.cipher_suite.encrypt(secret.encode())
        await self.store_mfa_secret(user_id, encrypted_secret, 'totp')

        return {
            'secret': secret,
            'qr_code_uri': totp_uri,
            'backup_codes': await self.generate_backup_codes(user_id)
        }

    def generate_access_token(self, user):
        """Build and sign a JWT access token for ``user``.

        The payload carries the user's id, username, email, roles and
        permissions plus the ``iat``/``exp``/``jti``/``iss``/``aud``
        claims; it expires after ``self.token_expiry``.

        Args:
            user: Object exposing ``user_id``, ``username``, ``email``,
                ``roles`` and ``permissions``.

        Returns:
            The encoded HS256 token.
        """
        now = datetime.utcnow()
        payload = {
            'sub': str(user.user_id),
            'username': user.username,
            'email': user.email,
            'roles': user.roles,
            'permissions': user.permissions,
            'iat': now,
            'exp': now + self.token_expiry,
            'jti': str(uuid.uuid4()),  # JWT ID for token tracking
            'iss': 'live-comments-api',
            'aud': 'live-comments-client'
        }

        return jwt.encode(payload, self.secret_key, algorithm='HS256')

    async def verify_token(self, token):
        """Decode a JWT and apply revocation and account-status checks.

        Args:
            token: Encoded JWT as produced by ``generate_access_token``.

        Returns:
            The decoded payload dict.

        Raises:
            TokenError: The token is expired, invalid or revoked, or the
                user no longer exists / is not ``'active'``.
        """
        try:
            # Signature, expiry, audience and issuer are validated here.
            payload = jwt.decode(
                token,
                self.secret_key,
                algorithms=['HS256'],
                audience='live-comments-client',
                issuer='live-comments-api'
            )

            # Check if token is blacklisted
            if await self.is_token_blacklisted(payload['jti']):
                raise TokenError("Token has been revoked")

            # Verify user still exists and is active
            user = await self.get_user_by_id(payload['sub'])
            if not user or user.status != 'active':
                raise TokenError("User account is inactive")

            return payload

        except jwt.ExpiredSignatureError as exc:
            raise TokenError("Token has expired") from exc
        except jwt.InvalidTokenError as exc:
            raise TokenError("Invalid token") from exc

# Role-based access control
class RoleBasedAccessControl:
    """Map role names to permission strings and enforce them on handlers.

    Roles are not hierarchical: a role grants only the permissions listed
    under its own name, so a user needs every applicable role in their
    role list (e.g. both ``'user'`` and ``'moderator'``).
    """

    def __init__(self):
        """Define the static role -> permissions table."""
        self.permissions = {
            'user': [
                'comment:create',
                'comment:read',
                'comment:update_own',
                'comment:delete_own',
                'reaction:create',
                'reaction:read'
            ],
            'moderator': [
                'comment:moderate',
                'comment:delete_any',
                'user:warn',
                'user:timeout',
                'event:moderate'
            ],
            'admin': [
                'user:manage',
                'event:create',
                'event:manage',
                'system:configure',
                'analytics:view'
            ]
        }

    def check_permission(self, user_roles, required_permission):
        """Return True if any of ``user_roles`` grants the permission.

        Args:
            user_roles: Iterable of role names; ``None`` or empty grants
                nothing. Unknown role names are ignored.
            required_permission: Permission string such as
                ``'comment:create'``.

        Returns:
            bool: Whether at least one role lists the permission.
        """
        return any(
            required_permission in self.permissions.get(role, ())
            for role in (user_roles or ())
        )

    def require_permission(self, permission):
        """Build a decorator that enforces ``permission`` on an async handler.

        The wrapped coroutine function must be called with a
        ``current_user`` keyword argument exposing ``roles``.

        Args:
            permission: Permission string the caller must hold.

        Returns:
            A decorator for coroutine functions.

        Raises:
            AuthorizationError: (from the wrapper, at call time) when
                ``current_user`` is missing/falsy or lacks the permission.
        """
        def decorator(func):
            # wraps() keeps the handler's __name__/__doc__ so routing,
            # logging and introspection still see the real function.
            @functools.wraps(func)
            async def wrapper(*args, **kwargs):
                # Extract user from request context
                user = kwargs.get('current_user')
                if not user:
                    raise AuthorizationError("Authentication required")

                if not self.check_permission(user.roles, permission):
                    raise AuthorizationError(f"Permission {permission} required")

                return await func(*args, **kwargs)
            return wrapper
        return decorator

Data Encryption and Privacy

End-to-End Encryption for Sensitive Comments

Message Encryption System:

from cryptography.hazmat.primitives.asymmetric import rsa, padding
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import os

class EndToEndEncryption:
    """Hybrid encryption of comments: AES-256-CBC content, RSA-OAEP key wrap.

    ``store_public_key``, ``pad_content``, ``unpad_content`` and
    ``DecryptionError`` are used here but not defined in this class; they
    must be provided elsewhere.

    NOTE(review): CBC provides confidentiality only; the ciphertext is not
    authenticated. Consider an AEAD mode (e.g. AES-GCM) in a new format
    version.
    """

    def __init__(self):
        """Set the RSA modulus size (bits) and AES key size (bytes)."""
        self.key_size = 2048
        self.symmetric_key_size = 32  # 256 bits for AES

    @staticmethod
    def _oaep_padding():
        """Return the OAEP (SHA-256 / MGF1-SHA-256) padding used for key wrap."""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None
        )

    async def generate_user_keypair(self, user_id):
        """Generate an RSA keypair and persist the public half.

        Args:
            user_id: Identifier the public key is stored under.

        Returns:
            Dict with PEM-encoded ``private_key`` (unencrypted PKCS8) and
            ``public_key`` strings plus a random ``key_id``.
        """
        # Generate private key
        private_key = rsa.generate_private_key(
            public_exponent=65537,
            key_size=self.key_size
        )

        # Get public key
        public_key = private_key.public_key()

        # Serialize keys
        private_pem = private_key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption()
        )

        public_pem = public_key.public_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PublicFormat.SubjectPublicKeyInfo
        )

        # Store public key in database, return private key to user
        await self.store_public_key(user_id, public_pem)

        return {
            'private_key': private_pem.decode('utf-8'),
            'public_key': public_pem.decode('utf-8'),
            # NOTE(review): key_id is not passed to store_public_key, so
            # nothing visible here links it to the stored key - confirm.
            'key_id': str(uuid.uuid4())
        }

    async def encrypt_comment(self, comment_content, recipient_public_keys):
        """Encrypt a comment once and wrap its key for every recipient.

        Args:
            comment_content: Plaintext comment (``str``).
            recipient_public_keys: Mapping of user id -> PEM public key
                string.

        Returns:
            Dict with hex-encoded ``encrypted_content`` and ``iv``, an
            ``encrypted_keys`` mapping of user id -> hex-encoded wrapped
            AES key, and the algorithm labels.
        """
        # Fresh random key and IV for every message
        symmetric_key = os.urandom(self.symmetric_key_size)
        iv = os.urandom(16)  # 128-bit IV for AES

        # Encrypt comment content with AES
        cipher = Cipher(
            algorithms.AES(symmetric_key),
            modes.CBC(iv)
        )
        encryptor = cipher.encryptor()

        # Pad content to block size
        padded_content = self.pad_content(comment_content.encode('utf-8'))
        encrypted_content = encryptor.update(padded_content) + encryptor.finalize()

        # Encrypt symmetric key for each recipient
        encrypted_keys = {}
        for user_id, public_key_pem in recipient_public_keys.items():
            public_key = serialization.load_pem_public_key(public_key_pem.encode())
            encrypted_key = public_key.encrypt(symmetric_key, self._oaep_padding())
            encrypted_keys[user_id] = encrypted_key.hex()

        return {
            'encrypted_content': encrypted_content.hex(),
            'iv': iv.hex(),
            'encrypted_keys': encrypted_keys,
            'encryption_algorithm': 'AES-256-CBC',
            'key_algorithm': 'RSA-2048-OAEP'
        }

    async def decrypt_comment(self, encrypted_data, user_private_key, user_id=None):
        """Decrypt a comment produced by ``encrypt_comment``.

        Args:
            encrypted_data: Dict as returned by ``encrypt_comment``.
            user_private_key: PEM-encoded, unencrypted private key string.
            user_id: Key into ``encrypted_data['encrypted_keys']`` that
                selects the recipient's wrapped key. When omitted, every
                wrapped key is tried until one unwraps with the given
                private key.

        Returns:
            The decrypted comment as ``str``.

        Raises:
            DecryptionError: Any failure while loading the key, unwrapping
                the symmetric key, decrypting or decoding.
        """
        try:
            # Load private key
            private_key = serialization.load_pem_private_key(
                user_private_key.encode(),
                password=None
            )

            wrapped_keys = encrypted_data['encrypted_keys']
            if user_id is not None:
                candidates = [wrapped_keys[user_id]]
            else:
                candidates = list(wrapped_keys.values())

            # Unwrap the symmetric key; a key wrapped for someone else
            # fails OAEP decryption with ValueError, so move on.
            symmetric_key = None
            for wrapped_hex in candidates:
                try:
                    symmetric_key = private_key.decrypt(
                        bytes.fromhex(wrapped_hex),
                        self._oaep_padding()
                    )
                    break
                except ValueError:
                    continue
            if symmetric_key is None:
                raise ValueError("no encrypted key matches the supplied private key")

            # Decrypt content
            iv = bytes.fromhex(encrypted_data['iv'])
            encrypted_content = bytes.fromhex(encrypted_data['encrypted_content'])

            cipher = Cipher(
                algorithms.AES(symmetric_key),
                modes.CBC(iv)
            )
            decryptor = cipher.decryptor()

            padded_content = decryptor.update(encrypted_content) + decryptor.finalize()
            content = self.unpad_content(padded_content)

            return content.decode('utf-8')

        except Exception as e:
            # Single failure type for callers; keep the cause for debugging.
            raise DecryptionError(f"Failed to decrypt comment: {str(e)}") from e

# Database encryption for sensitive data
class DatabaseEncryption:
    """Field-level encryption helper backed by a Fernet cipher."""

    def __init__(self, encryption_key):
        """Build the Fernet cipher from ``encryption_key``."""
        self.cipher_suite = Fernet(encryption_key)

    def encrypt_sensitive_field(self, data):
        """Encrypt ``str(data)`` and return the token as text.

        ``None`` passes through unchanged so absent values stay absent.
        """
        if data is not None:
            plaintext = str(data).encode()
            token = self.cipher_suite.encrypt(plaintext)
            return token.decode()
        return None

    def decrypt_sensitive_field(self, encrypted_data):
        """Decrypt a token produced by ``encrypt_sensitive_field``.

        ``None`` passes through unchanged; the result is always text.
        """
        if encrypted_data is not None:
            token = encrypted_data.encode()
            plaintext = self.cipher_suite.decrypt(token)
            return plaintext.decode()
        return None

# Encrypted database schema: SQL DDL, kept as a string constant, for the
# `users` and `comments` tables. Columns with the `_encrypted` suffix are
# TEXT columns that hold ciphertext instead of the plaintext value
# (presumably produced by DatabaseEncryption - confirm against the
# persistence layer). Nothing in this snippet executes the DDL.
encrypted_schema = """
-- Users table with encrypted PII
CREATE TABLE users (
    user_id UUID PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    email_encrypted TEXT NOT NULL,  -- Encrypted email
    display_name VARCHAR(100) NOT NULL,
    phone_encrypted TEXT,           -- Encrypted phone number
    created_at TIMESTAMP DEFAULT NOW()
);

-- Comments with optional encryption
CREATE TABLE comments (
    comment_id UUID PRIMARY KEY,
    event_id UUID NOT NULL,
    user_id UUID NOT NULL,
    content_encrypted TEXT,         -- Encrypted content for private comments
    content_hash VARCHAR(64),       -- Hash for duplicate detection
    encryption_metadata JSONB,     -- Encryption algorithm and key info
    is_encrypted BOOLEAN DEFAULT FALSE,
    created_at TIMESTAMP DEFAULT NOW()
);
"""

Privacy Compliance and Data Protection

GDPR and Privacy Regulation Compliance

Data Privacy Management System:

class PrivacyComplianceSystem:
    """Handle GDPR data-subject requests: access, portability and erasure.

    ``verify_data_subject_identity``, ``export_portable_data``,
    ``clear_user_from_caches``, ``anonymize_ip`` and ``PrivacyError`` are
    used here but not defined in this class; they must be provided
    elsewhere.
    """

    def __init__(self, database, audit_logger):
        """Keep the data store and the audit logger.

        Args:
            database: Data-access object exposing the ``get_*``,
                ``delete_*`` and ``anonymize_comments`` coroutines used
                below.
            audit_logger: Object exposing ``log_privacy_request`` and
                ``log_data_erasure`` coroutines.
        """
        self.db = database
        self.audit = audit_logger

    async def handle_data_subject_request(self, request_type, user_id, requester_info):
        """Verify, audit and dispatch a data-subject request.

        Args:
            request_type: ``'access'``, ``'portability'`` or ``'erasure'``.
            user_id: Subject of the request.
            requester_info: Information used to verify the requester.

        Returns:
            The result of the matching export/erasure coroutine.

        Raises:
            PrivacyError: Identity verification failed or the request
                type is not supported.
        """
        # Verify request authenticity
        verification_result = await self.verify_data_subject_identity(
            user_id, requester_info
        )
        if not verification_result.is_verified:
            raise PrivacyError("Identity verification failed")

        # Log the request for audit trail (before acting on it)
        await self.audit.log_privacy_request(request_type, user_id, requester_info)

        if request_type == 'access':
            return await self.export_user_data(user_id)
        elif request_type == 'portability':
            return await self.export_portable_data(user_id)
        elif request_type == 'erasure':
            return await self.erase_user_data(user_id)
        else:
            raise PrivacyError(f"Unsupported request type: {request_type}")

    async def export_user_data(self, user_id):
        """Collect the user's data for a GDPR access request.

        Args:
            user_id: User whose data is exported.

        Returns:
            Dict with a fresh ``request_id``, the ``user_id``, an
            ``export_date`` and ``data_categories`` holding ``profile``,
            ``comments``, ``reactions`` and ``activity`` entries. IP
            addresses in the activity log are passed through
            ``anonymize_ip``.
        """
        user_data = {
            'request_id': str(uuid.uuid4()),
            'user_id': user_id,
            'export_date': datetime.utcnow().isoformat(),
            'data_categories': {}
        }

        # Personal information
        user_profile = await self.db.get_user_profile(user_id)
        user_data['data_categories']['profile'] = {
            'username': user_profile.username,
            'email': user_profile.email,
            'display_name': user_profile.display_name,
            'created_at': user_profile.created_at.isoformat(),
            'last_login': user_profile.last_login.isoformat() if user_profile.last_login else None
        }

        # Comments and interactions
        comments = await self.db.get_user_comments(user_id)
        user_data['data_categories']['comments'] = [
            {
                'comment_id': comment.comment_id,
                'event_id': comment.event_id,
                'content': comment.content,
                'created_at': comment.created_at.isoformat(),
                'like_count': comment.like_count,
                'reply_count': comment.reply_count
            }
            for comment in comments
        ]

        # Reactions and likes
        reactions = await self.db.get_user_reactions(user_id)
        user_data['data_categories']['reactions'] = [
            {
                'comment_id': reaction.comment_id,
                'reaction_type': reaction.reaction_type,
                'created_at': reaction.created_at.isoformat()
            }
            for reaction in reactions
        ]

        # Session and activity logs
        activity_logs = await self.db.get_user_activity_logs(user_id)
        user_data['data_categories']['activity'] = [
            {
                'activity_type': log.activity_type,
                'timestamp': log.timestamp.isoformat(),
                'ip_address': self.anonymize_ip(log.ip_address),
                'user_agent': log.user_agent
            }
            for log in activity_logs
        ]

        return user_data

    async def erase_user_data(self, user_id):
        """Erase a user's personal data (right to be forgotten).

        Comments are anonymized rather than deleted; profile,
        authentication data, activity logs and cache entries are removed.
        The erasure plan is written to the audit log whether the run
        succeeds or fails, so partial erasures leave a trace.

        Args:
            user_id: User whose data is erased.

        Returns:
            The erasure plan dict, including one entry per completed step.

        Raises:
            PrivacyError: A step failed; steps completed before the
                failure are not rolled back.
        """
        erasure_plan = {
            'user_id': user_id,
            'erasure_id': str(uuid.uuid4()),
            'started_at': datetime.utcnow().isoformat(),
            'steps': []
        }

        try:
            # Step 1: Anonymize comments (preserve for community value)
            comments_anonymized = await self.anonymize_user_comments(user_id)
            erasure_plan['steps'].append({
                'step': 'anonymize_comments',
                'status': 'completed',
                'count': comments_anonymized
            })

            # Step 2: Delete personal data
            await self.db.delete_user_profile(user_id)
            erasure_plan['steps'].append({
                'step': 'delete_profile',
                'status': 'completed'
            })

            # Step 3: Delete authentication data
            await self.db.delete_user_auth_data(user_id)
            erasure_plan['steps'].append({
                'step': 'delete_auth_data',
                'status': 'completed'
            })

            # Step 4: Delete activity logs
            logs_deleted = await self.db.delete_user_activity_logs(user_id)
            erasure_plan['steps'].append({
                'step': 'delete_activity_logs',
                'status': 'completed',
                'count': logs_deleted
            })

            # Step 5: Remove from caches
            await self.clear_user_from_caches(user_id)
            erasure_plan['steps'].append({
                'step': 'clear_caches',
                'status': 'completed'
            })

            erasure_plan['completed_at'] = datetime.utcnow().isoformat()
            erasure_plan['status'] = 'completed'

        except Exception as e:
            erasure_plan['status'] = 'failed'
            erasure_plan['error'] = str(e)
            # Audit the failed/partial erasure before surfacing the
            # error; otherwise the compliance trail loses exactly the
            # runs that need follow-up.
            await self.audit.log_data_erasure(erasure_plan)
            raise PrivacyError(f"Data erasure failed: {str(e)}") from e

        # Log erasure for compliance audit
        await self.audit.log_data_erasure(erasure_plan)

        return erasure_plan

    @staticmethod
    def _anonymous_username(user_id):
        """Derive a stable placeholder username from ``user_id``.

        Uses SHA-256 rather than the built-in ``hash()``, whose value for
        strings changes between interpreter processes, so the same user
        always maps to the same placeholder.
        """
        digest = hashlib.sha256(str(user_id).encode('utf-8')).hexdigest()
        return f'anonymous_user_{int(digest, 16) % 10000}'

    async def anonymize_user_comments(self, user_id):
        """Re-attribute the user's comments to an anonymous placeholder.

        Args:
            user_id: User whose comments are anonymized.

        Returns:
            Whatever ``db.anonymize_comments`` returns (used by
            ``erase_user_data`` as the count of updated comments).
        """
        # Replace user info with anonymous placeholder
        anonymous_user_data = {
            'username': self._anonymous_username(user_id),
            'display_name': 'Anonymous User',
            'avatar_url': None
        }

        # Update comments to reference anonymous user
        return await self.db.anonymize_comments(user_id, anonymous_user_data)

# Data retention and cleanup
class DataRetentionManager:
    """Apply per-data-type retention periods by purging or archiving."""

    # Retention policy name -> name of the database coroutine that
    # processes records older than the cutoff. Policies without an entry
    # (currently 'deleted_user_data') have no cleanup action here.
    _CLEANUP_HANDLERS = {
        'comments': 'soft_delete_old_comments',       # soft delete
        'activity_logs': 'delete_old_activity_logs',
        'session_data': 'delete_expired_sessions',
        'audit_logs': 'archive_old_audit_logs',       # archive, not delete
    }

    def __init__(self, database):
        """Keep the data store and define the retention periods.

        Args:
            database: Data-access object exposing the cleanup coroutines
                named in ``_CLEANUP_HANDLERS``.
        """
        self.db = database
        self.retention_policies = {
            'comments': timedelta(days=365 * 5),      # 5 years
            'activity_logs': timedelta(days=90),       # 90 days
            'session_data': timedelta(days=30),        # 30 days
            'audit_logs': timedelta(days=365 * 7),     # 7 years (compliance)
            'deleted_user_data': timedelta(days=30)    # 30 days backup
        }

    async def cleanup_expired_data(self):
        """Run the cleanup action of every retention policy.

        Returns:
            Dict mapping each policy name to ``{'cutoff_date': <ISO
            string>, 'records_processed': <count>}``. A policy with no
            cleanup handler reports ``0`` instead of inheriting the count
            of the previously processed policy.
        """
        cleanup_results = {}
        now = datetime.utcnow()  # one reference time for all policies

        for data_type, retention_period in self.retention_policies.items():
            cutoff_date = now - retention_period

            handler_name = self._CLEANUP_HANDLERS.get(data_type)
            if handler_name is None:
                # TODO: no purge routine is wired up for this policy yet.
                records_processed = 0
            else:
                handler = getattr(self.db, handler_name)
                records_processed = await handler(cutoff_date)

            cleanup_results[data_type] = {
                'cutoff_date': cutoff_date.isoformat(),
                'records_processed': records_processed
            }

        return cleanup_results

Security Monitoring and Incident Response

Real-Time Security Monitoring

Security Event Detection System:

class SecurityMonitoringSystem:
    """Real-time detection of abusive or suspicious user activity.

    Many helpers called below (``handle_rate_limit_violation``,
    ``get_recent_user_ips``, ``flag_suspicious_activity``,
    ``extract_urls``, ``check_url_reputation``,
    ``calculate_incident_severity``, ...) are not defined in this class;
    they must be provided elsewhere.
    """

    # Per-activity limits: at most `count` events per `window` seconds.
    RATE_LIMITS = {
        'comment_submission': {'count': 60, 'window': 60},    # 60 per minute
        'login_attempt': {'count': 5, 'window': 300},         # 5 per 5 minutes
        'reaction': {'count': 100, 'window': 60}              # 100 per minute
    }

    def __init__(self, redis_client, alert_manager, database=None):
        """Wire up the counter store, alerting and incident storage.

        Args:
            redis_client: Async client exposing ``incr`` and ``expire``.
            alert_manager: Object exposing ``send_security_alert``.
            database: Optional store exposing ``create_security_incident``;
                when omitted, incidents are alerted on but not persisted.
        """
        self.redis = redis_client
        self.alerts = alert_manager
        self.db = database
        self.threat_patterns = self.load_threat_patterns()

    async def monitor_user_activity(self, user_id, activity_data):
        """Run every applicable detector against one activity event.

        Args:
            user_id: User who performed the activity.
            activity_data: Dict describing the event; must contain
                ``'type'``.
        """
        # Rate limiting detection
        await self.check_rate_limiting_violations(user_id, activity_data)

        # Suspicious behavior detection
        await self.detect_suspicious_behavior(user_id, activity_data)

        # Credential stuffing detection
        if activity_data['type'] == 'login_attempt':
            await self.detect_credential_stuffing(user_id, activity_data)

        # Content-based threat detection
        if activity_data['type'] == 'comment_submission':
            await self.detect_malicious_content(user_id, activity_data)

    async def check_rate_limiting_violations(self, user_id, activity_data):
        """Count the event in a fixed window and report limit violations.

        The counter key embeds ``epoch_seconds // window``, so each key
        covers exactly one window of the configured length. Activity
        types without a configured limit are ignored.

        Args:
            user_id: User who performed the activity.
            activity_data: Dict containing ``'type'``.
        """
        activity_type = activity_data['type']
        limit_config = self.RATE_LIMITS.get(activity_type)
        if limit_config is None:
            return

        window = limit_config['window']
        # Bucket by window length, not by wall-clock minute; otherwise a
        # 300 s limit would be split over five separate counters.
        bucket = int(time.time() // window)
        window_key = f"rate_limit:{user_id}:{activity_type}:{bucket}"

        # Increment counter; the TTL only garbage-collects old buckets.
        current_count = await self.redis.incr(window_key)
        await self.redis.expire(window_key, window)

        # Check if limit exceeded
        if current_count > limit_config['count']:
            await self.handle_rate_limit_violation(user_id, activity_type, current_count)

    async def detect_suspicious_behavior(self, user_id, activity_data):
        """Flag the user when two or more suspicious indicators coincide.

        Indicators: more than 5 IPs in 30 minutes, activity outside the
        user's typical hours, more than 20 actions in 10 seconds, and a
        geographic anomaly for the event's ``'ip_address'``.

        Args:
            user_id: User who performed the activity.
            activity_data: Dict containing ``'ip_address'``.
        """
        suspicious_indicators = []

        # Multiple IP addresses in short time
        ip_addresses = await self.get_recent_user_ips(user_id, minutes=30)
        if len(ip_addresses) > 5:
            suspicious_indicators.append('multiple_ips')

        # Unusual activity hours
        current_hour = datetime.utcnow().hour
        user_typical_hours = await self.get_user_typical_activity_hours(user_id)
        if current_hour not in user_typical_hours:
            suspicious_indicators.append('unusual_hours')

        # Rapid sequential actions
        recent_actions = await self.get_recent_user_actions(user_id, seconds=10)
        if len(recent_actions) > 20:
            suspicious_indicators.append('rapid_actions')

        # Geographic anomaly
        current_location = await self.get_ip_geolocation(activity_data['ip_address'])
        typical_locations = await self.get_user_typical_locations(user_id)
        if self.is_geographic_anomaly(current_location, typical_locations):
            suspicious_indicators.append('geographic_anomaly')

        # A single indicator is too noisy; require at least two.
        if len(suspicious_indicators) >= 2:
            await self.flag_suspicious_activity(user_id, suspicious_indicators, activity_data)

    async def detect_malicious_content(self, user_id, activity_data):
        """Scan comment text for threat patterns and malicious URLs.

        Every matching pattern and every URL rated ``'malicious'`` is
        reported separately.

        Args:
            user_id: Author of the comment.
            activity_data: Dict with an optional ``'content'`` string.
        """
        # Treat a missing or None content as empty text.
        content = activity_data.get('content') or ''

        # Check against known threat patterns
        for pattern_name, pattern_regex in self.threat_patterns.items():
            if re.search(pattern_regex, content, re.IGNORECASE):
                await self.handle_malicious_content_detection(
                    user_id, pattern_name, content, activity_data
                )

        # Check for suspicious URLs
        for url in self.extract_urls(content):
            threat_level = await self.check_url_reputation(url)
            if threat_level == 'malicious':
                await self.handle_malicious_url_detection(user_id, url, activity_data)

    def load_threat_patterns(self):
        """Return the pattern-name -> regex table used for content scanning.

        Patterns are matched case-insensitively by
        ``detect_malicious_content``.
        """
        return {
            'phishing_attempt': r'(click here|verify account|suspended|urgent action)',
            'spam_content': r'(buy now|limited time|act fast|guaranteed)',
            'malware_indicators': r'(download.*exe|install.*now|free.*software)',
            'social_engineering': r'(send me|give me|share your|tell me your)'
        }

    async def handle_security_incident(self, incident_type, user_id, details):
        """Record a security incident, respond to it and alert the team.

        Args:
            incident_type: Incident category name.
            user_id: Affected or offending user.
            details: Free-form incident details.

        Returns:
            The generated incident id (UUID4 string).
        """
        incident_id = str(uuid.uuid4())

        incident_data = {
            'incident_id': incident_id,
            'type': incident_type,
            'user_id': user_id,
            'severity': self.calculate_incident_severity(incident_type, details),
            'details': details,
            'detected_at': datetime.utcnow().isoformat(),
            'status': 'open'
        }

        # Store incident (only when a database was configured)
        if self.db is not None:
            await self.db.create_security_incident(incident_data)

        # Automated response based on severity.
        # NOTE(review): the execute_* methods are not defined on this
        # class in the visible source (IncidentResponseSystem defines
        # methods with these names) - confirm how they are provided.
        if incident_data['severity'] == 'critical':
            await self.execute_critical_incident_response(user_id, incident_data)
        elif incident_data['severity'] == 'high':
            await self.execute_high_severity_response(user_id, incident_data)

        # Alert security team
        await self.alerts.send_security_alert(incident_data)

        return incident_id

# Automated incident response
class IncidentResponseSystem:
    """Automated containment steps for high and critical incidents.

    ``block_ip_address``, ``send_security_notification`` and
    ``flag_for_manual_review`` are called on ``self`` but are not defined
    in this class.
    """

    def __init__(self, user_manager, session_manager):
        """Keep the account and session managers used by the responses."""
        self.user_manager = user_manager
        self.session_manager = session_manager

    async def execute_critical_incident_response(self, user_id, incident_data):
        """Contain a critical incident.

        In order: suspend the account, terminate its sessions, block the
        incident's IP address for 24 hours when one is recorded, then
        notify the user.
        """
        suspension = {
            'reason': "Critical security incident detected",
            'incident_id': incident_data['incident_id'],
        }
        await self.user_manager.suspend_user_account(user_id, **suspension)

        # Kill every live session so stolen tokens stop working.
        await self.session_manager.terminate_all_user_sessions(user_id)

        # The IP is optional in the incident details.
        offending_ip = incident_data['details'].get('ip_address')
        if offending_ip:
            await self.block_ip_address(offending_ip, duration_hours=24)

        await self.send_security_notification(user_id, incident_data)

    async def execute_high_severity_response(self, user_id, incident_data):
        """Restrict a user after a high-severity incident.

        In order: require additional authentication, block comment
        creation and media upload for one hour, then queue the incident
        for manual review.
        """
        manager = self.user_manager
        await manager.require_additional_auth(user_id)

        blocked_actions = ['comment_creation', 'media_upload']
        await manager.apply_temporary_restrictions(
            user_id,
            restrictions=blocked_actions,
            duration_hours=1,
        )

        await self.flag_for_manual_review(user_id, incident_data)

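Together, these components — MFA-backed JWT authentication, role-based access control, end-to-end and field-level encryption, GDPR request handling, data retention, and real-time security monitoring with automated incident response — give the live comment system a foundation for meeting enterprise-grade security requirements while maintaining compliance with privacy regulations like GDPR. The snippets are illustrative: exception classes and helper methods that are referenced but not shown must be supplied by the surrounding application.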