Skip to main content

Custom Autoscaling

Cohestra replaces the Flink Kubernetes Operator's built-in autoscaler with a pluggable SDK-based approach. The Operator autoscaler has known stability issues — rescaling storms, flapping under transient load, and opaque decision-making. Cohestra gives you full control.

Why Replace the Operator Autoscaler?

IssueOperator AutoscalerCohestra SDK Autoscaler
StabilityKnown rescaling storms under bursty loadYou control cooldown and thresholds
MetricsLimited to Flink JMX metricsAny metric source (CloudWatch, Prometheus, Confluent, Datadog)
LogicFixed algorithm, limited tuningYour code, your logic
ObservabilityOpaque scaling decisionsFull audit trail via Temporal
DeploymentCoupled to Operator versionIndependent Lambda/CronJob/process

Architecture

Metric Source Your Autoscaler Cohestra API
(CloudWatch / Prometheus) → (Lambda / CronJob) → POST .../scale
↓ ↓ ↓
Kafka lag evaluate() DeploymentActor
TM CPU % ScaleDecision → RolloutWorkflow
Custom metric parallelism=N → FlinkDeployment

SDK Autoscaler Interface

All three SDKs provide the same pattern:

  1. Subclass AutoscalerBase — implement evaluate(status)
  2. Return a scale decision — target parallelism + reason, or None/null to hold
  3. Run — as a Lambda (one-shot), CronJob, or long-running loop

The SDK handles:

  • Fetching current deployment status
  • Checking health before scaling (won't scale an unhealthy job)
  • Skipping if already at target parallelism
  • Idempotency key generation

1. Kafka Consumer Lag (Most Common)

Best for Kafka-source jobs. Scale based on kafkaLag from Cohestra health summary, CloudWatch MSK metrics, or Confluent Cloud metrics API.

See: Kafka Lag Autoscaler

2. TaskManager CPU Utilization

Best for CPU-bound jobs. Scale based on Flink TaskManager CPU percentage from Prometheus or CloudWatch Container Insights.

See: CPU-Based Autoscaler

3. Composite

Combine multiple signals — scale up on lag OR CPU, scale down only when BOTH are low.

class CompositeAutoscaler(AutoscalerBase):
def evaluate(self, status):
health = status["currentVersion"]["healthSummary"]
current = status["currentVersion"]["spec"]["parallelism"]

lag = health.get("kafkaLag", 0)
bp = health.get("backpressureRatio", 0)

# Scale up if either signal is hot
if lag > 100_000 or bp > 0.7:
return ScaleDecision(min(current * 2, 64), reason=f"lag={lag} bp={bp:.0%}")

# Scale down only if both are cold
if lag < 10_000 and bp < 0.1 and current > 4:
return ScaleDecision(max(current // 2, 4), reason="both signals cold")

return None

Deployment Options

OptionBest ForLatency
AWS Lambda + EventBridgeEKS, serverless, low cost1-minute intervals
Kubernetes CronJobAny K8s, no cloud deps1-minute intervals
Long-running PodFastest reaction timeConfigurable (10s+)
CI/CD stepPost-deploy scalingOn-demand

See: Lambda Deployment Guide