Database Design

📖 10 min read 📄 Part 4 of 10

Distributed Tracing System - Database Design

Overview

The database design for a distributed tracing system must optimize for two fundamentally different access patterns: high-throughput append-only writes (span ingestion) and complex read queries (trace retrieval, search, analytics). This dual nature drives a polyglot persistence approach where different storage engines serve different query patterns. Production systems like Jaeger use Cassandra + Elasticsearch, while newer systems like Tempo use object storage with a search index.


Span Schema

Core Span Data Model

// OpenTelemetry Span Protocol Buffer Definition
message Span {
  bytes trace_id = 1;           // 16 bytes, globally unique
  bytes span_id = 2;            // 8 bytes, unique within a trace
  bytes parent_span_id = 3;     // 8 bytes, empty for root span
  string name = 4;              // operation name
  SpanKind kind = 5;            // CLIENT, SERVER, PRODUCER, CONSUMER, INTERNAL
  fixed64 start_time_unix_nano = 6;
  fixed64 end_time_unix_nano = 7;
  repeated KeyValue attributes = 8;
  repeated Event events = 10;
  repeated Link links = 12;
  Status status = 14;
  Resource resource = 15;
}

message Event {
  fixed64 time_unix_nano = 1;
  string name = 2;
  repeated KeyValue attributes = 3;
}

message Link {
  bytes trace_id = 1;
  bytes span_id = 2;
  repeated KeyValue attributes = 3;
}

message Status {
  StatusCode code = 1;    // UNSET, OK, ERROR
  string message = 2;
}

Semantic Conventions (Standard Tags)

HTTP Spans:
- http.method: GET, POST, PUT, DELETE
- http.url: full URL
- http.status_code: 200, 404, 500
- http.request_content_length: bytes

Database Spans:
- db.system: mysql, postgresql, redis
- db.statement: "SELECT * FROM users WHERE id = ?"
- db.operation: SELECT, INSERT, UPDATE

RPC Spans:
- rpc.system: grpc, thrift
- rpc.service: service name
- rpc.method: method name

Messaging Spans:
- messaging.system: kafka, rabbitmq, sqs
- messaging.destination: topic/queue name
- messaging.operation: send, receive, process

Trace Assembly Data Model

Trace as a Directed Acyclic Graph (DAG)

A trace is a collection of spans sharing the same trace_id.
Spans form a DAG via parent_span_id references.
Root span has no parent (entry point of the request).

Example Trace DAG:
                [API Gateway: GET /order/123]
                       (root span)
                      /           \
    [Auth: validate]    [Order Service: getOrder]
                          /        |         \
            [DB: SELECT]  [Cache: GET]  [Payment: getStatus]
                                              |
                                        [DB: SELECT payment]

Assembly Algorithm:
1. Collect all spans with matching trace_id
2. Build parent-to-children adjacency list
3. Identify root(s): spans with no parent_span_id
4. Compute critical path (longest sequential chain)
5. Calculate trace duration: max(end_time) - min(start_time)

Trace Metadata (Computed at Assembly)

{
  "trace_id": "abc123def456...",
  "root_service": "api-gateway",
  "root_operation": "GET /order/123",
  "start_time": "2024-01-15T10:30:00.000Z",
  "duration_ms": 245,
  "span_count": 7,
  "service_count": 4,
  "services": ["api-gateway", "auth-service", "order-service", "payment-service"],
  "has_error": false,
  "critical_path_ms": 220,
  "depth": 4,
  "width": 3
}

Primary Storage: Cassandra

Schema Design

CREATE KEYSPACE tracing WITH replication = {
  'class': 'NetworkTopologyStrategy',
  'us-east-1': 3,
  'us-west-2': 3
};

