Scaling Considerations

Part 6 of 10

Resource Allocation Service - Scaling Considerations

Overview

Scaling a resource allocation service from hundreds to tens of thousands of nodes introduces fundamental challenges in state management, scheduling throughput, and consistency. This section covers strategies used by production systems like Google Borg (centralized, single-cell scheduler), Omega (shared-state with optimistic concurrency), Kubernetes (single scheduler with extension points), and YARN (hierarchical two-level scheduling).


Scheduler Scaling

Partitioned Scheduling

When a single scheduler cannot handle the full cluster, partition the scheduling domain:

Partitioning Strategies:

1. Resource-Type Partitioning:
   ┌────────────────────────────────────────────────┐
   │               API Server / Queue               │
   ├────────────┬────────────┬──────────────────────┤
   │ CPU/Memory │    GPU     │   Storage/Network    │
   │ Scheduler  │ Scheduler  │   Scheduler          │
   ├────────────┼────────────┼──────────────────────┤
   │ 45K nodes  │ 3K nodes   │   2K nodes           │
   └────────────┴────────────┴──────────────────────┘
   
   Pros: Each scheduler is simpler and can apply domain-specific optimizations
   Cons: Cross-resource scheduling is harder (job needs GPU + CPU)

2. Tenant/Namespace Partitioning:
   ┌──────────────────────────────────────────────────┐
   │               API Server / Router                │
   ├──────────────┬──────────────┬────────────────────┤
   │ Tenants A-M  │ Tenants N-Z  │  System/Critical   │
   │ Scheduler    │ Scheduler    │  Scheduler         │
   ├──────────────┼──────────────┼────────────────────┤
   │ Shared pool  │ Shared pool  │  Reserved pool     │
   └──────────────┴──────────────┴────────────────────┘
   
   Pros: Tenant isolation, independent scaling
   Cons: Resource fragmentation across partitions

3. Zone/Topology Partitioning:
   ┌──────────────────────────────────────────────────┐
   │            Global Scheduler (coarse)             │
   ├──────────────┬──────────────┬────────────────────┤
   │  Zone A      │  Zone B      │  Zone C            │
   │  Scheduler   │  Scheduler   │  Scheduler         │
   │  (15K nodes) │  (20K nodes) │  (15K nodes)       │
   └──────────────┴──────────────┴────────────────────┘
   
   Pros: Locality-aware, reduced cross-zone traffic
   Cons: Cross-zone scheduling requires coordination
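A minimal sketch of the routing layer for the three strategies above. The `Job` fields, scheduler names, and the A-M / N-Z split are illustrative assumptions, not a real API:

```python
from dataclasses import dataclass, field


@dataclass
class Job:
    tenant: str
    zone: str
    resources: dict = field(default_factory=dict)  # e.g. {"gpu": 1, "cpu": 8}
    critical: bool = False


def route_by_resource(job: Job) -> str:
    """Resource-type partitioning: the scarcest requested resource picks the scheduler."""
    if job.resources.get("gpu", 0) > 0:
        return "gpu-scheduler"
    if job.resources.get("storage", 0) > 0 or job.resources.get("network", 0) > 0:
        return "storage-network-scheduler"
    return "cpu-memory-scheduler"


def route_by_tenant(job: Job) -> str:
    """Tenant partitioning: critical work goes to the reserved pool, others split A-M / N-Z."""
    if job.critical:
        return "system-scheduler"
    first = job.tenant[:1].upper()
    return "tenants-a-m-scheduler" if "A" <= first <= "M" else "tenants-n-z-scheduler"


def route_by_zone(job: Job) -> str:
    """Zone partitioning: the coarse global layer hands the job to its zone's scheduler."""
    return f"zone-{job.zone.lower()}-scheduler"


job = Job(tenant="acme", zone="B", resources={"gpu": 1, "cpu": 8})
print(route_by_resource(job), route_by_tenant(job), route_by_zone(job))
```

Note that this job asks for both GPU and CPU but lands on the GPU scheduler, which must then reason about CPU as well. That is the cross-resource weakness listed for resource-type partitioning.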

Hierarchical Scheduling

Two-level scheduling (inspired by Apache Mesos):

Level 1: Resource Offers (Cluster Manager)
- Maintains global view of available resources
- Makes resource offers to framework schedulers
- Enforces fairness and quotas across frameworks
- Does NOT make placement decisions

Level 2: Task Scheduling (Framework Schedulers)
- Receives resource offers from Level 1
- Makes placement decisions for their workloads
- Can accept/reject/partially-accept offers
- Domain-specific optimization (ML, batch, services)

Example Flow:
1. Cluster Manager: "Node-42 has 16 CPU, 64GB free"
2. ML Framework: "I'll take 8 CPU, 32GB for training job"
3. Cluster Manager: "Accepted. Node-42 now has 8 CPU, 32GB free"
4. Batch Framework: "I'll take remaining 8 CPU, 32GB"
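The offer/accept exchange in the example flow can be sketched as follows. `ClusterManager`, `offer`, and `accept` are hypothetical names; real two-level schedulers such as Mesos layer offer timeouts, filters, and fairness ordering on top of this basic loop:

```python
from dataclasses import dataclass


@dataclass
class Resources:
    cpu: int
    mem_gb: int

    def fits(self, want: "Resources") -> bool:
        return want.cpu <= self.cpu and want.mem_gb <= self.mem_gb


class ClusterManager:
    """Level 1: tracks free capacity and makes offers; never chooses placements."""

    def __init__(self) -> None:
        self.free: dict[str, Resources] = {}

    def offer(self, node: str) -> Resources:
        # An offer is a snapshot of what is currently free on the node.
        f = self.free[node]
        return Resources(f.cpu, f.mem_gb)

    def accept(self, node: str, take: Resources) -> bool:
        # Re-validate at accept time: the offer may be stale if another framework took part of it.
        f = self.free[node]
        if not f.fits(take):
            return False
        self.free[node] = Resources(f.cpu - take.cpu, f.mem_gb - take.mem_gb)
        return True


cm = ClusterManager()
cm.free["node-42"] = Resources(cpu=16, mem_gb=64)
print(cm.offer("node-42"))                     # Resources(cpu=16, mem_gb=64)
print(cm.accept("node-42", Resources(8, 32)))  # True: ML framework takes half
print(cm.offer("node-42"))                     # Resources(cpu=8, mem_gb=32)
print(cm.accept("node-42", Resources(8, 32)))  # True: batch framework takes the rest
```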

Advantages:
- Each framework can implement custom scheduling logic
- Cluster manager stays simple (fairness + offers)
- Scales to many concurrent frameworks
- Failure isolation between frameworks

Disadvantages:
- Offer-reject cycles add latency, and frameworks that hold offers while deciding can hoard resources
- No global optimization across frameworks
- Priority inversion possible between frameworks

Scheduling Pipeline Parallelism

Pipeline Stages (can run in parallel for different jobs):

Stage 1: Filtering (eliminate infeasible nodes)
  - Input: 50,000 nodes
  - Output: ~500 feasible nodes
  - Parallelism: Shard node list across filter workers
  - Time: 15ms with 8 parallel workers

Stage 2: Scoring (rank feasible nodes)
  - Input: 500 feasible nodes
  - Output: Ranked list with scores
  - Parallelism: Score different plugins in parallel
  - Time: 25ms with parallel scoring plugins

Stage 3: Binding (claim resources on selected node)
  - Input: Top-scored node
  - Output: Binding confirmation
  - Parallelism: Serialized per node (binds to different nodes can proceed concurrently)
  - Time: 10ms

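Throughput with Pipeline:
- Without pipelining: 50ms/job = 20 jobs/sec
- Strict 3-stage pipeline (one job per stage): bounded by the slowest stage, 1000ms / 25ms = 40 jobs/sec
- With 10 jobs in flight (stages overlapped across jobs, workers not saturated): 10 / 50ms = 200 jobs/sec (10x improvement)
- With 4 parallel schedulers: up to ~800 jobs/sec, before conflict overhead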
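The throughput arithmetic can be checked with a small calculator over the stage latencies listed above (15, 25, and 10 ms). Two rules apply: a strict pipeline runs at the rate of its slowest stage, and with k jobs in flight the rate is k divided by end-to-end latency (Little's law). The in-flight count of 10 is an assumption, and the calculation ignores worker saturation:

```python
STAGE_MS = {"filter": 15, "score": 25, "bind": 10}


def sequential_rate(stages: dict[str, float]) -> float:
    """One job at a time: rate = 1 / total latency (jobs per second)."""
    return 1000 / sum(stages.values())


def pipelined_rate(stages: dict[str, float]) -> float:
    """One job per stage: the slowest stage sets the pace."""
    return 1000 / max(stages.values())


def in_flight_rate(stages: dict[str, float], in_flight: int) -> float:
    """Little's law: throughput = jobs in flight / latency (assumes no worker saturation)."""
    return in_flight * 1000 / sum(stages.values())


print(sequential_rate(STAGE_MS))      # 20.0
print(pipelined_rate(STAGE_MS))       # 40.0
print(in_flight_rate(STAGE_MS, 10))   # 200.0
```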

State Management at Scale

Optimistic Concurrency Control

Problem: Multiple schedulers may try to allocate the same node simultaneously.

Solution: Optimistic concurrency with resource versions (Omega model)

Flow:
1. Scheduler A reads node state: {cpu_available: 32000m, version: 42}
2. Scheduler B reads node state: {cpu_available: 32000m, version: 42}
3. Scheduler A writes: {cpu_available: 24000m, version: 43} -> SUCCESS
4. Scheduler B writes: {cpu_available: 16000m, version: 43} -> CONFLICT (version mismatch)
5. Scheduler B re-reads: {cpu_available: 24000m, version: 43}
6. Scheduler B retries: {cpu_available: 8000m, version: 44} -> SUCCESS
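The six-step flow is a compare-and-swap on a version number. A minimal in-memory sketch, assuming a hypothetical `NodeStore` that tracks free CPU in millicores; a real store such as etcd would perform the version check inside a transaction rather than under a local lock:

```python
import threading


class ConflictError(Exception):
    pass


class NodeStore:
    """One node's free CPU plus a version that increments on every successful write."""

    def __init__(self, cpu_available_m: int, version: int = 0) -> None:
        self._lock = threading.Lock()
        self.cpu_available_m = cpu_available_m
        self.version = version

    def read(self) -> tuple[int, int]:
        with self._lock:
            return self.cpu_available_m, self.version

    def write(self, new_cpu_m: int, expected_version: int) -> int:
        # Compare-and-swap: succeed only if nobody wrote since our read.
        with self._lock:
            if self.version != expected_version:
                raise ConflictError(f"expected v{expected_version}, found v{self.version}")
            self.cpu_available_m = new_cpu_m
            self.version += 1
            return self.version


store = NodeStore(32000, version=42)
a_cpu, a_ver = store.read()              # Scheduler A: (32000, 42)
b_cpu, b_ver = store.read()              # Scheduler B: (32000, 42)
store.write(a_cpu - 8000, a_ver)         # A succeeds -> v43
try:
    store.write(b_cpu - 16000, b_ver)    # B conflicts: it still holds v42
except ConflictError:
    b_cpu, b_ver = store.read()          # B re-reads (24000, 43)
    store.write(b_cpu - 16000, b_ver)    # B retries -> v44
print(store.read())                      # (8000, 44)
```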

Conflict Rate Analysis:
- With 1 scheduler: 0% conflicts
- With 2 schedulers: ~5% conflicts (at 500 alloc/sec each)
- With 4 schedulers: ~15% conflicts
- With 8 schedulers: ~30% conflicts (diminishing returns)

Mitigation:
- Partition scheduling domains to reduce conflicts
- Use exponential backoff on conflicts
- Pre-reserve resources before full binding (two-phase)
- Accept slightly stale state for filtering (only need consistency at bind)

Eventual Consistency for Non-Critical State

State Classification:

Strongly Consistent (etcd/Raft):
- Resource bindings (who owns what)
- Quota counters
- Node registration/deregistration
- Preemption decisions

Eventually Consistent (gossip/cache):
- Node resource utilization metrics
- Scheduling queue depth
- Historical usage statistics
- Cost calculations

Bounded Staleness (reads lag the source of truth by at most a known bound):
- Node available resources (stale by up to 1 heartbeat interval)
- Tenant current usage (stale by up to 5 seconds)
- Queue position estimates

Implementation:
- Use etcd watches for critical state propagation (< 100ms)
- Use gossip protocol for metrics (convergence in 2-3 seconds)
- Use local caches with TTL for read-heavy non-critical data
- Accept scheduling on slightly stale data; catch conflicts at bind time
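For read-heavy, non-critical data, a local cache with a TTL caps staleness at the TTL, which is how a bound like "tenant usage stale by up to 5 seconds" can be enforced. A minimal sketch; `LocalTTLCache` and its injectable clock are illustrative assumptions (libraries such as `cachetools` offer production-grade TTL caches):

```python
import time
from typing import Any, Callable


class LocalTTLCache:
    """Caches loader results; entries older than ttl_s are reloaded on next read."""

    def __init__(self, loader: Callable[[str], Any], ttl_s: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._loader = loader
        self._ttl_s = ttl_s
        self._clock = clock
        self._entries: dict[str, tuple[float, Any]] = {}

    def get(self, key: str) -> Any:
        now = self._clock()
        hit = self._entries.get(key)
        if hit is not None and now - hit[0] < self._ttl_s:
            return hit[1]              # fresh enough: at most ttl_s stale
        value = self._loader(key)      # miss or expired: go to the source of truth
        self._entries[key] = (now, value)
        return value


fake_now = [0.0]
loads: list[str] = []

def load_usage(tenant: str) -> int:
    loads.append(tenant)
    return 100 + len(loads)

cache = LocalTTLCache(load_usage, ttl_s=5.0, clock=lambda: fake_now[0])
print(cache.get("tenant-a"))   # 101 (loaded)
fake_now[0] = 4.0
print(cache.get("tenant-a"))   # 101 (cached, 4s old)
fake_now[0] = 6.0
print(cache.get("tenant-a"))   # 102 (expired, reloaded)
```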

Sharded State Store

Sharding Strategy for Large Clusters:

Option 1: Multiple etcd Clusters (Kubernetes Federation)
- Cluster A etcd: Nodes 1-10000, their allocations
- Cluster B etcd: Nodes 10001-20000, their allocations
- Global etcd: Quotas, policies, cross-cluster state
- Trade-off: Cross-shard queries are expensive

Option 2: CockroachDB (Distributed SQL)
- Automatic range-based sharding
- Serializable transactions across shards
- Higher latency than etcd (5-20ms vs 1-5ms)
- Better for large state (no equivalent of etcd's recommended ~8GB backend database limit)
- Trade-off: Higher tail latency for writes

Option 3: Hybrid (Recommended for 50K+ nodes)
- etcd: Critical path state (bindings, leases) - per-zone
- Redis Cluster: Hot state (available resources, heartbeats)
- PostgreSQL: Queryable state (quotas, history, analytics)
- Each tier optimized for its access pattern
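The options above reduce to deciding where each key lives. A sketch of that routing decision only; the tier names, state kinds, and zone naming are hypothetical, and nothing here talks to a real store:

```python
from typing import Optional

CRITICAL = {"binding", "lease"}                 # etcd, sharded per zone
HOT = {"available_resources", "heartbeat"}      # Redis Cluster
QUERYABLE = {"quota", "history", "analytics"}   # PostgreSQL


def store_for(kind: str, zone: Optional[str] = None) -> str:
    """Return the backing store for a piece of state under the hybrid layout (Option 3)."""
    if kind in CRITICAL:
        if zone is None:
            raise ValueError("critical state is sharded per zone; pass a zone")
        return f"etcd-{zone}"
    if kind in HOT:
        return "redis-cluster"
    if kind in QUERYABLE:
        return "postgresql"
    raise KeyError(f"unknown state kind: {kind}")


def range_shard(node_id: int, nodes_per_shard: int = 10000) -> int:
    """Option 1 range sharding: nodes 1-10000 -> shard 0, 10001-20000 -> shard 1."""
    return (node_id - 1) // nodes_per_shard


print(store_for("binding", zone="us-east-1a"))  # etcd-us-east-1a
print(store_for("heartbeat"))                   # redis-cluster
print(range_shard(10000), range_shard(10001))   # 0 1
```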

Multi-Cluster Federation

Cross-Cluster Scheduling

Federation Architecture:

┌─────────────────────────────────────────────────────────┐
│                Federation Control Plane                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ Global Policy│  │ Cross-Cluster│  │   Capacity   │   │
│  │   Engine     │  │  Scheduler   │  │  Aggregator  │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└─────────────┬───────────────┬───────────────┬───────────┘
              │               │               │
        ┌─────▼──────┐  ┌─────▼──────┐  ┌─────▼──────┐
        │ Cluster A  │  │ Cluster B  │  │ Cluster C  │
        │ us-east-1  │  │ us-west-2  │  │ eu-west-1  │
        │ 15K nodes  │  │ 20K nodes  │  │ 15K nodes  │
        └────────────┘  └────────────┘  └────────────┘

Cross-Cluster Scheduling Triggers:
1. Local cluster at capacity (>90% utilization)
2. Data locality requirements (job needs data in another region)
3. Disaster recovery (cluster failure, failover)
4. Cost optimization (spot capacity in another region)
5. Compliance (data residency requirements)

Scheduling Decision Flow:
1. Job submitted to local cluster
2. Local scheduler determines: can schedule locally? 
3. If no: query federation for capacity across clusters
4. Federation returns: Cluster B has 500 GPU available
5. Job forwarded to Cluster B with cross-cluster metadata
6. Cluster B scheduler places job locally
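The decision flow above, reduced to one function. `can_schedule_locally` and the per-cluster free-GPU map are hypothetical stand-ins for the local scheduler check and the Capacity Aggregator. Picking the remote cluster with the most free GPUs is one simple policy assumed here; a fuller version would also weigh data locality, cost, and compliance:

```python
from typing import Callable, Optional


def place_gpu_job(
    gpus_needed: int,
    local_cluster: str,
    can_schedule_locally: Callable[[int], bool],
    federation_capacity: Callable[[], dict[str, int]],
) -> Optional[str]:
    """Return the cluster that should run the job, or None to keep it queued."""
    if can_schedule_locally(gpus_needed):            # steps 1-2
        return local_cluster
    capacity = federation_capacity()                 # step 3: ask the federation
    candidates = {c: free for c, free in capacity.items()
                  if c != local_cluster and free >= gpus_needed}
    if not candidates:
        return None                                  # nowhere fits: stay queued locally
    return max(candidates, key=candidates.get)       # steps 4-5: forward to that cluster


target = place_gpu_job(
    gpus_needed=64,
    local_cluster="us-east-1",
    can_schedule_locally=lambda n: False,
    federation_capacity=lambda: {"us-east-1": 10, "us-west-2": 500, "eu-west-1": 40},
)
print(target)  # us-west-2
```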

Resource Sharing Across Clusters

Sharing Models:

1. Dedicated Pools (No Sharing):
   - Each cluster has fixed capacity
   - Simple, predictable
   - Poor utilization during off-peak

2. Burst Sharing:
   - Each cluster has guaranteed minimum
   - Can burst into other clusters' unused capacity
   - Burst allocations are preemptible
   - Better utilization, more complex

3. Global Pool with Affinity:
   - All resources in a global pool
   - Scheduling prefers local placement
   - Cross-cluster only when necessary
   - Best utilization, highest complexity

Implementation Considerations:
- Network latency between clusters: 10-100ms
- State synchronization lag: 1-5 seconds
- Cross-cluster preemption: complex coordination
- Billing: which cluster "owns" the cost?

Bin Packing Optimization

Algorithm Comparison

First-Fit Decreasing (FFD):
- Sort jobs by resource size (largest first)
- Place each job on first node with sufficient capacity
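- Time complexity: O(n log n + n * m) where n=jobs, m=nodes (sort, then scan)
- Quality: in one dimension, at most 11/9 × OPT + 6/9 nodes in the worst case (about 82% of optimal); usually closer in practice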
- Best for: Fast scheduling, acceptable waste

Best-Fit Decreasing (BFD):
- Sort jobs by resource size (largest first)
- Place each job on node with LEAST remaining capacity after placement
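- Time complexity: O(n log n + n * m)
- Quality: same asymptotic 11/9 worst-case ratio as FFD; each placement leaves the tightest gap, which tends to reduce fragmentation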
- Best for: Maximizing utilization, reducing fragmentation

Worst-Fit:
- Place job on node with MOST remaining capacity
- Spreads load evenly across nodes
- Lower utilization but better for burst capacity
- Best for: Latency-sensitive workloads needing headroom
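The three heuristics differ only in which feasible node they pick. A one-dimensional sketch over a fixed node list (single resource, arbitrary units) that reports jobs it could not place; real schedulers score several dimensions, so treat this as an illustration of the selection rules only:

```python
from typing import Callable, Optional

Chooser = Callable[[list[int], int], Optional[int]]


def first_fit(free: list[int], size: int) -> Optional[int]:
    return next((i for i, f in enumerate(free) if f >= size), None)


def best_fit(free: list[int], size: int) -> Optional[int]:
    # Node with the LEAST remaining capacity after placement.
    fits = [i for i, f in enumerate(free) if f >= size]
    return min(fits, key=lambda i: free[i] - size, default=None)


def worst_fit(free: list[int], size: int) -> Optional[int]:
    # Node with the MOST remaining capacity after placement (spreads load).
    fits = [i for i, f in enumerate(free) if f >= size]
    return max(fits, key=lambda i: free[i] - size, default=None)


def pack(jobs: list[int], capacities: list[int], choose: Chooser,
         decreasing: bool = True) -> tuple[list[int], list[int]]:
    """Place jobs on fixed nodes; return (free capacity per node, unplaced jobs)."""
    free = list(capacities)
    unplaced: list[int] = []
    for size in (sorted(jobs, reverse=True) if decreasing else jobs):
        i = choose(free, size)
        if i is None:
            unplaced.append(size)
        else:
            free[i] -= size
    return free, unplaced


jobs, nodes = [5, 4, 3, 2, 2], [8, 8]
print(pack(jobs, nodes, first_fit))                     # FFD: ([0, 0], [])
print(pack(jobs, nodes, best_fit))                      # BFD: ([0, 0], [])
print(pack(jobs, nodes, worst_fit, decreasing=False))   # Worst-Fit: ([1, 1], [2])
```

On this input FFD and BFD fill both nodes exactly, while Worst-Fit keeps 1 unit of headroom on each node and strands the last 2-unit job, which is the headroom-versus-utilization trade-off described above.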

Multi-Dimensional Bin Packing:
- Resources are multi-dimensional (CPU, memory, GPU, network)
- NP-hard in general case
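- Heuristic: score each node across all dimensions (Dominant Resource Fairness (DRF) is related but addresses fair sharing across tenants, not packing)
- Score = weighted sum of dimension utilizations
- Kubernetes uses: NodeResourcesFit scoring with LeastAllocated (spread, the default) or MostAllocated (pack), formerly the LeastRequestedPriority / MostRequestedPriority priorities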

Score Calculation Example:
  Node has: 64 CPU, 256GB RAM, 8 GPU
  After placement: 48 CPU used, 200GB used, 6 GPU used
  
  CPU utilization: 48/64 = 75%
  Memory utilization: 200/256 = 78%
  GPU utilization: 6/8 = 75%
  
  Balance score = 1 - stddev(75%, 78%, 75%) = 0.98 (well balanced)
  Packing score = avg(75%, 78%, 75%) = 76% (good utilization)
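Both scores can be computed directly from per-dimension fractions. This sketch uses the sample standard deviation, which reproduces the 0.98 above (the population version gives about 0.985); the function name is illustrative:

```python
import statistics


def node_scores(used: dict[str, float], capacity: dict[str, float]) -> tuple[float, float]:
    """Return (balance, packing) for a node after a tentative placement."""
    fractions = [used[r] / capacity[r] for r in capacity]
    balance = 1 - statistics.stdev(fractions)   # 1.0 means perfectly even across dimensions
    packing = statistics.mean(fractions)        # higher means tighter packing
    return balance, packing


capacity = {"cpu": 64, "mem_gb": 256, "gpu": 8}
used = {"cpu": 48, "mem_gb": 200, "gpu": 6}
balance, packing = node_scores(used, capacity)
print(round(balance, 2), round(packing, 2))  # 0.98 0.76
```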

Kubernetes Scoring Plugins

Default Scoring (simplified):

1. LeastRequestedPriority (spread):
   score = (capacity - requested) / capacity * 100
   Prefers nodes with more free resources

2. MostRequestedPriority (pack):
   score = requested / capacity * 100
   Prefers nodes that are already heavily used

3. BalancedResourceAllocation:
   score = 100 - abs(cpuFraction - memoryFraction) * 100
   Prefers nodes where CPU and memory are balanced

4. ImageLocality:
   score = sum(image_sizes_present) / sum(all_image_sizes) * 100
   Prefers nodes that already have container images

Production Configuration:
- Batch workloads: Weight MostRequestedPriority high (pack tightly)
- Service workloads: Weight LeastRequestedPriority high (spread for resilience)
- GPU workloads: Custom plugin for NUMA/topology awareness
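The simplified formulas can be combined with per-workload weights to see the pack-versus-spread effect. These functions follow the simplified formulas in this section, not the kube-scheduler source (which averages over resources and normalizes per plugin); ImageLocality is omitted, the requested-based plugins use CPU only, and the weights are made up for illustration:

```python
def least_requested(requested: float, capacity: float) -> float:
    return (capacity - requested) / capacity * 100


def most_requested(requested: float, capacity: float) -> float:
    return requested / capacity * 100


def balanced(cpu_fraction: float, mem_fraction: float) -> float:
    return 100 - abs(cpu_fraction - mem_fraction) * 100


# Hypothetical weights: batch packs tightly, services spread for resilience.
WEIGHTS = {
    "batch":   {"least": 0, "most": 3, "balanced": 1},
    "service": {"least": 3, "most": 0, "balanced": 1},
}


def node_score(workload: str, cpu_req: float, cpu_cap: float,
               mem_req: float, mem_cap: float) -> float:
    """Weighted average of plugin scores (CPU only for the requested-based plugins)."""
    w = WEIGHTS[workload]
    parts = {
        "least": least_requested(cpu_req, cpu_cap),
        "most": most_requested(cpu_req, cpu_cap),
        "balanced": balanced(cpu_req / cpu_cap, mem_req / mem_cap),
    }
    return sum(w[k] * parts[k] for k in w) / sum(w.values())


busy = dict(cpu_req=48, cpu_cap=64, mem_req=192, mem_cap=256)   # 75% CPU, 75% memory
idle = dict(cpu_req=8, cpu_cap=64, mem_req=32, mem_cap=256)     # 12.5% CPU, 12.5% memory
print(node_score("batch", **busy) > node_score("batch", **idle))      # True: batch packs
print(node_score("service", **busy) < node_score("service", **idle))  # True: services spread
```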

Gang Scheduling for Distributed Jobs

Problem Statement

Distributed training jobs (e.g., 8 workers with 1 GPU each, spread across 8 nodes) require all members to start simultaneously. Partial allocation wastes resources: 7 nodes sit allocated and idle while waiting for the 8th.

Implementation

Gang Scheduling Algorithm:

1. Job arrives: "Need 8 pods, each with 1 GPU, all-or-nothing"
2. Scheduler checks: Are 8 suitable nodes available?
3. If yes: Reserve all 8 simultaneously (atomic operation)
4. If no: 
   a. Check if preemption can free enough resources
   b. If yes: Initiate preemption, hold reservation
   c. If no: Queue entire gang, release any partial reservations

Reservation Protocol:
- Phase 1 (Try): Attempt to reserve on all target nodes
- Phase 2 (Confirm/Abort): 
  - If all reservations succeed: Confirm all (bind)
  - If any reservation fails: Abort all (release)
- Timeout: If confirmation doesn't arrive in 30s, auto-abort
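A single-process sketch of the try/confirm/abort protocol: reserve on every target node, and release everything if any reservation fails. `Node.reserve` / `release` and `reserve_gang` are hypothetical; a distributed version also needs the per-node 30s auto-abort timeout, which this sketch omits:

```python
from dataclasses import dataclass, field


@dataclass
class Node:
    name: str
    free_gpus: int
    reservations: dict = field(default_factory=dict)  # gang_id -> GPUs held

    def reserve(self, gang_id: str, gpus: int) -> bool:
        if self.free_gpus < gpus:
            return False
        self.free_gpus -= gpus
        self.reservations[gang_id] = self.reservations.get(gang_id, 0) + gpus
        return True

    def release(self, gang_id: str) -> None:
        self.free_gpus += self.reservations.pop(gang_id, 0)


def reserve_gang(gang_id: str, nodes: list[Node], gpus_per_member: int) -> bool:
    """Phase 1: try every node. Phase 2: confirm if all succeeded, else abort all."""
    held: list[Node] = []
    for node in nodes:
        if not node.reserve(gang_id, gpus_per_member):
            for h in held:                # abort: roll back partial reservations
                h.release(gang_id)
            return False
        held.append(node)
    return True                           # confirm: caller proceeds to bind


nodes = [Node(f"node-{i}", free_gpus=1) for i in range(7)] + [Node("node-7", free_gpus=0)]
print(reserve_gang("train-1", nodes, 1))   # False: node-7 has no free GPU
print(sum(n.free_gpus for n in nodes))     # 7: nothing left half-reserved
nodes[7].free_gpus = 1
print(reserve_gang("train-1", nodes, 1))   # True: all 8 reserved
```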

Challenges:
- Deadlock: Gang A holds 4 nodes and needs 4 more; Gang B holds those 4 and waits for Gang A's
- Solution: Priority-based deadlock resolution (lower priority gang yields)
- Fragmentation: 8 single-GPU requests easier to place than 1 eight-GPU request
- Solution: Coscheduling with backfill (fill gaps with smaller jobs)

Coscheduling Implementation (Kubernetes):
- PodGroup CRD defines gang membership
- Scheduler plugin: Coscheduling (from kubernetes-sigs/scheduler-plugins)
- Permit phase: Hold pod until all gang members are schedulable
- Timeout: Release all if gang can't be fully scheduled within deadline

Resource Fragmentation and Defragmentation

Fragmentation Problem

Example: 4 nodes, each with 8 GPU

Initial state (well-packed):
  Node 1: [████████] 8/8 GPU used
  Node 2: [████████] 8/8 GPU used
  Node 3: [████░░░░] 4/8 GPU used
  Node 4: [░░░░░░░░] 0/8 GPU used

After churn (some jobs complete, new small jobs land wherever they fit):
  Node 1: [█░█░█░░█] 4/8 GPU used (scattered)
  Node 2: [░█░█░░█░] 3/8 GPU used (scattered)
  Node 3: [████░░░░] 4/8 GPU used (contiguous)
  Node 4: [░█░░█░░█] 3/8 GPU used (scattered)

Problem: New job needs 4 contiguous GPUs on the same node
- Node 3 can serve it (4 contiguous free)
- Nodes 1, 2, and 4 cannot (their free GPUs are scattered, at most 2 in a row)
- Total free: 18 GPUs, but only one node has a 4-GPU contiguous block
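Fragmentation can be measured per node from GPU bitmaps like the ones above. In this sketch `#` marks a used GPU and `.` a free one. The fragmentation score (1 minus the largest contiguous free block divided by total free) is one simple heuristic assumed here, usable as the threshold trigger for compaction:

```python
def max_free_run(bitmap: str) -> int:
    """Longest run of free GPUs ('.') on a node; '#' marks a used GPU."""
    best = run = 0
    for slot in bitmap:
        run = run + 1 if slot == "." else 0
        best = max(best, run)
    return best


def fragmentation(bitmap: str) -> float:
    """0.0 when all free GPUs are contiguous, rising toward 1.0 as they scatter."""
    free = bitmap.count(".")
    return 0.0 if free == 0 else 1 - max_free_run(bitmap) / free


def nodes_that_fit(cluster: dict[str, str], gpus: int) -> list[str]:
    return [name for name, bm in cluster.items() if max_free_run(bm) >= gpus]


cluster = {"node-1": "#.#.#..#", "node-3": "####....", "node-4": ".#..#..#"}
print(nodes_that_fit(cluster, 4))                                  # ['node-3']
print({n: round(fragmentation(b), 2) for n, b in cluster.items()})
# {'node-1': 0.5, 'node-3': 0.0, 'node-4': 0.6}
```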

Defragmentation Strategies

1. Compaction (Live Migration):
   - Move running allocations to consolidate free space
   - Expensive: network transfer, brief downtime
   - Best for: Long-running services that can tolerate brief disruption
   - Trigger: When fragmentation score exceeds threshold

2. Proactive Bin Packing:
   - Score nodes by contiguous free resources
   - Prefer placing new jobs to maintain contiguity
   - Avoid "swiss cheese" allocation patterns
   - Cost: Slightly suboptimal initial placement

3. Descheduler (Periodic Rebalancing):
   - Background process that identifies poorly-placed allocations
   - Evicts and reschedules to improve packing
   - Evicts through the Eviction API, so PodDisruptionBudgets are respected (e.g., maxUnavailable: 1 limits a workload to one eviction at a time)
   - Runs every 5-10 minutes

4. Reservation-Based Defrag:
   - Reserve contiguous blocks for large jobs
   - Gradually migrate small jobs off reserved blocks
   - Ensures large jobs can always be scheduled

Overcommitment and Burst Capacity

Overcommit Strategy

Resource Overcommit Model:

Physical Capacity:     [████████████████████] 100%
Allocatable:           [██████████████████░░] 90% (10% system reserved)
Sum of Requests:       [████████████████████████████] 140% (1.4x CPU overcommit; requires admitting requests beyond allocatable, which the default Kubernetes scheduler does not do)
Sum of Limits:         [████████████████████████████████████] 180%
Actual Usage (typical):[████████████░░░░░░░░] 60%
Actual Usage (peak):   [██████████████████░░] 90%

Safety Mechanisms:
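1. CPU (compressible): each container is throttled at its limit by CFS bandwidth control; under node-wide contention, CPU time is shared in proportion to requests
2. Memory (incompressible): the kubelet evicts pods when available memory drops below an eviction threshold (e.g., 5%); if memory runs out first, the kernel OOM killer picks victims, biased toward lower QoS classes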
3. Disk: Evict pods exceeding ephemeral storage limits
4. Network: Traffic shaping when bandwidth > 80%

QoS Classes (Kubernetes model):
- Guaranteed: requests == limits (never overcommitted, last to be evicted)
- Burstable: requests < limits (can burst, evicted before Guaranteed)
- BestEffort: no requests/limits (fully overcommitted, first to be evicted)

Eviction Order (when node under pressure):
1. BestEffort pods (no requests, so any usage exceeds requests)
2. Burstable pods exceeding requests
3. Burstable pods within requests (by priority)
4. Guaranteed pods (only in extreme cases, by priority)
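The eviction order can be expressed as a sort key. This sketch follows the kubelet's documented ranking for node-pressure eviction (first whether usage exceeds requests, then pod priority, then usage relative to requests); the `Pod` fields are hypothetical, and QoS order emerges from the ranking rather than being checked directly:

```python
from dataclasses import dataclass


@dataclass
class Pod:
    name: str
    priority: int
    request_mi: int   # memory request (0 for BestEffort)
    usage_mi: int     # current memory usage


def eviction_key(pod: Pod) -> tuple[bool, int, int]:
    """Sort ascending: pods over their requests first, then lower priority,
    then larger usage-minus-request."""
    exceeds = pod.usage_mi > pod.request_mi
    return (not exceeds, pod.priority, -(pod.usage_mi - pod.request_mi))


pods = [
    Pod("guaranteed-db", priority=1000, request_mi=4096, usage_mi=3000),
    Pod("burstable-api", priority=500, request_mi=1024, usage_mi=1800),
    Pod("besteffort-job", priority=0, request_mi=0, usage_mi=600),
    Pod("burstable-cache", priority=500, request_mi=2048, usage_mi=1500),
]
print([p.name for p in sorted(pods, key=eviction_key)])
# ['besteffort-job', 'burstable-api', 'burstable-cache', 'guaranteed-db']
```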

Burst Capacity Management

Burst Quota Model:
- Base quota: 100 GPU (guaranteed, always available)
- Burst quota: 150 GPU (can use up to 150, but 50 are preemptible)
- Burst window: 4 hours maximum
- Cooldown: 1 hour between burst periods

Implementation:
1. Tenant requests 120 GPU (exceeds base quota of 100)
2. System checks: burst quota allows up to 150
3. System checks: burst capacity available in cluster? Yes
4. Allocation granted with burst flag
5. If cluster needs capacity: burst allocations preempted first
6. Tenant notified: "20 GPU in burst mode, may be reclaimed"
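The admission steps can be sketched as one function. `BurstQuota`, `Grant`, `admit`, and the `cluster_spare` input are hypothetical; usage already held is assumed to count against the base quota first, and the 4-hour window and 1-hour cooldown are left out for brevity:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BurstQuota:
    base: int = 100    # guaranteed GPUs, always available
    burst: int = 150   # ceiling including the preemptible burst


@dataclass
class Grant:
    guaranteed: int
    preemptible: int   # burst portion, reclaimed first under pressure


def admit(requested: int, in_use: int, quota: BurstQuota,
          cluster_spare: int) -> Optional[Grant]:
    """Steps 1-4: check the burst ceiling, then spare capacity for the burst portion."""
    if in_use + requested > quota.burst:
        return None                                   # over the burst ceiling
    guaranteed = min(requested, max(0, quota.base - in_use))
    preemptible = requested - guaranteed
    if preemptible > cluster_spare:
        return None                                   # no spare capacity to burst into
    return Grant(guaranteed, preemptible)


print(admit(120, in_use=0, quota=BurstQuota(), cluster_spare=200))
# Grant(guaranteed=100, preemptible=20)
```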

Auto-Scaling Integration

Cluster Autoscaler

Scale-Up Trigger:
1. Pods pending in queue for > 30 seconds
2. No feasible nodes for pending pods
3. Autoscaler calculates: what node types would satisfy pending pods?
4. Autoscaler requests new nodes from cloud provider
5. New nodes register with scheduler (3-5 minutes for cloud VMs)

Scale-Down Trigger:
1. Node utilization (sum of pod requests / allocatable, for both CPU and memory) < 50% for > 10 minutes
2. All pods on node can be rescheduled elsewhere
3. No pods with local storage or PodDisruptionBudget violations
4. Autoscaler cordons node, drains pods, terminates instance

Configuration:
  scan-interval: 10s
  scale-down-delay-after-add: 10m
  scale-down-unneeded-time: 10m
  scale-down-utilization-threshold: 0.5
  max-nodes-total: 50000
  max-node-provision-time: 15m
  expander: priority  # Which node pool to scale
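A sketch of the scale-down eligibility check using the two thresholds from the configuration above. Cluster Autoscaler bases utilization on pod requests rather than live usage, and both CPU and memory must be under the threshold, so the sketch takes the larger fraction. `NodeSample` and `ScaleDownTracker` are hypothetical, and the real autoscaler also simulates rescheduling the node's pods before removing it:

```python
from dataclasses import dataclass

UTILIZATION_THRESHOLD = 0.5     # scale-down-utilization-threshold
UNNEEDED_TIME_S = 10 * 60       # scale-down-unneeded-time: 10m


@dataclass
class NodeSample:
    cpu_requested_m: int
    cpu_allocatable_m: int
    mem_requested_mi: int
    mem_allocatable_mi: int


def utilization(n: NodeSample) -> float:
    # Both resources must be under the threshold, so use the larger fraction.
    return max(n.cpu_requested_m / n.cpu_allocatable_m,
               n.mem_requested_mi / n.mem_allocatable_mi)


class ScaleDownTracker:
    """Remembers when each node first looked unneeded."""

    def __init__(self) -> None:
        self.unneeded_since: dict[str, float] = {}

    def observe(self, name: str, sample: NodeSample, now_s: float) -> bool:
        """Return True once the node has stayed under threshold for the full unneeded time."""
        if utilization(sample) >= UTILIZATION_THRESHOLD:
            self.unneeded_since.pop(name, None)   # busy again: reset the timer
            return False
        since = self.unneeded_since.setdefault(name, now_s)
        return now_s - since >= UNNEEDED_TIME_S


tracker = ScaleDownTracker()
quiet = NodeSample(cpu_requested_m=4000, cpu_allocatable_m=16000,
                   mem_requested_mi=8192, mem_allocatable_mi=65536)
print(tracker.observe("node-a", quiet, now_s=0))     # False: timer starts
print(tracker.observe("node-a", quiet, now_s=600))   # True: unneeded for 10m
```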

Node Pool Priority:
1. Spot/preemptible instances (cheapest, for batch)
2. On-demand standard instances (for services)
3. GPU instances (expensive, only when GPU jobs pending)

Vertical Pod Autoscaler (VPA)

VPA Workflow:
1. Monitor actual resource usage over time (7-day window)
2. Calculate recommended requests/limits based on usage patterns
3. Apply recommendations:
   - UpdateMode: "Auto" - evict and recreate with new resources
   - UpdateMode: "Initial" - only apply to new pods
   - UpdateMode: "Off" - only recommend, don't apply

Example:
  Current: requests={cpu: 4000m, memory: 8Gi}
  Actual usage: cpu P95=800m, memory P95=2Gi
  Recommendation: requests={cpu: 1000m, memory: 3Gi}
  Savings: 75% CPU, 62% memory (returned to cluster pool)
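The savings figures are (current − recommended) / current per resource. The sketch below also derives a CPU request as a nearest-rank P95 plus headroom; the 25% headroom and 250m rounding step are assumptions chosen for illustration, not VPA's recommender, which works from decaying usage histograms:

```python
import math


def p95(samples: list[float]) -> float:
    """Nearest-rank 95th percentile."""
    ordered = sorted(samples)
    rank = math.ceil(95 * len(ordered) / 100)
    return ordered[rank - 1]


def recommend(samples: list[float], headroom: float = 0.25, step: float = 250) -> float:
    """P95 usage plus headroom, rounded up to a multiple of `step` (hypothetical policy)."""
    return math.ceil(p95(samples) * (1 + headroom) / step) * step


def savings(current: float, recommended: float) -> float:
    return (current - recommended) / current


cpu_samples_m = [600] * 90 + [800] * 10          # P95 = 800m
rec_cpu = recommend(cpu_samples_m)               # 800 * 1.25 = 1000m
print(rec_cpu, round(savings(4000, rec_cpu), 2)) # 1000 0.75
print(round(savings(8, 3), 3))                   # 0.625 (memory: 8Gi -> 3Gi)
```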

Impact on Scheduler:
- VPA reduces over-provisioning
- More accurate requests = better bin packing
- Fewer OOM kills (limits set based on actual peaks)
- Cluster-wide: 20-40% improvement in utilization

Horizontal Pod Autoscaler (HPA) Interaction

HPA + Scheduler Interaction:

1. HPA detects: CPU utilization > 80% target
2. HPA scales: replicas 5 -> 10
3. 5 new pods enter scheduling queue
4. Scheduler places pods on available nodes
5. If no capacity: triggers Cluster Autoscaler
6. New nodes added, pods scheduled
7. Total scale-up time: 30s (existing capacity) to 5min (new nodes)
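The replica jump in step 2 follows the documented Kubernetes HPA rule, desiredReplicas = ceil(currentReplicas × currentMetric / targetMetric). Going from 5 to 10 replicas at an 80% target implies average utilization around 160% of requests (an inferred figure). The sketch includes the default 10% tolerance band but omits min/max replica clamping and stabilization windows:

```python
import math


def desired_replicas(current: int, current_util: float, target_util: float,
                     tolerance: float = 0.1) -> int:
    """HPA rule: ceil(current * current/target), unchanged inside the tolerance band."""
    ratio = current_util / target_util
    if abs(ratio - 1.0) <= tolerance:
        return current                      # close enough to target: no change
    return math.ceil(current * ratio)


print(desired_replicas(5, 160, 80))   # 10
print(desired_replicas(5, 85, 80))    # 5 (within 10% of target)
print(desired_replicas(5, 100, 80))   # 7
```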

Predictive Scaling:
- Use historical patterns to pre-scale before demand
- Example: pre-scale at 8:30am ahead of a recurring Monday 9am traffic spike
- Reduces cold-start latency from 5min to 30s
- Requires ML model trained on historical usage patterns

Performance Benchmarks

Scheduling Throughput (measured):

Single Scheduler (Kubernetes default):
- 100 nodes: 200 pods/sec
- 1000 nodes: 150 pods/sec
- 5000 nodes (Kubernetes' documented maximum cluster size): 100 pods/sec

Optimized Scheduler (with techniques above):
- 5000 nodes: 500 pods/sec (5x improvement)
- 50000 nodes: 200 pods/sec (partitioned)
- 50000 nodes: 1000 pods/sec (4 parallel schedulers)

Key Optimizations Applied:
1. Node filtering parallelism: 3x improvement
2. Score caching (5s TTL): 2x improvement
3. Batch binding (10 binds/batch): 1.5x improvement
4. Reduced etcd writes (async status): 2x improvement
5. Precomputed equivalence classes: 1.5x improvement

Combined: ~10x improvement over naive implementation (the gains overlap on shared bottlenecks, so they do not multiply out to the ~27x product)

These scaling strategies enable the resource allocation service to grow from a single-cluster deployment to a multi-region federation managing millions of concurrent allocations.