Security & Privacy

📖 7 min read 📄 Part 9 of 10

API Rate Limiter - Security and Privacy

Security Threats and Mitigations

1. Distributed Denial of Service (DDoS)

Threat: Attackers overwhelm system with massive request volume

Attack Patterns:

Layer 7 DDoS:
- 1M requests/second from 100K IPs
- Targets specific endpoints
- Mimics legitimate traffic
- Exhausts application resources

Layer 4 DDoS:
- SYN flood attacks
- UDP amplification
- TCP connection exhaustion
- Network bandwidth saturation

Mitigation Strategies:

Multi-Layer Defense:

Layer 1: Network Level (ISP/CDN)
- DDoS scrubbing centers
- Anycast routing
- Traffic filtering
- Bandwidth: 100+ Gbps capacity

Layer 2: Edge (CDN/WAF)
- IP reputation filtering
- Geographic blocking
- Challenge-response (CAPTCHA)
- Rate limiting: 1000 req/s per IP

Layer 3: Application (Rate Limiter)
- Per-user rate limiting
- Per-API-key rate limiting
- Adaptive throttling
- Behavioral analysis

Layer 4: Backend Protection
- Connection pooling
- Request queuing
- Circuit breakers
- Graceful degradation

Implementation:
class DDoSProtection:
    def check_request(self, request):
        # Layer 1: IP reputation
        if self.is_known_attacker(request.ip):
            return block_immediately()
        
        # Layer 2: Rate limiting
        if not self.check_rate_limit(request):
            return challenge_with_captcha()
        
        # Layer 3: Behavioral analysis
        if self.is_suspicious_pattern(request):
            return require_additional_verification()
        
        return allow()

2. Credential Stuffing and Brute Force

Threat: Attackers try stolen credentials or guess passwords

Attack Pattern:

Credential Stuffing:
- 10,000 login attempts per minute
- Different username/password combinations
- Distributed across many IPs
- Automated with bots

Brute Force:
- Systematic password guessing
- Dictionary attacks
- Rainbow table attacks
- Targeted at specific accounts

Mitigation:

Specialized Rate Limiting:

Login Endpoint Protection:
- 5 failed attempts per account per hour
- 100 login attempts per IP per hour
- Progressive delays (exponential backoff)
- CAPTCHA after 3 failed attempts
- Account lockout after 10 failed attempts

Implementation:
class LoginRateLimiter:
    def check_login(self, username, ip):
        # Per-account limit
        account_attempts = self.get_failed_attempts(username)
        if account_attempts >= 5:
            return deny_with_lockout(duration=3600)
        
        # Per-IP limit
        ip_attempts = self.get_ip_attempts(ip)
        if ip_attempts >= 100:
            return deny_with_captcha()
        
        # Progressive delay
        if account_attempts >= 3:
            delay = 2 ** account_attempts  # Exponential backoff
            return delay_response(delay)
        
        return allow()
    
    def on_failed_login(self, username, ip):
        self.increment_failed_attempts(username)
        self.increment_ip_attempts(ip)
        
        # Alert on suspicious patterns
        if self.detect_credential_stuffing(ip):
            self.alert_security_team()
            self.block_ip(ip, duration=86400)

Additional Protections:
- Multi-factor authentication (MFA)
- Device fingerprinting
- Behavioral biometrics
- Anomaly detection

3. API Key Theft and Abuse

Threat: Stolen API keys used to exhaust quotas or access data

Attack Scenarios:

Key Theft:
- Keys leaked in public repositories
- Keys exposed in client-side code
- Keys stolen via phishing
- Keys compromised in breaches

Key Abuse:
- Quota exhaustion attacks
- Data exfiltration
- Service disruption
- Unauthorized access

Mitigation:

API Key Security:

1. Key Management:
- Rotate keys regularly (every 90 days)
- Separate keys for dev/staging/prod
- Scope keys to specific permissions
- Revoke compromised keys immediately