-- Main spans table: partitioned by trace_id
CREATE TABLE tracing.spans_by_trace (
    trace_id        blob,
    span_id         blob,
    parent_span_id  blob,
    operation_name  text,
    service_name    text,
    span_kind       tinyint,
    start_time      timestamp,
    duration_us     bigint,
    status_code     tinyint,
    status_message  text,
    tags            map<text, text>,
    events          list<frozen<span_event>>,
    links           list<frozen<span_link>>,
    process_tags    map<text, text>,
    PRIMARY KEY (trace_id, span_id)
) WITH compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_size': '1',
    'compaction_window_unit': 'HOURS'
  }
  AND default_time_to_live = 604800
  AND gc_grace_seconds = 86400;

CREATE TYPE tracing.span_event (
    timestamp   timestamp,
    name        text,
    attributes  map<text, text>
);

CREATE TYPE tracing.span_link (
    trace_id    blob,
    span_id     blob,
    attributes  map<text, text>
);

-- Service index: find traces by service and time range
CREATE TABLE tracing.traces_by_service (
    service_name    text,
    date_bucket     text,
    start_time      timestamp,
    trace_id        blob,
    operation_name  text,
    duration_us     bigint,
    has_error       boolean,
    span_count      int,
    PRIMARY KEY ((service_name, date_bucket), start_time, trace_id)
) WITH CLUSTERING ORDER BY (start_time DESC, trace_id ASC)
  AND default_time_to_live = 604800;

-- Duration index: find slow traces
CREATE TABLE tracing.traces_by_duration (
    service_name    text,
    date_bucket     text,
    duration_bucket text,
    duration_us     bigint,
    trace_id        blob,
    start_time      timestamp,
    operation_name  text,
    PRIMARY KEY ((service_name, date_bucket, duration_bucket), duration_us, trace_id)
) WITH CLUSTERING ORDER BY (duration_us DESC, trace_id ASC)
  AND default_time_to_live = 604800;

-- Tag index: find traces by specific tag values
CREATE TABLE tracing.traces_by_tag (
    service_name    text,
    tag_key         text,
    tag_value       text,
    date_bucket     text,
    start_time      timestamp,
    trace_id        blob,
    duration_us     bigint,
    PRIMARY KEY ((service_name, tag_key, tag_value, date_bucket), start_time, trace_id)
) WITH CLUSTERING ORDER BY (start_time DESC, trace_id ASC)
  AND default_time_to_live = 604800;

Write Path Optimization

Batch Write Strategy:
1. Group spans by trace_id (same partition)
2. Use unlogged batches for spans within same trace
3. Write index entries asynchronously
4. Use LOCAL_QUORUM for spans, LOCAL_ONE for indexes

Write Amplification:
- 1 span write produces: 1 spans_by_trace + 1 traces_by_service +
  1 traces_by_duration + N traces_by_tag (per indexed tag)
- Typical amplification: 5-8x per span
- Mitigated by: selective tag indexing, async index writes

Search Storage: Elasticsearch

Index Template

{
  "index_patterns": ["jaeger-span-*"],
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1,
    "index.refresh_interval": "5s",
    "index.translog.durability": "async",
    "index.codec": "best_compression",
    "index.mapping.total_fields.limit": 2000
  },
  "mappings": {
    "properties": {
      "traceID": { "type": "keyword" },
      "spanID": { "type": "keyword" },
      "parentSpanID": { "type": "keyword" },
      "operationName": { "type": "keyword" },
      "serviceName": { "type": "keyword" },
      "startTime": { "type": "date", "format": "epoch_micros" },
      "startTimeMillis": { "type": "date" },
      "duration": { "type": "long" },
      "tags": {
        "type": "nested",
        "properties": {
          "key": { "type": "keyword" },
          "value": { "type": "keyword" }
        }
      },
      "tag": {
        "type": "object",
        "properties": {
          "http_method": { "type": "keyword" },
          "http_status_code": { "type": "keyword" },
          "http_url": { "type": "keyword" },
          "error": { "type": "boolean" }
        }
      },
      "process": {
        "properties": {
          "serviceName": { "type": "keyword" },
          "tags": {
            "type": "nested",
            "properties": {
              "key": { "type": "keyword" },
              "value": { "type": "keyword" }
            }
          }
        }
      }
    }
  }
}

