Scaling Considerations

📖 14 min read 📄 Part 6 of 10

Distributed Tracing System - Scaling Considerations

Overview

Scaling a distributed tracing system presents unique challenges: the system must handle write volumes that grow linearly with application traffic, support bursty query patterns during incidents, and maintain low overhead on instrumented services. Unlike most systems where reads dominate, tracing is heavily write-biased (1000:1 write-to-read ratio). The scaling strategy must address collection, storage, indexing, and query serving independently since each has different bottlenecks.


Collector Scaling

Horizontal Scaling Architecture

Collector Cluster Design:
┌─────────────────────────────────────────────────────────────┐
│                    Load Balancer (L4)                        │
│              (DNS round-robin or NLB)                        │
└─────────────┬──────────┬──────────┬──────────┬─────────────┘
              │          │          │          │
        ┌─────▼──┐ ┌─────▼──┐ ┌─────▼──┐ ┌─────▼──┐
        │Collect1│ │Collect2│ │Collect3│ │Collect4│
        │ 50K/s  │ │ 50K/s  │ │ 50K/s  │ │ 50K/s  │
        └────┬───┘ └────┬───┘ └────┬───┘ └────┬───┘
             │          │          │          │
        ┌────▼──────────▼──────────▼──────────▼────┐
        │              Kafka Cluster                 │
        │     (partitioned by trace_id hash)         │
        └──────────────────────────────────────────┘

Key Design Decisions:
- Collectors are stateless (no local state, easy to scale)
- Load balancing: L4 (TCP) for gRPC, L7 for HTTP
- No affinity required (any collector handles any span)
- Auto-scaling trigger: CPU > 70% or queue depth > 10K

Load Balancing by Trace ID

Why Trace-ID Affinity Matters (for tail-based sampling):
- Tail-based sampling requires seeing ALL spans of a trace
- If spans scatter across collectors, no single collector sees the full trace
- Solution: Hash trace_id to route all spans of a trace to same collector

Implementation Options:

1. Consistent Hashing at Load Balancer:
   - Extract trace_id from header/payload
   - Hash to collector ring
   - Handles collector additions/removals gracefully
   - Complexity: Requires L7 load balancer with custom routing

2. Two-Tier Collection:
   - Tier 1: Stateless receivers (accept from agents, no routing)
   - Tier 2: Trace-affinity processors (Kafka consumer groups)
   - Kafka partition key = trace_id
   - Each partition processed by one consumer → sees full trace

3. Gossip-Based Redistribution:
   - Collectors gossip about partial traces
   - Forward spans to "owner" collector for that trace_id
   - Higher network overhead but simpler deployment

Recommended: Two-tier with Kafka (most production systems use this)

Collector Processing Pipeline

Per-Collector Processing Stages:
1. Receive (gRPC/HTTP) → deserialize spans
2. Validate (schema check, required fields)
3. Enrich (add collector metadata, normalize timestamps)
4. Sample (head-based: check trace flags; tail-based: buffer)
5. Batch (group spans for efficient downstream writes)
6. Export (write to Kafka / direct to storage)

Performance Characteristics:
- Receive: 100K spans/sec per core (gRPC with protobuf)
- Validate: 500K spans/sec per core (simple field checks)
- Enrich: 300K spans/sec per core (timestamp normalization)
- Sample: 1M spans/sec per core (probabilistic check)
- Batch: Accumulate 500 spans or 1 second, whichever first
- Export: Bounded by downstream (Kafka write latency ~5ms)

Bottleneck: Usually network I/O or downstream write capacity
Scaling: Add collectors linearly; 1 collector per 50K spans/sec

Storage Scaling

Sharding by Time + Trace ID Hash

Cassandra Sharding Strategy:
- Partition key: trace_id (natural distribution via UUID)
- Time-based compaction: TimeWindowCompactionStrategy
- Token range: 256 vnodes per node (even distribution)

Write Distribution:
- trace_id is UUID → uniform hash distribution across nodes
- No hot spots for span writes (unlike service-based partitioning)
- Index tables use (service_name, date_bucket) → potential hot spots
  - Mitigation: Add hash_bucket to partition key for hot services