2. Key Monitoring:
class APIKeyMonitor:
    def monitor_key_usage(self, api_key):
        usage = self.get_key_usage(api_key)
        
        # Detect anomalies
        if usage.requests_per_hour > usage.baseline * 3:
            self.alert_unusual_activity(api_key)
        
        if usage.new_ip_addresses > 10:
            self.alert_potential_theft(api_key)
        
        if usage.error_rate > 0.5:
            self.alert_suspicious_behavior(api_key)
        
        # Geographic anomalies
        if usage.new_countries > 3:
            self.require_verification(api_key)

3. Key Restrictions:
- IP allowlisting
- Referrer restrictions
- Time-based access
- Geographic restrictions

Example Configuration:
{
  "api_key": "sk_live_abc123",
  "restrictions": {
    "allowed_ips": ["203.0.113.0/24"],
    "allowed_referrers": ["https://example.com"],
    "allowed_hours": "09:00-17:00 UTC",
    "allowed_countries": ["US", "CA"]
  }
}

4. Rate Limit Bypass Attempts

Threat: Attackers try to circumvent rate limiting

Bypass Techniques:

1. IP Rotation:
- Use proxy networks
- Tor exit nodes
- Cloud provider IPs
- Residential proxies

2. Distributed Attacks:
- Botnets (100K+ devices)
- Cloud instances
- Compromised servers
- IoT devices

3. Header Manipulation:
- Fake X-Forwarded-For headers
- User-Agent rotation
- Cookie manipulation
- Session token abuse

4. Timing Attacks:
- Request just under limit
- Exploit window boundaries
- Race conditions
- Clock skew exploitation

Detection and Prevention:

class BypassDetection:
    def detect_bypass_attempt(self, request):
        signals = []
        
        # IP reputation check
        if self.is_proxy_or_vpn(request.ip):
            signals.append('proxy_usage')
        
        # Header analysis
        if self.has_suspicious_headers(request):
            signals.append('header_manipulation')
        
        # Behavioral analysis
        if self.is_automated_traffic(request):
            signals.append('bot_detected')
        
        # Timing analysis
        if self.is_timing_attack(request):
            signals.append('timing_attack')
        
        # Risk scoring
        risk_score = self.calculate_risk(signals)
        
        if risk_score > 0.8:
            return block_and_alert()
        elif risk_score > 0.5:
            return challenge_with_captcha()
        else:
            return allow()
    
    def is_automated_traffic(self, request):
        # Check for bot patterns
        user_agent = request.headers.get('User-Agent')
        if self.is_known_bot(user_agent):
            return True
        
        # Check request timing
        if self.has_perfect_timing(request):
            return True  # Too consistent = bot
        
        # Check mouse movements (if available)
        if not self.has_human_behavior(request):
            return True
        
        return False

Countermeasures:
- Device fingerprinting
- Behavioral analysis
- CAPTCHA challenges
- Proof-of-work requirements
- Progressive delays

Privacy and Compliance

1. Data Privacy (GDPR, CCPA)

Requirements:

Data Collection:
- Collect only necessary data
- Clear purpose for collection
- User consent required
- Transparent data usage

Data Storage:
- Encrypt at rest
- Encrypt in transit
- Access controls
- Audit logging

Data Retention:
- Define retention periods
- Automatic deletion
- Right to be forgotten
- Data portability

Implementation:
class PrivacyCompliance:
    def collect_rate_limit_data(self, user):
        # Minimal data collection
        data = {
            'user_id': self.hash_user_id(user.id),  # Pseudonymization
            'timestamp': now(),
            'request_count': 1,
            # No PII collected
        }
        
        # Log consent
        if not user.has_consented_to_analytics():
            data = self.anonymize_further(data)
        
        return data
    
    def handle_deletion_request(self, user_id):
        # Delete all user data
        self.delete_rate_limit_counters(user_id)
        self.delete_audit_logs(user_id)
        self.delete_api_keys(user_id)
        self.delete_quotas(user_id)
        
        # Log deletion
        self.log_deletion(user_id, timestamp=now())
    
    def export_user_data(self, user_id):
        # Data portability
        return {
            'rate_limits': self.get_rate_limits(user_id),
            'usage_history': self.get_usage_history(user_id),
            'api_keys': self.get_api_keys(user_id),
            'audit_logs': self.get_audit_logs(user_id)
        }

