API Rate Limiter - Security and Privacy
Security Threats and Mitigations
1. Distributed Denial of Service (DDoS)
Threat: Attackers overwhelm system with massive request volume
Attack Patterns:
Layer 7 DDoS:
- 1M requests/second from 100K IPs
- Targets specific endpoints
- Mimics legitimate traffic
- Exhausts application resources
Layer 4 DDoS:
- SYN flood attacks
- UDP amplification
- TCP connection exhaustion
- Network bandwidth saturationMitigation Strategies:
Multi-Layer Defense:
Layer 1: Network Level (ISP/CDN)
- DDoS scrubbing centers
- Anycast routing
- Traffic filtering
- Bandwidth: 100+ Gbps capacity
Layer 2: Edge (CDN/WAF)
- IP reputation filtering
- Geographic blocking
- Challenge-response (CAPTCHA)
- Rate limiting: 1000 req/s per IP
Layer 3: Application (Rate Limiter)
- Per-user rate limiting
- Per-API-key rate limiting
- Adaptive throttling
- Behavioral analysis
Layer 4: Backend Protection
- Connection pooling
- Request queuing
- Circuit breakers
- Graceful degradation
Implementation:
class DDoSProtection:
def check_request(self, request):
# Layer 1: IP reputation
if self.is_known_attacker(request.ip):
return block_immediately()
# Layer 2: Rate limiting
if not self.check_rate_limit(request):
return challenge_with_captcha()
# Layer 3: Behavioral analysis
if self.is_suspicious_pattern(request):
return require_additional_verification()
return allow()2. Credential Stuffing and Brute Force
Threat: Attackers try stolen credentials or guess passwords
Attack Pattern:
Credential Stuffing:
- 10,000 login attempts per minute
- Different username/password combinations
- Distributed across many IPs
- Automated with bots
Brute Force:
- Systematic password guessing
- Dictionary attacks
- Rainbow table attacks
- Targeted at specific accountsMitigation:
Specialized Rate Limiting:
Login Endpoint Protection:
- 5 failed attempts per account per hour
- 100 login attempts per IP per hour
- Progressive delays (exponential backoff)
- CAPTCHA after 3 failed attempts
- Account lockout after 10 failed attempts
Implementation:
class LoginRateLimiter:
def check_login(self, username, ip):
# Per-account limit
account_attempts = self.get_failed_attempts(username)
if account_attempts >= 5:
return deny_with_lockout(duration=3600)
# Per-IP limit
ip_attempts = self.get_ip_attempts(ip)
if ip_attempts >= 100:
return deny_with_captcha()
# Progressive delay
if account_attempts >= 3:
delay = 2 ** account_attempts # Exponential backoff
return delay_response(delay)
return allow()
def on_failed_login(self, username, ip):
self.increment_failed_attempts(username)
self.increment_ip_attempts(ip)
# Alert on suspicious patterns
if self.detect_credential_stuffing(ip):
self.alert_security_team()
self.block_ip(ip, duration=86400)
Additional Protections:
- Multi-factor authentication (MFA)
- Device fingerprinting
- Behavioral biometrics
- Anomaly detection3. API Key Theft and Abuse
Threat: Stolen API keys used to exhaust quotas or access data
Attack Scenarios:
Key Theft:
- Keys leaked in public repositories
- Keys exposed in client-side code
- Keys stolen via phishing
- Keys compromised in breaches
Key Abuse:
- Quota exhaustion attacks
- Data exfiltration
- Service disruption
- Unauthorized accessMitigation:
API Key Security:
1. Key Management:
- Rotate keys regularly (every 90 days)
- Separate keys for dev/staging/prod
- Scope keys to specific permissions
- Revoke compromised keys immediately
2. Key Monitoring:
class APIKeyMonitor:
def monitor_key_usage(self, api_key):
usage = self.get_key_usage(api_key)
# Detect anomalies
if usage.requests_per_hour > usage.baseline * 3:
self.alert_unusual_activity(api_key)
if usage.new_ip_addresses > 10:
self.alert_potential_theft(api_key)
if usage.error_rate > 0.5:
self.alert_suspicious_behavior(api_key)
# Geographic anomalies
if usage.new_countries > 3:
self.require_verification(api_key)
3. Key Restrictions:
- IP allowlisting
- Referrer restrictions
- Time-based access
- Geographic restrictions
Example Configuration:
{
"api_key": "sk_live_abc123",
"restrictions": {
"allowed_ips": ["203.0.113.0/24"],
"allowed_referrers": ["https://example.com"],
"allowed_hours": "09:00-17:00 UTC",
"allowed_countries": ["US", "CA"]
}
}4. Rate Limit Bypass Attempts
Threat: Attackers try to circumvent rate limiting
Bypass Techniques:
1. IP Rotation:
- Use proxy networks
- Tor exit nodes
- Cloud provider IPs
- Residential proxies
2. Distributed Attacks:
- Botnets (100K+ devices)
- Cloud instances
- Compromised servers
- IoT devices
3. Header Manipulation:
- Fake X-Forwarded-For headers
- User-Agent rotation
- Cookie manipulation
- Session token abuse
4. Timing Attacks:
- Request just under limit
- Exploit window boundaries
- Race conditions
- Clock skew exploitationDetection and Prevention:
class BypassDetection:
def detect_bypass_attempt(self, request):
signals = []
# IP reputation check
if self.is_proxy_or_vpn(request.ip):
signals.append('proxy_usage')
# Header analysis
if self.has_suspicious_headers(request):
signals.append('header_manipulation')
# Behavioral analysis
if self.is_automated_traffic(request):
signals.append('bot_detected')
# Timing analysis
if self.is_timing_attack(request):
signals.append('timing_attack')
# Risk scoring
risk_score = self.calculate_risk(signals)
if risk_score > 0.8:
return block_and_alert()
elif risk_score > 0.5:
return challenge_with_captcha()
else:
return allow()
def is_automated_traffic(self, request):
# Check for bot patterns
user_agent = request.headers.get('User-Agent')
if self.is_known_bot(user_agent):
return True
# Check request timing
if self.has_perfect_timing(request):
return True # Too consistent = bot
# Check mouse movements (if available)
if not self.has_human_behavior(request):
return True
return False
Countermeasures:
- Device fingerprinting
- Behavioral analysis
- CAPTCHA challenges
- Proof-of-work requirements
- Progressive delaysPrivacy and Compliance
1. Data Privacy (GDPR, CCPA)
Requirements:
Data Collection:
- Collect only necessary data
- Clear purpose for collection
- User consent required
- Transparent data usage
Data Storage:
- Encrypt at rest
- Encrypt in transit
- Access controls
- Audit logging
Data Retention:
- Define retention periods
- Automatic deletion
- Right to be forgotten
- Data portability
Implementation:
class PrivacyCompliance:
def collect_rate_limit_data(self, user):
# Minimal data collection
data = {
'user_id': self.hash_user_id(user.id), # Pseudonymization
'timestamp': now(),
'request_count': 1,
# No PII collected
}
# Log consent
if not user.has_consented_to_analytics():
data = self.anonymize_further(data)
return data
def handle_deletion_request(self, user_id):
# Delete all user data
self.delete_rate_limit_counters(user_id)
self.delete_audit_logs(user_id)
self.delete_api_keys(user_id)
self.delete_quotas(user_id)
# Log deletion
self.log_deletion(user_id, timestamp=now())
def export_user_data(self, user_id):
# Data portability
return {
'rate_limits': self.get_rate_limits(user_id),
'usage_history': self.get_usage_history(user_id),
'api_keys': self.get_api_keys(user_id),
'audit_logs': self.get_audit_logs(user_id)
}2. Audit Logging and Compliance
Requirements:
Audit Log Contents:
- Who: User/API key identifier
- What: Action performed
- When: Timestamp (UTC)
- Where: IP address, location
- Why: Rate limit rule applied
- Result: Allowed/denied
Compliance Standards:
- SOC 2 Type II
- ISO 27001
- PCI DSS (for payment APIs)
- HIPAA (for healthcare APIs)
Implementation:
class AuditLogger:
def log_rate_limit_decision(self, request, decision):
log_entry = {
'timestamp': now(),
'user_id': self.hash_pii(request.user_id),
'ip_address': self.mask_ip(request.ip), # Mask last octet
'endpoint': request.endpoint,
'decision': decision,
'rule_id': decision.rule_id,
'quota_remaining': decision.remaining,
'request_id': request.id
}
# Write to immutable log
self.write_to_audit_log(log_entry)
# Compliance reporting
if decision == 'denied':
self.update_compliance_metrics()
def mask_ip(self, ip):
# Mask last octet for privacy
# 203.0.113.42 → 203.0.113.0
parts = ip.split('.')
parts[-1] = '0'
return '.'.join(parts)3. Secure Configuration Management
Best Practices:
Configuration Security:
1. Encryption:
- Encrypt sensitive config at rest
- Use secrets management (Vault, KMS)
- Rotate encryption keys regularly
2. Access Control:
- Role-based access control (RBAC)
- Principle of least privilege
- Multi-factor authentication
- Audit all config changes
3. Version Control:
- Track all configuration changes
- Require code review
- Automated testing
- Rollback capability
Implementation:
class SecureConfigManager:
def update_rate_limit_rule(self, rule, user):
# Authorization check
if not self.has_permission(user, 'update_rules'):
raise PermissionDenied()
# Validate rule
if not self.validate_rule(rule):
raise InvalidRule()
# Encrypt sensitive data
rule.api_keys = self.encrypt(rule.api_keys)
# Version control
old_rule = self.get_current_rule(rule.id)
self.save_version(old_rule)
# Apply change
self.save_rule(rule)
# Audit log
self.log_config_change(user, old_rule, rule)
# Notify stakeholders
self.notify_change(rule)4. Incident Response
Procedures:
Incident Types:
1. Rate limit bypass detected
2. DDoS attack in progress
3. API key compromise
4. System outage
5. Data breach
Response Plan:
class IncidentResponse:
def handle_incident(self, incident):
# 1. Detect and Alert
self.alert_security_team(incident)
# 2. Contain
if incident.type == 'ddos':
self.enable_aggressive_rate_limiting()
self.block_attack_ips()
elif incident.type == 'key_compromise':
self.revoke_compromised_keys()
self.notify_affected_users()
# 3. Investigate
self.collect_evidence(incident)
self.analyze_attack_pattern(incident)
# 4. Recover
self.restore_normal_operations()
self.verify_system_integrity()
# 5. Post-Mortem
self.document_incident(incident)
self.update_security_measures()
self.train_team()
Escalation:
- Low severity: Security team notified
- Medium severity: Engineering manager notified
- High severity: CTO notified, all hands on deck
- Critical severity: CEO notified, external help engagedSecurity Best Practices
1. Defense in Depth
Multiple security layers:
1. Network firewall
2. DDoS protection (CDN)
3. WAF (Web Application Firewall)
4. Rate limiting (this system)
5. Application security
6. Database security
7. Monitoring and alerting2. Principle of Least Privilege
Access Control:
- Users: Only their own quotas
- API keys: Scoped permissions
- Admins: Specific admin functions
- Services: Service-to-service auth3. Regular Security Audits
Audit Schedule:
- Weekly: Automated security scans
- Monthly: Manual security review
- Quarterly: Penetration testing
- Annually: Third-party security auditThis comprehensive security and privacy approach ensures the rate limiting system is robust against attacks while respecting user privacy and maintaining compliance.