Index Lifecycle Management

{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": { "max_size": "50gb", "max_age": "1d" },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "2d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "allocate": { "require": { "data": "warm" } }
        }
      },
      "cold": {
        "min_age": "7d",
        "actions": {
          "freeze": {},
          "allocate": { "require": { "data": "cold" } }
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": { "delete": {} }
      }
    }
  }
}

Analytics Storage: ClickHouse

Schema for Aggregated Analytics

CREATE TABLE spans (
    trace_id        FixedString(16),
    span_id         FixedString(8),
    parent_span_id  FixedString(8),
    service_name    LowCardinality(String),
    operation_name  LowCardinality(String),
    span_kind       Enum8('INTERNAL'=0,'SERVER'=1,'CLIENT'=2,'PRODUCER'=3,'CONSUMER'=4),
    start_time      DateTime64(6),
    duration_us     UInt64,
    status_code     Enum8('UNSET'=0, 'OK'=1, 'ERROR'=2),
    http_method     LowCardinality(Nullable(String)),
    http_status_code Nullable(UInt16),
    tags            Map(String, String),
    INDEX idx_service service_name TYPE set(100) GRANULARITY 4,
    INDEX idx_duration duration_us TYPE minmax GRANULARITY 4
) ENGINE = MergeTree()
PARTITION BY toDate(start_time)
ORDER BY (service_name, operation_name, start_time, trace_id)
TTL start_time + INTERVAL 30 DAY
SETTINGS index_granularity = 8192;

-- Pre-aggregated service metrics
CREATE MATERIALIZED VIEW service_metrics_mv
ENGINE = SummingMergeTree()
PARTITION BY toDate(time_bucket)
ORDER BY (service_name, operation_name, time_bucket)
AS SELECT
    service_name,
    operation_name,
    toStartOfMinute(start_time) AS time_bucket,
    count() AS request_count,
    countIf(status_code = 'ERROR') AS error_count,
    sum(duration_us) AS total_duration_us,
    min(duration_us) AS min_duration_us,
    max(duration_us) AS max_duration_us
FROM spans
WHERE span_kind = 'SERVER'
GROUP BY service_name, operation_name, time_bucket;

-- Service dependency graph
CREATE MATERIALIZED VIEW service_dependencies_mv
ENGINE = SummingMergeTree()
PARTITION BY toDate(time_bucket)
ORDER BY (caller_service, callee_service, time_bucket)
AS SELECT
    s1.service_name AS caller_service,
    s2.service_name AS callee_service,
    toStartOfHour(s1.start_time) AS time_bucket,
    count() AS call_count,
    countIf(s2.status_code = 'ERROR') AS error_count,
    avg(s2.duration_us) AS avg_duration_us
FROM spans s1
INNER JOIN spans s2 ON s1.trace_id = s2.trace_id
    AND s1.span_id = s2.parent_span_id
WHERE s1.service_name != s2.service_name
GROUP BY caller_service, callee_service, time_bucket;

Time-Series Partitioning Strategy

Partition Design

Time-Based Partitioning:
| Storage Engine | Partition Key        | Partition Size  |
|----------------|---------------------|-----------------|
| Cassandra      | (service, date)     | 1-10 GB         |
| Elasticsearch  | Daily index rollover| ~50 GB/index    |
| ClickHouse     | toDate(start_time)  | 50-100 GB       |

Avoiding Hot Partitions in Cassandra:
- Problem: Popular services create hot partitions
- Solution: Composite partition key with time bucketing

-- Distribute across time buckets
PRIMARY KEY ((service_name, date_bucket), start_time)

-- Add hash bucket for very hot services
PRIMARY KEY ((service_name, date_bucket, hash_bucket), start_time)
-- hash_bucket = trace_id % 16

Tag Indexing for Flexible Queries