2. Audit Logging and Compliance

Requirements:

Audit Log Contents:
- Who: User/API key identifier
- What: Action performed
- When: Timestamp (UTC)
- Where: IP address, location
- Why: Rate limit rule applied
- Result: Allowed/denied

Compliance Standards:
- SOC 2 Type II
- ISO 27001
- PCI DSS (for payment APIs)
- HIPAA (for healthcare APIs)

Implementation:
class AuditLogger:
    def log_rate_limit_decision(self, request, decision):
        log_entry = {
            'timestamp': now(),
            'user_id': self.hash_pii(request.user_id),
            'ip_address': self.mask_ip(request.ip),  # Mask last octet
            'endpoint': request.endpoint,
            'decision': decision,
            'rule_id': decision.rule_id,
            'quota_remaining': decision.remaining,
            'request_id': request.id
        }
        
        # Write to immutable log
        self.write_to_audit_log(log_entry)
        
        # Compliance reporting
        if decision == 'denied':
            self.update_compliance_metrics()
    
    def mask_ip(self, ip):
        # Mask last octet for privacy
        # 203.0.113.42 → 203.0.113.0
        parts = ip.split('.')
        parts[-1] = '0'
        return '.'.join(parts)

3. Secure Configuration Management

Best Practices:

Configuration Security:

1. Encryption:
- Encrypt sensitive config at rest
- Use secrets management (Vault, KMS)
- Rotate encryption keys regularly

2. Access Control:
- Role-based access control (RBAC)
- Principle of least privilege
- Multi-factor authentication
- Audit all config changes

3. Version Control:
- Track all configuration changes
- Require code review
- Automated testing
- Rollback capability

Implementation:
class SecureConfigManager:
    def update_rate_limit_rule(self, rule, user):
        # Authorization check
        if not self.has_permission(user, 'update_rules'):
            raise PermissionDenied()
        
        # Validate rule
        if not self.validate_rule(rule):
            raise InvalidRule()
        
        # Encrypt sensitive data
        rule.api_keys = self.encrypt(rule.api_keys)
        
        # Version control
        old_rule = self.get_current_rule(rule.id)
        self.save_version(old_rule)
        
        # Apply change
        self.save_rule(rule)
        
        # Audit log
        self.log_config_change(user, old_rule, rule)
        
        # Notify stakeholders
        self.notify_change(rule)

4. Incident Response

Procedures:

Incident Types:
1. Rate limit bypass detected
2. DDoS attack in progress
3. API key compromise
4. System outage
5. Data breach

Response Plan:
class IncidentResponse:
    def handle_incident(self, incident):
        # 1. Detect and Alert
        self.alert_security_team(incident)
        
        # 2. Contain
        if incident.type == 'ddos':
            self.enable_aggressive_rate_limiting()
            self.block_attack_ips()
        elif incident.type == 'key_compromise':
            self.revoke_compromised_keys()
            self.notify_affected_users()
        
        # 3. Investigate
        self.collect_evidence(incident)
        self.analyze_attack_pattern(incident)
        
        # 4. Recover
        self.restore_normal_operations()
        self.verify_system_integrity()
        
        # 5. Post-Mortem
        self.document_incident(incident)
        self.update_security_measures()
        self.train_team()

Escalation:
- Low severity: Security team notified
- Medium severity: Engineering manager notified
- High severity: CTO notified, all hands on deck
- Critical severity: CEO notified, external help engaged

Security Best Practices

1. Defense in Depth

Multiple security layers:
1. Network firewall
2. DDoS protection (CDN)
3. WAF (Web Application Firewall)
4. Rate limiting (this system)
5. Application security
6. Database security
7. Monitoring and alerting

2. Principle of Least Privilege

Access Control:
- Users: Only their own quotas
- API keys: Scoped permissions
- Admins: Specific admin functions
- Services: Service-to-service auth

3. Regular Security Audits

Audit Schedule:
- Weekly: Automated security scans
- Monthly: Manual security review
- Quarterly: Penetration testing
- Annually: Third-party security audit

This comprehensive security and privacy approach ensures the rate limiting system is robust against attacks while respecting user privacy and maintaining compliance.