Scaling Triggers:
- Disk usage > 60%: Add nodes
- Write latency P99 > 50ms: Add nodes or tune compaction
- Read latency P99 > 100ms: Add nodes or add read replicas

Capacity per Node (i3.2xlarge):
- Write: 15K spans/sec sustained
- Storage: 1.7 TB usable (after replication)
- Read: 5K trace lookups/sec

Elasticsearch Scaling

Index Sharding Strategy:
- Daily indices: jaeger-span-2024-01-15
- Shards per index: 5 primary + 1 replica = 10 total
- Shard size target: 30-50 GB (optimal for search performance)

Scaling Dimensions:
1. Write throughput: Add data nodes (each handles ~10K docs/sec)
2. Search performance: Add replicas (more shards to search in parallel)
3. Storage capacity: Add nodes with larger disks
4. Query concurrency: Add coordinating-only nodes

Hot-Warm-Cold Architecture:
- Hot nodes (NVMe SSD): Current day's index, all writes
- Warm nodes (SSD): 2-7 day old indices, read-only
- Cold nodes (HDD): 7-30 day old indices, frozen
- Frozen tier (S3): 30+ days, searchable snapshots

Node Sizing:
- Hot: i3.2xlarge (8 vCPU, 61 GB RAM, 1.9 TB NVMe) × 6
- Warm: d2.2xlarge (8 vCPU, 61 GB RAM, 12 TB HDD) × 4
- Cold: d2.xlarge (4 vCPU, 30 GB RAM, 6 TB HDD) × 4

ClickHouse Scaling

Sharding + Replication:
- Cluster: 3 shards × 2 replicas = 6 nodes
- Distributed table routes inserts by rand() or cityHash64(trace_id)
- Each shard handles ~50K inserts/sec
- Total cluster: 150K inserts/sec

Partition Pruning:
- Queries always include time range → partition pruning eliminates old data
- ORDER BY (service_name, operation_name, start_time) → efficient filtering
- Materialized views pre-aggregate → dashboards query aggregates, not raw data

Scaling Triggers:
- Insert latency > 100ms: Add shards
- Query latency > 5s: Add replicas or optimize materialized views
- Disk > 70%: Add storage or reduce TTL

Query Scaling

Read Replicas and Caching

Query Tier Architecture:
┌──────────────────────────────────────────────────────┐
│                   API Gateway                         │
│            (rate limiting, auth)                      │
└──────────┬───────────────────────────┬───────────────┘
           │                           │
    ┌──────▼──────┐            ┌───────▼──────┐
    │ Query Cache │            │ Query Router  │
    │  (Redis)    │            │              │
    └──────┬──────┘            └───────┬──────┘
           │ miss                      │
    ┌──────▼──────────────────────────▼────────┐
    │           Query Executors                  │
    │  (fan-out to appropriate storage)         │
    └──┬──────────┬──────────────┬─────────────┘
       │          │              │
  ┌────▼───┐ ┌───▼────┐  ┌─────▼──────┐
  │Cassandra│ │  ES    │  │ ClickHouse │
  │(by ID) │ │(search)│  │(analytics) │
  └────────┘ └────────┘  └────────────┘

Caching Strategy:
- Trace by ID: Cache in Redis (TTL: 5 minutes)
  - Hit rate: 60-70% (developers reload same trace)
  - Cache size: 10 GB (stores ~20M compressed traces)
  
- Service list: Cache indefinitely (invalidate on new service)
- Operation list: Cache 5 minutes
- Dependency graph: Cache 1 minute (frequently refreshed dashboards)
- Search results: Do NOT cache (time-sensitive, low repeat rate)

Query Optimization Techniques

1. Trace-by-ID Optimization:
   - Direct Cassandra lookup (single partition read)
   - Parallel fetch: read all spans in one partition scan
   - Response streaming for large traces (>100 spans)