Tag Index Strategy

Challenge: Tags are arbitrary key-value pairs with high cardinality
- Total unique tag keys: ~500
- High-cardinality: user_id, request_id (millions of values)
- Low-cardinality: http.method, status_code (tens of values)

Indexing Approach:
1. Whitelist indexed tags (configured per service):
   - http.method, http.status_code, error, component
   - Custom business tags: customer_tier, feature_flag

2. Skip high-cardinality tags in indexes:
   - user_id, session_id → only in raw span storage
   - Queryable via trace_id lookup, not via search

3. Bloom filters for existence queries:
   - "Does any span have tag X=Y?"
   - False positive rate: 1%
   - Memory: ~10 bits per entry

Cardinality Management

-- Track tag cardinality for index decisions
SELECT
    tag_key,
    uniqExact(tag_value) as cardinality,
    count() as usage_count
FROM spans
ARRAY JOIN mapKeys(tags) AS tag_key, mapValues(tags) AS tag_value
WHERE start_time > now() - INTERVAL 1 DAY
GROUP BY tag_key
ORDER BY cardinality DESC;

-- Results guide indexing:
-- request_id       | 50,000,000 → DO NOT INDEX
-- http.url         | 5,000      → INDEX (moderate)
-- http.method      | 7          → INDEX (low cardinality)
-- http.status_code | 25         → INDEX (low cardinality)

Service Dependency Graph Storage

-- Cassandra: Current service graph
CREATE TABLE tracing.service_graph (
    caller_service  text,
    callee_service  text,
    protocol        text,
    last_seen       timestamp,
    call_rate_per_sec float,
    error_rate      float,
    avg_duration_ms float,
    PRIMARY KEY (caller_service, callee_service, protocol)
);

-- Operations per service
CREATE TABLE tracing.service_operations (
    service_name    text,
    operation_name  text,
    span_kind       text,
    last_seen       timestamp,
    PRIMARY KEY (service_name, operation_name)
);

Retention and TTL Policies

Multi-Tier Retention

| Data Type              | Hot    | Warm   | Cold    | Archive |
|------------------------|--------|--------|---------|---------|
| Raw spans              | 2 days | 7 days | 30 days | -       |
| Span indexes           | 2 days | 7 days | 14 days | -       |
| Service metrics (1min) | 7 days | 30 days| -       | -       |
| Service metrics (1hr)  | 30 days| 90 days| 1 year  | -       |
| Dependency graph       | 30 days| 90 days| 1 year  | -       |
| Error traces           | 7 days | 30 days| 90 days | 1 year  |

Selective Retention (Keep Important Traces Longer):
1. Error traces (status_code = ERROR): 90 days
2. Slow traces (duration > P99): 30 days
3. Traces matching alert rules: 90 days
4. Manually bookmarked traces: 1 year
5. Traces linked to incidents: 1 year

Storage Engine Comparison

| Criteria         | Cassandra     | Elasticsearch | ClickHouse      |
|------------------|---------------|---------------|-----------------|
| Write throughput | Excellent     | Good          | Excellent       |
| Read by ID       | Excellent     | Good          | Good            |
| Full-text search | Poor          | Excellent     | Moderate        |
| Aggregations     | Poor          | Moderate      | Excellent       |
| Compression      | Good (3:1)    | Moderate (2:1)| Excellent (10:1)|
| Operational cost | Moderate      | High          | Low             |
| Best for         | Span storage  | Search/filter | Analytics       |

Recommended Architecture:
- Primary storage: Cassandra (all spans, trace-by-ID retrieval)
- Search index: Elasticsearch (tag search, filtering)
- Analytics: ClickHouse (aggregations, percentiles, dashboards)
- Long-term: S3/GCS (archived traces, cost-effective retention)

This database design supports the full spectrum of distributed tracing workloads from high-throughput ingestion to complex analytical queries, following patterns proven at scale by Jaeger, Zipkin, and commercial APM systems.