Skip to main content

CPU-Based Autoscaler

Scale Flink parallelism based on TaskManager CPU utilization — best for CPU-bound jobs where Kafka lag isn't the bottleneck.

Strategy

if avg_cpu > 80%:
    parallelism = min(ceil(current * avg_cpu / target_cpu), max)  # scale up
elif avg_cpu < 30% and current > min:
    parallelism = max(ceil(current * avg_cpu / target_cpu), min)  # scale down

Query Flink's TaskManager CPU metrics exposed via the Prometheus reporter.

import math
import requests
from cohestra_sdk import CohestraClient, AutoscalerBase, ScaleDecision

class CPUAutoscaler(AutoscalerBase):
    """Scale parallelism from the average TaskManager JVM CPU load in Prometheus.

    Scale-up triggers above ``SCALE_UP_CPU`` and scale-down below
    ``SCALE_DOWN_CPU``. In both cases the new parallelism is
    ``ceil(current * avg_cpu / TARGET_CPU)``, clamped to
    ``[MIN_PARALLELISM, MAX_PARALLELISM]``.
    """

    TARGET_CPU = 0.60  # 60% target utilization
    SCALE_UP_CPU = 0.80  # scale up above 80%
    SCALE_DOWN_CPU = 0.30  # scale down below 30%
    MIN_PARALLELISM = 2
    MAX_PARALLELISM = 64
    # Upper bound on the Prometheus HTTP call so a stuck server cannot
    # block the autoscaler indefinitely.
    QUERY_TIMEOUT_SECONDS = 10

    def __init__(self, client, env, ns, name, prometheus_url):
        """Store the Prometheus base URL; other arguments go to the base class."""
        super().__init__(client, env, ns, name)
        self.prometheus = prometheus_url

    def _get_avg_cpu(self) -> float:
        """Return the 5-minute average CPU load across TaskManagers.

        Returns:
            The load as a float (the thresholds assume a 0.0-1.0 fraction),
            or ``math.nan`` when Prometheus returns no series. NaN rather
            than 0.0 so that missing data is never mistaken for an idle
            cluster.

        Raises:
            requests.HTTPError: if Prometheus answers with an error status.
        """
        # Status_JVM_CPU_Load is a gauge, so it is smoothed with
        # avg_over_time(); rate() is only meaningful for counters.
        query = (
            'avg(avg_over_time(flink_taskmanager_Status_JVM_CPU_Load'
            '{job="flink-taskmanager"}[5m]))'
        )
        resp = requests.get(
            f"{self.prometheus}/api/v1/query",
            params={"query": query},
            timeout=self.QUERY_TIMEOUT_SECONDS,
        )
        resp.raise_for_status()
        results = resp.json().get("data", {}).get("result", [])
        if not results:
            return math.nan
        # An instant-vector sample is [timestamp, "value-as-string"].
        return float(results[0]["value"][1])

    def evaluate(self, status):
        """Return a ScaleDecision, or None when no change is needed.

        Args:
            status: mapping that contains
                ``status["currentVersion"]["spec"]["parallelism"]``.
        """
        current = status["currentVersion"]["spec"]["parallelism"]
        avg_cpu = self._get_avg_cpu()

        # No usable metric (no series, or Prometheus reported NaN): hold
        # the current parallelism instead of guessing.
        if math.isnan(avg_cpu):
            return None

        # Proportional target: the parallelism at which the same total
        # load would sit at TARGET_CPU.
        desired = math.ceil(current * avg_cpu / self.TARGET_CPU)

        if avg_cpu > self.SCALE_UP_CPU:
            target = min(desired, self.MAX_PARALLELISM)
            if target > current:
                return ScaleDecision(target, reason=f"cpu={avg_cpu:.0%}")
        elif avg_cpu < self.SCALE_DOWN_CPU and current > self.MIN_PARALLELISM:
            target = max(desired, self.MIN_PARALLELISM)
            if target < current:
                return ScaleDecision(target, reason=f"cpu={avg_cpu:.0%} low")

        return None

Using CloudWatch Container Insights (EKS)

For EKS clusters with Container Insights enabled, read TaskManager pod CPU from CloudWatch.

import boto3
import math
from datetime import datetime, timedelta
from cohestra_sdk import CohestraClient, AutoscalerBase, ScaleDecision