2. Search Query Optimization:
   - Push filters to storage layer (don't fetch then filter)
   - Time range is always required (bounds the search space)
   - Use Elasticsearch for complex tag queries
   - Use Cassandra indexes for simple service+time queries

3. Analytics Query Optimization:
   - Pre-aggregated materialized views in ClickHouse
   - Dashboard queries hit aggregates, never raw spans
   - Percentile approximation (t-digest) for fast P99 computation
   - Query result pagination with cursor-based approach

4. Fan-Out Query Pattern:
   - "Find traces where service A calls service B with error"
   - Step 1: Query ES for spans matching service A + error
   - Step 2: For each trace_id, fetch full trace from Cassandra
   - Step 3: Filter traces where B is called by A
   - Optimization: Store caller-callee pairs in dedicated index

Sampling Strategies at Scale

Head-Based Sampling

Decision Point: At trace creation (first span)
Propagation: Sampling decision travels with trace context

Implementation:
- Generate random number from trace_id hash
- Compare against sampling rate threshold
- Decision is deterministic per trace_id (consistent across services)

Advantages:
- Zero buffering required
- Minimal CPU overhead (<0.1%)
- Predictable storage costs
- Simple implementation

Disadvantages:
- Cannot sample based on trace outcome (errors, latency)
- May miss rare but important traces
- Fixed rate doesn't adapt to traffic patterns

Configuration:
  default_sampling_rate: 0.01  # 1% of all traces
  per_service_overrides:
    payment-service: 0.10      # 10% (critical service)
    health-check: 0.0001       # 0.01% (noisy, low value)
  per_operation_overrides:
    "POST /api/orders": 0.05   # 5% (important flow)

Tail-Based Sampling

Decision Point: After trace is complete (all spans collected)
Requirement: Buffer all spans until decision is made

Implementation:
- Collector buffers spans for each trace (60-second window)
- When trace is "complete" (no new spans for 30s), evaluate policies
- Keep trace if: has error, exceeds latency threshold, matches rules
- Discard trace if: normal, fast, no errors

Policies (evaluated in order):
1. Always keep: status_code = ERROR
2. Always keep: duration > P99 threshold (dynamic)
3. Always keep: matches alert rule patterns
4. Probabilistic: keep 1% of remaining traces
5. Rate limit: max 10,000 traces/sec total output

Resource Requirements:
- Memory per collector: 8-16 GB (buffering in-flight traces)
- In-flight traces: 600K concurrent (60s window × 10K traces/sec)
- Memory per trace: ~10 KB (10 spans × 1 KB each)
- Total buffer: 600K × 10 KB = 6 GB per collector

Advantages:
- Captures all errors and slow traces
- Adaptive to actual trace characteristics
- Better signal-to-noise ratio in stored data

Disadvantages:
- Higher memory usage (buffering)
- Higher latency to storage (60s delay)
- Complex implementation (trace completion detection)
- Requires trace-ID affinity routing

Adaptive Sampling

Dynamic Rate Adjustment:
- Monitor incoming span rate per service
- Adjust sampling rate to maintain target storage budget
- React to traffic spikes by reducing sampling rate
- React to low traffic by increasing sampling rate

Algorithm:
  target_spans_per_sec = 100,000  # storage budget
  
  for each service:
    current_rate = observed_spans_per_sec[service]
    current_sampling = sampling_rate[service]
    actual_stored = current_rate * current_sampling
    
    # Adjust to hit target proportionally
    service_budget = target_spans_per_sec * (service_weight / total_weight)
    new_sampling = min(1.0, service_budget / current_rate)
    
    # Smooth adjustment (avoid oscillation)
    sampling_rate[service] = 0.8 * current_sampling + 0.2 * new_sampling

Update Frequency: Every 30 seconds
Propagation: Collectors fetch latest rates from config service

Agent-Side Buffering and Batching

Agent Architecture

In-Process Agent (Sidecar or Library):
┌─────────────────────────────────────────┐
              Application                 
  ┌─────────────────────────────────┐    
       OpenTelemetry SDK               
    ┌──────────┐  ┌────────────┐      
     Span        Batch            
     Creator  │→  Processor        
    └──────────┘  └─────┬──────┘      
  └───────────────────────┼─────────┘    
└──────────────────────────┼──────────────┘
                           
                    ┌──────▼──────┐
                       Exporter  
                      (gRPC/HTTP)│
                    └──────┬──────┘
                            Batched spans
                    ┌──────▼──────┐
                      Collector  
                    └─────────────┘

Batching Configuration:
- max_queue_size: 2048 spans (backpressure threshold)
- max_export_batch_size: 512 spans per batch
- schedule_delay: 5000ms (flush interval)
- export_timeout: 30000ms (per batch export timeout)

Memory Budget:
- Queue: 2048 spans × 1 KB = 2 MB
- Export buffer: 512 spans × 1 KB = 512 KB
- Total agent memory: ~5 MB (including SDK overhead)
- CPU overhead: <2% of host CPU

Backpressure Handling

When Collector is Unavailable:
1. Agent buffers spans in memory queue (up to max_queue_size)
2. If queue full: drop oldest spans (or drop by priority)
3. Retry export with exponential backoff: 1s, 2s, 4s, 8s, 16s, 30s
4. Report dropped span count as metric
5. Never block application threads (async export only)

Priority-Based Dropping:
- Priority 1 (never drop): Error spans, root spans
- Priority 2 (drop last): Spans with interesting tags
- Priority 3 (drop first): Normal internal spans, health checks

Disk Buffering (optional, for critical traces):
- Spill to local disk when memory queue is full
- Replay from disk when collector recovers
- Bounded disk usage: 100 MB max
- Used by: Datadog Agent, Elastic APM Agent

Kafka as Intermediate Buffer

Kafka Topology

Kafka Cluster Configuration:
- Topics: spans-raw (ingestion), spans-sampled (post-sampling)
- Partitions: 64 per topic (allows 64 parallel consumers)
- Replication factor: 3 (durability)
- Retention: 4 hours (buffer for downstream outages)
- Segment size: 1 GB
- Compression: LZ4 (best throughput/compression ratio)

Partition Key Strategy:
- spans-raw: trace_id (ensures all spans of a trace go to same partition)
- spans-sampled: service_name (balances storage writes by service)

Throughput:
- Per partition: ~10 MB/sec write, ~30 MB/sec read
- 64 partitions: 640 MB/sec write capacity
- At 500 bytes/span: 1.28M spans/sec capacity (12x headroom)

Benefits of Kafka Buffer

1. Decoupling:
   - Collectors write to Kafka regardless of storage health
   - Storage can be upgraded/restarted without data loss
   - Multiple consumers can read same data (ES + Cassandra + ClickHouse)

2. Backpressure Absorption:
   - Traffic spikes buffered in Kafka (hours of retention)
   - Consumers process at their own pace
   - No data loss during storage maintenance windows

3. Replay Capability:
   - Re-process historical spans (e.g., after index schema change)
   - Reset consumer offset to reindex data
   - Useful for backfilling new analytics views

4. Fan-Out:
   - Single write from collector
   - Multiple consumers: storage writer, indexer, metrics aggregator, alerting
   - Each consumer group processes independently

Consumer Groups:
- storage-writer: Writes to Cassandra (primary storage)
- search-indexer: Writes to Elasticsearch (search index)
- metrics-aggregator: Computes RED metrics, writes to ClickHouse
- tail-sampler: Buffers traces, makes sampling decisions
- alerting-engine: Evaluates trace-based alert rules

Index Optimization

Pre-Aggregated Service Metrics

Instead of querying raw spans for dashboards, pre-compute metrics:

Aggregation Pipeline:
  Raw Spans → Kafka → Metrics Aggregator → ClickHouse Materialized Views

Pre-Computed Metrics (per service, per operation, per minute):
- Request count
- Error count
- Duration percentiles (P50, P95, P99)
- Throughput (requests/sec)

Storage Savings:
- Raw query: Scan 8.6M spans to compute 1-hour P99 for one service
- Pre-aggregated: Read 60 rows (one per minute) from materialized view
- Speedup: 100,000x for dashboard queries

Implementation in ClickHouse:
- AggregatingMergeTree with quantileState() for percentiles
- SummingMergeTree for counts and sums
- Automatic background merging keeps aggregates fresh
- Query latency: <100ms for any time range

Bloom Filters for Tag Existence

Problem: "Find traces with tag customer_id=12345" requires scanning all spans
Solution: Bloom filter index per time bucket

Implementation:
- One bloom filter per (service, tag_key, time_bucket)
- Filter answers: "Does this time bucket contain tag_key=tag_value?"
- False positive rate: 1% (acceptable for narrowing search)
- Memory: 10 bits per entry × 1M entries = 1.2 MB per filter

Query Flow:
1. Check bloom filters for each time bucket
2. Only scan buckets where bloom filter says "maybe yes"
3. Verify actual matches in those buckets
4. Reduces scan from 100% to ~5% of data

Multi-Region Collection and Centralized Querying

Architecture Options

Option 1: Regional Collection, Centralized Storage
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  US-East    │  │  US-West    │  │  EU-West    │
│ Collectors  │  │ Collectors  │  │ Collectors  │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       └────────────────┼────────────────┘
                        │ Cross-region replication
                 ┌──────▼──────┐
                 │  Central    │
                 │  Storage    │
                 │ (US-East)   │
                 └─────────────┘

Pros: Single query endpoint, complete trace visibility
Cons: Cross-region bandwidth cost, higher write latency

Option 2: Regional Storage, Federated Query
┌─────────────┐  ┌─────────────┐  ┌─────────────┐
│  US-East    │  │  US-West    │  │  EU-West    │
│ Collectors  │  │ Collectors  │  │ Collectors  │
│ + Storage   │  │ + Storage   │  │ + Storage   │
└──────┬──────┘  └──────┬──────┘  └──────┬──────┘
       │                │                │
       └────────────────┼────────────────┘
                        │
                 ┌──────▼──────┐
                 │  Federated  │
                 │  Query Layer│
                 └─────────────┘

Pros: Low write latency, data locality, GDPR compliance
Cons: Complex query routing, cross-region traces need fan-out

Option 3: Hybrid (Recommended)
- Store spans regionally (low latency writes)
- Replicate trace metadata centrally (lightweight)
- Query router determines which regions to query
- Cross-region trace assembly on demand

Cross-Region Trace Assembly

Challenge: A single trace may span multiple regions
Example: User in EU → EU API Gateway → US-East Payment Service

Solution:
1. Trace metadata index is global (replicated to all regions)
   - Contains: trace_id, regions_involved, root_service, duration
   - Size: ~100 bytes per trace (lightweight replication)

2. Query flow for cross-region trace:
   a. Look up trace_id in global metadata index
   b. Identify regions: [eu-west-1, us-east-1]
   c. Fan-out queries to both regions in parallel
   d. Merge spans from both regions
   e. Assemble complete trace

3. Latency impact:
   - Same-region trace: 50ms (P95)
   - Cross-region trace: 200ms (P95) due to network RTT
   - Acceptable for debugging workflows (not real-time)

Scaling Checklist by Traffic Level

10K spans/sec (Startup):
- 2 collectors (c5.xlarge)
- 3-node Cassandra cluster
- 3-node Elasticsearch cluster
- No Kafka needed (direct write)
- Head-based sampling at 10%

100K spans/sec (Growth):
- 4 collectors (c5.2xlarge)
- 6-node Cassandra cluster
- 6-node Elasticsearch cluster
- 3-node Kafka cluster
- Head-based sampling at 1%
- Add ClickHouse for analytics

1M spans/sec (Scale):
- 20 collectors (c5.2xlarge)
- 12-node Cassandra cluster
- 12-node Elasticsearch cluster
- 6-node Kafka cluster
- 3-node ClickHouse cluster
- Tail-based sampling
- Multi-tier storage

10M spans/sec (Hyperscale):
- 200 collectors (auto-scaled)
- 50+ node Cassandra cluster (multi-DC)
- 30+ node Elasticsearch cluster
- 12-node Kafka cluster
- 6-node ClickHouse cluster
- Adaptive sampling with budget control
- Multi-region deployment
- Object storage for cold data (S3/GCS)

This scaling guide provides a roadmap for growing a distributed tracing system from startup scale to hyperscale, following patterns proven by Uber (Jaeger), Google (Dapper/Cloud Trace), and Datadog APM.