Conversation

Contributor

@Fly-Style Fly-Style commented Dec 5, 2025

Cost-Based Autoscaler for Seekable Stream Supervisors

Overview

Implements a cost-based autoscaling algorithm for seekable stream supervisor tasks that optimizes task count by balancing lag reduction against resource efficiency.

Note: this patch doesn't support autoscaling (down) during task rollover. Temporarily, it scales down in the same manner as it scales up.

Introduces WeightedCostFunction for cost-based autoscaling decisions. The function computes a cost score (in seconds) for each candidate task count, balancing lag recovery time against idle resource waste.

Key Design Decisions

Cost Formula

totalCost = lagWeight × lagRecoveryTime + idleWeight × idlenessCost
  • lagRecoveryTime = aggregateLag / (taskCount × avgProcessingRate) — time to clear backlog
  • idlenessCost = taskCount × taskDuration × predictedIdleRatio — wasted compute time
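
As a minimal illustration (hypothetical names, not the actual classes in this patch), the total cost for a candidate task count could be computed roughly as:

// Sketch only: combines the two terms above with the configured weights.
double computeCost(
    long aggregateLag,          // sum of lag (records) across all partitions
    double avgProcessingRate,   // records per second per task
    int taskCount,
    long taskDurationSeconds,
    double predictedIdleRatio,
    double lagWeight,
    double idleWeight
)
{
  // Seconds needed to clear the current backlog with this many tasks.
  final double lagRecoveryTime = aggregateLag / (taskCount * avgProcessingRate);
  // Compute-seconds expected to be wasted idling within one task duration.
  final double idlenessCost = taskCount * taskDurationSeconds * predictedIdleRatio;
  return lagWeight * lagRecoveryTime + idleWeight * idlenessCost;
}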

Idle Prediction Model

Uses capacity-based linear scaling:

predictedIdle = 1 - (1 - currentIdle) / (proposedTasks / currentTasks)

More tasks → more idle per task; fewer tasks → busier tasks.
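
A minimal sketch of this prediction (hypothetical method name; the clamp is added here only for illustration):

double predictIdleRatio(double currentIdle, int currentTasks, int proposedTasks)
{
  // The busy fraction per task shrinks proportionally as the task count grows.
  final double currentBusy = 1.0 - currentIdle;
  final double scale = (double) proposedTasks / currentTasks;
  final double predictedIdle = 1.0 - currentBusy / scale;
  // Clamp since the linear model can overshoot at the extremes.
  return Math.max(0.0, Math.min(1.0, predictedIdle));
}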

Ideal Idle Range

Defines optimal utilization as idle ratio within [0.2, 0.6]:

  • Below 0.2: overloaded → scale up
  • Within range: optimal → no action
  • Above 0.6: underutilized → scale down
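
For illustration only (names assumed, not the actual classes in this patch), the resulting decision rule looks roughly like:

enum ScaleHint { SCALE_UP, NO_ACTION, SCALE_DOWN }

// Classify the current idle ratio against the ideal range [0.2, 0.6].
ScaleHint classify(double idleRatio)
{
  if (idleRatio < 0.2) {
    return ScaleHint.SCALE_UP;    // overloaded
  } else if (idleRatio > 0.6) {
    return ScaleHint.SCALE_DOWN;  // underutilized
  } else {
    return ScaleHint.NO_ACTION;   // within the optimal band
  }
}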

Conservative Cold Start Behavior

When processing rate is unavailable (cold start, new tasks):

  • Current task count: cost = 0.01 (allowed)
  • Any scaling: cost = +∞ (prohibited)

This prevents scaling decisions based on incomplete data.
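
A minimal sketch of this guard (hypothetical method name):

// When no processing-rate data is available yet, only the current task count
// is viable; every other candidate is made prohibitively expensive.
double costWithoutRateData(int candidateTaskCount, int currentTaskCount)
{
  return candidateTaskCount == currentTaskCount ? 0.01 : Double.POSITIVE_INFINITY;
}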

Additionally, this patch adds reading of the average poll-idle ratio from the /rowStats task endpoint.


This PR has:

  • been self-reviewed.
  • added documentation for new or modified features or behaviors.
  • a release note entry in the PR description.
  • added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • added or updated version, license, or notice information in licenses.yaml
  • added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • added integration tests.
  • been tested in a test Druid cluster.

@Fly-Style Fly-Style changed the title Cost-based autoscaler: first raw version Cost-based autoscaler Dec 5, 2025
Contributor

jtuglu1 commented Dec 6, 2025

While I think this will be very useful, the primary issue we've run into with the current scaler is that it needs to shut down tasks in order to scale (which causes a lot of lag during this process). #18466 was working on a way to fix this.

I think this will help the scaler be smarter on each scale, but each scale action still costs a lot to do.

@Fly-Style
Contributor Author

@jtuglu1 thanks for your input, appreciate it! The aim is to make a more capable / tunable autoscaler, while in parallel we will make the improvements proposed in #18466.

@Fly-Style Fly-Style marked this pull request as ready for review December 9, 2025 16:34
Contributor

@kfaraz kfaraz left a comment

Thanks for the new auto-scaler strategy, @Fly-Style !
I really like the idea of assigning a cost value to a potential task count as it helps reason about our choices! Overall, the patch looks good.

I am leaving a partial review here. I am yet to go through the WeightedCostFunction and some other aspects of the patch. Will post the remaining comments today.

}

final int currentTaskCount = currentMetrics.getCurrentTaskCount();
final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent(
Contributor

@kfaraz kfaraz Dec 10, 2025

Is the computeFactors() computation really heavy enough to require caching?
Especially since we are imposing a max SCALE_FACTOR_DISCRETE_DISTANCE.

How about we simplify computeFactors so that we compute only the required factors?

Example:

List<Integer> computeValidTaskCounts(int partitionCount, int currentTaskCount)
{
   // Consider only partitions-per-task values close to the current one.
   final int currentPartitionsPerTask = partitionCount / currentTaskCount;
   final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 2);
   final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + 2);

   // Ceiling division to get the task count for each partitions-per-task value.
   return IntStream.rangeClosed(minPartitionsPerTask, maxPartitionsPerTask)
                   .map(partitionsPerTask -> (partitionCount / partitionsPerTask) + Math.min(partitionCount % partitionsPerTask, 1))
                   .boxed()
                   .collect(Collectors.toList());
}

Contributor Author

As we discussed on voice chat, we leave the current implementation as is, but slightly enhance the way the data is saved in the cache.

Contributor Author

Fly-Style commented Dec 10, 2025

@kfaraz thanks for the review. I addressed most of the comments in e5b40c7, except the comment regarding using the correct metric for poll-idle-ratio. Unfortunately, the initial plan of measuring poll from the consumer was not correct, more details in this comment. I simply did not know it measures a slightly wrong thing. :(

I will add a separate endpoint to fetch the correct metrics from all tasks and calculate the average, with subsequent data normalization, in a separate commit.

cc @cryptoe

@Fly-Style Fly-Style requested a review from kfaraz December 11, 2025 19:28
@Fly-Style Fly-Style changed the title Cost-based autoscaler Introduce cost-based tasks autoscaler Dec 12, 2025
@Fly-Style Fly-Style changed the title Introduce cost-based tasks autoscaler Introduce cost-based tasks autoscaler for streaming ingestion Dec 12, 2025
Comment on lines 123 to 125
config.getScaleActionStartDelayMillis(),
config.getScaleActionPeriodMillis(),
TimeUnit.MILLISECONDS
Contributor

Sorry, I meant that we should not have the start delay, period and collection interval as configs at all. Even with good defaults, unnecessary configs only complicate admin work and require code to handle all possible scenarios.

At most, maybe keep just one config scaleActionPeriod that can be specified as an ISO period (e.g. PT1M) or something (mostly since you would be using this in embedded tests). The other configs don't really add any value. They are legacy configs in lag-based auto-scaler which we might as well avoid adding in the new strategy.

if (optimalTaskCount > currentTaskCount) {
return optimalTaskCount;
} else if (optimalTaskCount < currentTaskCount) {
supervisor.getIoConfig().setTaskCount(optimalTaskCount);
Contributor

This line can have behavioural side effects in the supervisor, since the taskCount should always reflect the current running task count and not the desired task count. Here we are updating the taskCount without actually changing the number of tasks, or suspending the supervisor.

Instead, we could do the following:

  • Add an auto-scaler method isScaleDownOnRolloverOnly(). This will always return false for lag-based and always true for cost-based.
  • CostBasedAutoScalerConfig.computeOptimalTaskCount() should return the optimal task count for scale down cases as well.
  • SeekableStreamSupervisor can store this desired task count in an atomic integer and retrieve it upon regular task rollover.

Another (perhaps cleaner) option is to simply invoke AutoScaler.computeOptimalTaskCount() whenever we do rollover and then just go with the optimal task count.

Contributor

@Fly-Style , as you suggested on chat, we can take up the scale down behaviour in a follow up PR.
In the current PR, we can do scale down the same way as scale up.

@Fly-Style Fly-Style requested a review from kfaraz December 15, 2025 13:22
Fly-Style and others added 3 commits December 15, 2025 15:22
…e/druid/indexing/kafka/KafkaConsumerMonitor.java

Co-authored-by: Kashif Faraz <[email protected]>
…e/druid/indexing/kafka/KafkaConsumerMonitor.java

Co-authored-by: Kashif Faraz <[email protected]>
Contributor

jtuglu1 commented Dec 15, 2025

At most, maybe keep just one config scaleActionPeriod that can be specified as an ISO period (e.g. PT1M) or something (mostly since you would be using this in embedded tests). The other configs don't really add any value. They are legacy configs in lag-based auto-scaler which we might as well avoid adding in the new strategy.

I don't fully agree with this. At the very least, we use config.getScaleActionStartDelayMillis() internally when doing red/black deployments where supervisors can get paused. It's better in our case to put a delay after resubmitting the supervisor, otherwise we end up over-scaling after a deployment. Similarly, we update the specs frequently to add new/update existing columns. Putting a cooldown after submission allows the scaler to adjust accurately to the lag rather than getting into a scaling loop and becoming way over-scaled (in supervisors with 500+ tasks this is an issue). I agree the rest are not too useful in practice.

Contributor

@kfaraz kfaraz left a comment

The changes look good but the weighted cost function needs to be simplified to make the computations more intuitive and debug friendly.

{
Map<String, Map<String, Object>> taskMetrics = getStats(true);
if (taskMetrics.isEmpty()) {
return 1.;
Contributor

Returning 1 (full idle) here would cause the auto-scaler to think that the tasks are doing nothing and cause a scale-down, when in fact the tasks failed to return the metrics and may be in a bad state. Scaling down might further worsen the problem.

Should we return 0 instead?

Contributor Author

Scaling up might be overkill in that scenario - nothing may happen and instead we will waste resources.
0.5 looks optimal to me (during the implementation I considered values between 0.5 and 1).

Contributor

We can even return -1 to denote that we do not have metrics available, and just skip scaling rather than make a bad decision.

result.add(taskCount);
}
}
return result.stream().mapToInt(Integer::intValue).toArray();
Contributor

Nit: Is conversion to array still needed? Can we just return List or Set instead?

Contributor Author

It's my personal preference: I'm really not a fan of boxed primitives :)

Contributor

But we are still boxing the primitives while adding to the List. Might as well avoid the extra conversion.

Comment on lines +248 to +250
if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
result.add(taskCount);
}
Contributor

Maybe just use a set to simplify this computation.

* Weighted cost function combining lag, idle time, and change distance metrics.
* Uses adaptive bounds for normalization based on recent history.
*/
public class WeightedCostFunction
Contributor

@kfaraz kfaraz Dec 15, 2025

I like the idea of the WeightedCostFunction, but I think we need to make it much more intuitive.

Define requirements (already aligned with the current state of this PR):

  1. A function that computes cost.
  2. A task count with lower cost is better.
  3. cost = lagCost * lagWeight + idlenessCost * idleWeight
  4. Lower the task count, higher the predicted lag, higher the lag cost.
  5. Higher the task count, higher the predicted idleness, higher the idleness cost.

Simplify computations

  1. Use linear scaling only; logarithmic scaling makes the terms difficult to reason about and debug.
    The diminishing-returns effect is already enforced by the window (discrete distance). If more terms are needed to account for, say, task operational overhead, we will add them in the future.
  2. Use only one mode i.e. do not invert scaling, even when lag is abnormally high.
  3. Use actual metrics instead of normalized or adaptive bounds.
    If a supervisor once saw a lag of 100M, the adaptive ratio would make a lag of 1M seem very small (normalizedLag = 0.01 i.e. 1%). But in reality, a lag of 1M is bad too and needs to be given appropriate weight.
  4. Always perform cost computation even if idleness is in the accepted range (0.2-0.6 in the PR).
    This would help us validate the correctness of the formula against real clusters by verifying that the current task count gives minimal cost.

We may re-introduce some of these enhancements in later patches once we have more data points using this auto-scaler, but it is best to start as simple as possible.

Use an intuitive metric, e.g. compute time

Connect the result of the cost function to an actual metric to make it more intuitive. The best metric I can think of is compute time or compute cycles, as it may be related to the actual monetary cost of running tasks.

For example, what if we could model the cost as follows:

  1. lagCost = expected time (in seconds) required to recover current lag
  2. idlenessCost = total compute time (in seconds) wasted being idle in a single taskDuration
  3. Intuitively, we can see that as task count increases, lagCost would decrease and idlenessCost would increase.

The formula for these costs may be something like:

lagCost
= expected time (in seconds) required to recover current lag
= currentAggregateLag / (proposedTaskCount * avgRateOfProcessing)

where,
currentAggregateLag = sum of current lag (in records) across all partitions
avgRateOfProcessing = average of task moving averages

idlenessCost
= total time (in seconds) wasted being idle in a single taskDuration
= total task run time * predicted idleness ratio

where,
total task run time = (proposedTaskCount * taskDuration)

predicted idleness ratio = (proposedTaskCount / currentTaskCount) - (1 - avgPollToIdleRatio)

e.g. if current poll-to-idle-ratio is 0.7, tasks are idle 70% of the time,
so reducing task count by 70% will make tasks busy all the time (idleness ratio = 0).

Assumptions

  • Tasks are already at their peak processing rate and will remain at this rate.
  • poll-to-idle ratio scales linearly with task count. We may use some reasonable clamps for min (say 0.05) and max (say 0.95).
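
To make this concrete, a rough sketch of the two proposed terms (all names are illustrative; clamps as suggested above):

// Expected time (in seconds) required to recover the current lag at the proposed task count.
double lagCost(long currentAggregateLag, int proposedTaskCount, double avgRateOfProcessing)
{
  return currentAggregateLag / (proposedTaskCount * avgRateOfProcessing);
}

// Total compute time (in seconds) wasted being idle across the proposed tasks in one taskDuration.
double idlenessCost(int proposedTaskCount, int currentTaskCount, long taskDurationSeconds, double avgPollIdleRatio)
{
  // Assume idleness scales linearly with task count, clamped to [0.05, 0.95].
  final double busyFraction = 1.0 - avgPollIdleRatio;
  double predictedIdleRatio = ((double) proposedTaskCount / currentTaskCount) - busyFraction;
  predictedIdleRatio = Math.max(0.05, Math.min(0.95, predictedIdleRatio));
  return proposedTaskCount * taskDurationSeconds * predictedIdleRatio;
}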

Contributor Author

Regarding:

Use actual metrics instead of normalized or adaptive bounds.
If a supervisor once saw a lag of 100M, the adaptive ratio would make a lag of 1M seem very small (normalizedLag = 0.01 i.e. 1%). But in reality, a lag of 1M is bad too and needs to be given appropriate weight.

Always perform cost computation even if idleness is in the accepted range (0.2-0.6 in the PR).
This would help us validate the correctness of the formula against real clusters by verifying that the current task count gives minimal cost.

Implemented in the latest commit. Other than that, we need to discuss each of the other points :)
Thanks in advance for your input, appreciate it A LOT!

Contributor

kfaraz commented Dec 15, 2025

Thanks for sharing the insight on config.getScaleActionStartDelayMillis(), @jtuglu1 !
Can you share some typical values that you use for this config, and how it compares to the scale action period?

Contributor

jtuglu1 commented Dec 15, 2025

Thanks for sharing the insight on config.getScaleActionStartDelayMillis(), @jtuglu1 ! Can you share some typical values that you use for this config, and how it compares to the scale action period?

We typically set a submit (start) delay of 20-30 mins after suspension/re-submit. This gives the scaler a chance to recover at its current task count before scaling (because oftentimes, scaling too frequently will disrupt lag more than it helps). Instead, we opt to try and let the lag recover within 20 mins, and if there's a sustained decrease in read tput (e.g. due to a new column) or increase in write tput, we allow the scaler to scale.

Scale action period is typically much smaller, maybe 5-10mins.

default Map<String, Map<String, Object>> getStats()
/**
* Returns all stats from stream consumer. If {@code includeOnlyStreamConsumerStats} is true,
* returns only stream platform stats, like Kafka metrics.
Contributor

Suggested change
* returns only stream platform stats, like Kafka metrics.
* returns only stream consumer stats, like Kafka consumer metrics.

* Calculates the average poll-idle-ratio metric across all active tasks.
* This metric indicates how much time the consumer spends idle waiting for data.
*
* @return the average poll-idle-ratio across all tasks, or 1 (full idle) if no tasks or metrics are available
Contributor

Suggested change
* @return the average poll-idle-ratio across all tasks, or 1 (full idle) if no tasks or metrics are available
* @return the average poll-idle-ratio across all tasks, or 0 (fully busy) if no tasks or metrics are available

Contributor

On second thought, we might want to return -1 from here if we don't have any metrics available.
This would cause the auto-scaler to skip scaling rather than make a bad decision.


autoscalerExecutor.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
config.getScaleActionPeriodMillis(),
Contributor

@kfaraz kfaraz Dec 16, 2025

Based on the feedback from @jtuglu1 , I think we might need to reconsider this.

But rather than add a new config, I wonder if we can't improve this logic a bit.

Options:

  1. startDelay = Math.min(taskDuration, 30 mins). So we consider scaling up only after the current tasks have run for a bit.
  2. startDelay = 3 * config.getScaleActionPeriodMillis(). This could work too, and seems reasonable for most cases.
  3. Add a separate config scaleActionStartDelayMillis (so that we remain aligned with the existing behaviour of lag-based auto-scaler), whose default value is 3 * config.getScaleActionPeriodMillis().

@jtuglu1 , @Fly-Style , which one do you guys prefer the most?
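
For reference, options 1 and 2 (the latter also being the proposed default for option 3) might look roughly like this (names assumed):

// Option 1: wait until the current tasks have run for a while, capped at 30 minutes.
long startDelayFromTaskDuration(long taskDurationMillis)
{
  return Math.min(taskDurationMillis, 30 * 60 * 1000L);
}

// Option 2 (and the proposed default for option 3): a few scale-action periods.
long startDelayFromScalePeriod(long scaleActionPeriodMillis)
{
  return 3 * scaleActionPeriodMillis;
}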

private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class);

private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MIN_INCREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
Contributor

Please rename based on usage. IIUC, this constant represents the "maximum" amount by which we can "decrease" the current value of num partitions per task.

Suggested change
private static final int MIN_INCREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;

Contributor

@kfaraz kfaraz left a comment

Thanks for incorporating the feedback, @Fly-Style !

The auto-scaler looks like a good starting point.
I have left some comments which should be addressed in follow up PRs.

final int partitionCount = supervisor.getPartitionCount();

final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
Contributor

Moving averages over a longer time window (say 15 mins) might be more stable and thus more reliable.
If not available, then fall back to the 5-minute average, then the 1-minute average.
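
For instance, the selection could be a simple fallback chain (a sketch; the window keys are assumed to match whatever the task row stats expose):

// Prefer the longest available moving-average window for stability.
double selectMovingAverage(Map<String, Double> movingAverages)
{
  for (String window : new String[]{"15m", "5m", "1m"}) {
    final Double rate = movingAverages.get(window);
    if (rate != null && rate > 0) {
      return rate;
    }
  }
  return -1; // no usable rate; the caller may skip scaling
}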

} else {
// Fallback: estimate processing rate based on idle ratio
final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
Contributor

It might be weird to have this be fed from a config (even as a fallback mechanism).

In the future, we can consider computing this based on the stats of previously completed tasks of this supervisor.
OR just use the last known processing rate.

Comment on lines +275 to +277
if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
result.add(taskCount);
}
Contributor

Use a set to simplify this logic.


autoscalerExecutor.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
config.getScaleActionPeriodMillis(),
Contributor

We need to revisit the start delay.

return -1;
}

// If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved.
Contributor

We may not always want to skip scaling even if idleness is in the accepted range.

For example, if current idleness is 0.5 and there is no lag, a cluster admin might prefer to scale down the tasks, so that idleness is more like 0.2 or so. They should be allowed to control this via the idleWeight.

For the initial testing of this auto-scaler, let's remove this guardrail.

}
}

emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, metrics.getAvgPartitionLag()));
Contributor

This is already emitted as a metric.

What would be more useful is the computed terms lagCost and idleCost.
Getting these out as metrics would enable users to choose better values for lagWeight and idleWeight.

@kfaraz kfaraz merged commit 313ba8e into apache:master Dec 17, 2025
55 checks passed
Contributor

kfaraz commented Dec 17, 2025

@Fly-Style , I have merged this PR. Please address the open comments in a follow up PR.