class EKSCPUAutoscaler(AutoscalerBase):
    """Scale parallelism from TaskManager pod CPU in CloudWatch Container Insights.

    The thresholds and bounds are class attributes, so a subclass can
    tune them without overriding ``evaluate``.
    """

    TARGET_CPU = 0.60  # 60% target utilization
    SCALE_UP_CPU = 0.80  # scale up above 80%
    SCALE_DOWN_CPU = 0.30  # scale down below 30%
    MIN_PARALLELISM = 2
    MAX_PARALLELISM = 64

    def __init__(self, client, env, ns, name, cluster_name, flink_deployment,
                 k8s_namespace="streaming"):
        """Create the CloudWatch client and remember which pods to query.

        Args:
            client, env, ns, name: forwarded to the base class.
            cluster_name: value for the ``ClusterName`` metric dimension.
            flink_deployment: prefix of the TaskManager ``PodName`` dimension
                (``"<flink_deployment>-taskmanager"``).
            k8s_namespace: value for the ``Namespace`` metric dimension;
                defaults to ``"streaming"``.
        """
        super().__init__(client, env, ns, name)
        self.cw = boto3.client("cloudwatch")
        self.cluster = cluster_name
        self.deployment = flink_deployment
        self.k8s_namespace = k8s_namespace

    def _get_avg_cpu(self) -> float:
        """Return recent TaskManager pod CPU utilization as a fraction.

        Queries the ``Average`` statistic over the last five minutes and
        divides by 100. If the window yields more than one datapoint, the
        highest is used.

        Returns:
            The utilization as a float, or ``math.nan`` when CloudWatch
            returns no datapoints. NaN rather than 0.0 so that missing
            data is never mistaken for an idle cluster.
        """
        # Local import: the file-level import only brings in datetime
        # and timedelta.
        from datetime import timezone

        # Take one timestamp so both window edges derive from the same
        # instant; timezone-aware replaces the deprecated utcnow().
        end = datetime.now(timezone.utc)
        # NOTE(review): confirm pod_cpu_utilization is the intended metric
        # and that the PodName dimension value matches the cluster.
        resp = self.cw.get_metric_statistics(
            Namespace="ContainerInsights",
            MetricName="pod_cpu_utilization",
            Dimensions=[
                {"Name": "ClusterName", "Value": self.cluster},
                {"Name": "Namespace", "Value": self.k8s_namespace},
                {"Name": "PodName", "Value": f"{self.deployment}-taskmanager"},
            ],
            StartTime=end - timedelta(minutes=5),
            EndTime=end,
            Period=300,
            Statistics=["Average"],
        )
        points = resp.get("Datapoints", [])
        if not points:
            return math.nan
        return max(p["Average"] for p in points) / 100.0

    def evaluate(self, status):
        """Return a ScaleDecision, or None when no change is needed.

        Args:
            status: mapping that contains
                ``status["currentVersion"]["spec"]["parallelism"]``.
        """
        current = status["currentVersion"]["spec"]["parallelism"]
        cpu = self._get_avg_cpu()

        # No datapoints: hold the current parallelism instead of
        # scaling down on absent data.
        if math.isnan(cpu):
            return None

        # Proportional target: the parallelism at which the same total
        # load would sit at TARGET_CPU.
        desired = math.ceil(current * cpu / self.TARGET_CPU)

        if cpu > self.SCALE_UP_CPU:
            target = min(desired, self.MAX_PARALLELISM)
            if target > current:
                return ScaleDecision(target, reason=f"eks_cpu={cpu:.0%}")
        elif cpu < self.SCALE_DOWN_CPU and current > self.MIN_PARALLELISM:
            target = max(desired, self.MIN_PARALLELISM)
            if target < current:
                return ScaleDecision(target, reason=f"eks_cpu={cpu:.0%} low")

        return None

Using Cohestra Backpressure Ratio

Cohestra's health summary includes backpressureRatio — a proxy for CPU saturation. No external metric source needed.

from cohestra_sdk import AutoscalerBase, ScaleDecision

class BackpressureAutoscaler(AutoscalerBase):
    """Double or halve parallelism based on the reported backpressure ratio.

    Reads ``backpressureRatio`` from the current version's health summary,
    so no external metric source is queried.
    """

    SCALE_UP_RATIO = 0.7  # double parallelism above this ratio
    SCALE_DOWN_RATIO = 0.1  # halve parallelism below this ratio
    MIN_PARALLELISM = 4
    MAX_PARALLELISM = 64

    def evaluate(self, status):
        """Return a ScaleDecision, or None when no change is needed.

        Args:
            status: mapping that contains
                ``status["currentVersion"]["spec"]["parallelism"]`` and,
                when available,
                ``status["currentVersion"]["healthSummary"]["backpressureRatio"]``.
        """
        version = status["currentVersion"]
        current = version["spec"]["parallelism"]
        health = version.get("healthSummary") or {}
        bp = health.get("backpressureRatio")

        # A missing ratio means "unknown", not "idle": defaulting it to 0
        # would halve parallelism whenever the summary is absent.
        if bp is None:
            return None

        if bp > self.SCALE_UP_RATIO and current < self.MAX_PARALLELISM:
            return ScaleDecision(
                min(current * 2, self.MAX_PARALLELISM),
                reason=f"backpressure={bp:.0%}",
            )

        if bp < self.SCALE_DOWN_RATIO and current > self.MIN_PARALLELISM:
            return ScaleDecision(
                max(current // 2, self.MIN_PARALLELISM),
                reason="low backpressure",
            )

        return None

Tuning

| Parameter | Default | Description |
| --- | --- | --- |
| TARGET_CPU | 60% | Desired utilization. Scale decisions aim for this level. |
| SCALE_UP_CPU | 80% | Trigger scale-up above this threshold. |
| SCALE_DOWN_CPU | 30% | Trigger scale-down below this threshold. |
| MIN_PARALLELISM / MAX_PARALLELISM | 2 / 64 | Bounds for scaling decisions. |

Tip: The gap between TARGET_CPU and SCALE_UP_CPU provides headroom for traffic spikes without triggering a scale. A wider gap means fewer scaling events but less efficient resource usage.