Introduce cost-based tasks autoscaler for streaming ingestion #18819
Conversation
While I think this will be very useful, the primary issue we've run into with the current scaler is that it needs to shut down tasks in order to scale, which causes a lot of lag during that process. #18466 was working on a way to fix this. I think this will help the scaler make smarter decisions on each scale, but each scale action still costs a lot to perform.
kfaraz left a comment:
Thanks for the new auto-scaler strategy, @Fly-Style !
I really like the idea of assigning a cost value to a potential task count as it helps reason about our choices! Overall, the patch looks good.
I am leaving a partial review here. I am yet to go through the WeightedCostFunction and some other aspects of the patch. Will post the remaining comments today.
}

final int currentTaskCount = currentMetrics.getCurrentTaskCount();
final List<Integer> validTaskCounts = FACTORS_CACHE.computeIfAbsent(
Is the computeFactors() computation really heavy enough to require caching?
Especially since we are imposing a max SCALE_FACTOR_DISCRETE_DISTANCE.
How about we simplify computeFactors() so that we compute only the required factors?
Example:

List<Integer> computeValidTaskCounts(int partitionCount, int currentTaskCount)
{
  final int currentPartitionsPerTask = partitionCount / currentTaskCount;
  final int minPartitionsPerTask = Math.max(1, currentPartitionsPerTask - 2);
  final int maxPartitionsPerTask = Math.min(partitionCount, currentPartitionsPerTask + 2);
  return IntStream.range(minPartitionsPerTask, maxPartitionsPerTask + 1)
                  .map(partitionsPerTask -> (partitionCount / partitionsPerTask) + Math.min(partitionCount % partitionsPerTask, 1))
                  .boxed()
                  .collect(Collectors.toList());
}
As we discussed on the voice chat, we will leave the current implementation as is, but slightly enhance the way the data is saved in the cache.
@kfaraz thanks for the review. I addressed most of the comments in e5b40c7, except the comment regarding using the correct metric. I will add a separate endpoint to fetch the correct metrics from all tasks and calculate the average, with subsequent data normalization, in a separate commit. cc @cryptoe
config.getScaleActionStartDelayMillis(),
config.getScaleActionPeriodMillis(),
TimeUnit.MILLISECONDS
Sorry, I meant that we should not have the start delay, period and collection interval as configs at all. Even with good defaults, unnecessary configs only complicate admin work and require code to handle all possible scenarios.
At most, maybe keep just one config scaleActionPeriod that can be specified as an ISO period (e.g. PT1M) or something (mostly since you would be using this in embedded tests). The other configs don't really add any value. They are legacy configs in the lag-based auto-scaler which we might as well avoid adding to the new strategy.
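A minimal sketch of how a single ISO-period config could be turned into milliseconds for the scheduler, assuming Joda-Time (which Druid already uses); scaleActionPeriod here is the config proposed above, not an existing one:

import org.joda.time.Period;

// Parse the proposed ISO-8601 period (e.g. "PT1M") and convert it to millis.
final Period scaleActionPeriod = Period.parse("PT1M");
final long scaleActionPeriodMillis = scaleActionPeriod.toStandardDuration().getMillis();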
if (optimalTaskCount > currentTaskCount) {
  return optimalTaskCount;
} else if (optimalTaskCount < currentTaskCount) {
  supervisor.getIoConfig().setTaskCount(optimalTaskCount);
This line can have behavioural side effects in the supervisor, since the taskCount should always reflect the current running task count and not the desired task count. Here we are updating the taskCount without actually changing the number of tasks, or suspending the supervisor.
Instead, we could do the following:
- Add an auto-scaler method isScaleDownOnRolloverOnly(). This will always return false for lag-based and always true for cost-based.
- CostBasedAutoScalerConfig.computeOptimalTaskCount() should return the optimal task count for scale-down cases as well.
- SeekableStreamSupervisor can store this desired task count in an atomic field and retrieve it upon regular task rollover.
Another (perhaps cleaner) option is to simply invoke AutoScaler.computeOptimalTaskCount() whenever we do rollover and then just go with the optimal task count.
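A minimal sketch of the proposed hook, assuming a generic auto-scaler interface (the type and method names here are illustrative, not the merged Druid API):

// Illustrative only: shows where the proposed hook could live on an auto-scaler interface.
public interface StreamTaskAutoScaler
{
  // Idea from this discussion: compute the desired task count.
  int computeOptimalTaskCount();

  // Proposed hook: lag-based auto-scaler would return false (scale down immediately),
  // cost-based auto-scaler would return true (scale down only on task rollover).
  default boolean isScaleDownOnRolloverOnly()
  {
    return false;
  }
}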
@Fly-Style , as you suggested on chat, we can take up the scale-down behaviour in a follow-up PR.
In the current PR, we can do scale-down the same way as scale-up.
I don't fully agree with this. At the very least, we use
kfaraz left a comment:
The changes look good, but the weighted cost function needs to be simplified to make the computations more intuitive and debug-friendly.
{
  Map<String, Map<String, Object>> taskMetrics = getStats(true);
  if (taskMetrics.isEmpty()) {
    return 1.;
Returning 1 (full idle) here would cause the auto-scaler to think that the tasks are doing nothing and cause a scale-down, when in fact the tasks failed to return the metrics and may be in a bad state. Scaling down might further worsen the problem.
Should we return 0 instead?
Scaling up might be overkill in that scenario: nothing may happen and instead we would waste resources.
0.5 looks optimal to me (during the implementation I considered values between 0.5 and 1).
We can even return -1 to denote that we do not have metrics available, and just skip scaling rather than make a bad decision.
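A minimal sketch of the suggested sentinel, reusing the getStats(true) call from the quoted snippet (the method name and metric key are illustrative, not the merged code):

// Return -1 when no task metrics are available so the caller can skip the
// scaling decision instead of treating "no data" as "fully idle".
double computeAveragePollIdleRatio()
{
  final Map<String, Map<String, Object>> taskMetrics = getStats(true);
  if (taskMetrics.isEmpty()) {
    return -1;
  }
  double sum = 0;
  for (Map<String, Object> metrics : taskMetrics.values()) {
    sum += ((Number) metrics.getOrDefault("poll-idle-ratio-avg", 0)).doubleValue();
  }
  return sum / taskMetrics.size();
}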
...rg/apache/druid/testing/embedded/indexing/autoscaler/CostBasedAutoScalerIntegrationTest.java
Outdated
Show resolved
Hide resolved
      result.add(taskCount);
    }
  }
  return result.stream().mapToInt(Integer::intValue).toArray();
Nit: Is conversion to array still needed? Can we just return List or Set instead?
It's my personal preference: I'm really not a fan of boxed primitives :)
But we are still boxing the primitives while adding to the List. Might as well avoid the extra conversion.
if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
  result.add(taskCount);
}
Maybe just use a set to simplify this computation.
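A minimal sketch of that suggestion, assuming the candidate-count formula from the earlier computeValidTaskCounts example (variable names are illustrative):

// java.util.TreeSet keeps candidates sorted and drops duplicates, removing the
// need for the explicit "last element differs" check.
final SortedSet<Integer> candidateTaskCounts = new TreeSet<>();
for (int partitionsPerTask = minPartitionsPerTask; partitionsPerTask <= maxPartitionsPerTask; partitionsPerTask++) {
  candidateTaskCounts.add(partitionCount / partitionsPerTask + Math.min(partitionCount % partitionsPerTask, 1));
}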
 * Weighted cost function combining lag, idle time, and change distance metrics.
 * Uses adaptive bounds for normalization based on recent history.
 */
public class WeightedCostFunction
I like the idea of the WeightedCostFunction, but I think we need to make it much more intuitive.

Define requirements (already aligned with the current state of this PR):
- A function that computes cost: cost = lagCost * lagWeight + idlenessCost * idleWeight
- A task count with lower cost is better.
- The lower the task count, the higher the predicted lag and hence the higher the lag cost.
- The higher the task count, the higher the predicted idleness and hence the higher the idleness cost.

Simplify computations:
- Use linear scaling only; logarithmic scaling makes the terms difficult to reason about and debug. The diminishing-returns effect is already enforced by the window (discrete distance). If more terms are needed to account for, say, task operational overhead, we will add them in the future.
- Use only one mode, i.e. do not invert scaling, even when lag is abnormally high.
- Use actual metrics instead of normalized or adaptive bounds. If a supervisor once saw a lag of 100M, the adaptive ratio would make a lag of 1M seem very small (normalizedLag = 0.01, i.e. 1%). But in reality, a lag of 1M is bad too and needs to be given appropriate weight.
- Always perform the cost computation even if idleness is in the accepted range (0.2-0.6 in the PR). This would help us validate the correctness of the formula against real clusters by verifying that the current task count gives minimal cost.

We may re-introduce some of these enhancements in later patches once we have more data points from using this auto-scaler, but it is best to start as simple as possible.

Use an intuitive metric, e.g. compute time:
Connect the result of the cost function to an actual metric to make it more intuitive. The best metric I can think of is compute time or compute cycles, as it may be related to the actual monetary cost of running tasks.

For example, what if we could model the cost as follows:
- lagCost = expected time (in seconds) required to recover the current lag
- idlenessCost = total compute time (in seconds) wasted being idle in a single taskDuration
- Intuitively, we can see that as task count increases, lagCost would decrease and idlenessCost would increase.

The formula for these costs may be something like:

lagCost
  = expected time (in seconds) required to recover current lag
  = currentAggregateLag / (proposedTaskCount * avgRateOfProcessing)
where,
  currentAggregateLag = sum of current lag (in records) across all partitions
  avgRateOfProcessing = average of task moving averages

idlenessCost
  = total time (in seconds) wasted being idle in a single taskDuration
  = total task run time * predicted idleness ratio
where,
  total task run time = proposedTaskCount * taskDuration
  predicted idleness ratio = (proposedTaskCount / currentTaskCount) - (1 - avgPollToIdleRatio)

e.g. if the current poll-to-idle ratio is 0.7, tasks are idle 70% of the time,
so reducing the task count by 70% will make tasks busy all the time (idleness ratio = 0).

Assumptions:
- Tasks are already at their peak processing rate and will remain at this rate.
- The poll-to-idle ratio scales linearly with task count. We may use some reasonable clamps for min (say 0.05) and max (say 0.95).
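A minimal, self-contained sketch of the cost model described above, using the names and units from this comment (an illustration, not the merged WeightedCostFunction):

// Illustrative sketch of the proposed cost model. Inputs are in the units described
// above: lag in records, rates in records/second, durations in seconds.
public class CostSketch
{
  /** Expected time (seconds) to recover the current lag with the proposed task count. */
  static double lagCost(double currentAggregateLag, int proposedTaskCount, double avgRateOfProcessing)
  {
    return currentAggregateLag / (proposedTaskCount * avgRateOfProcessing);
  }

  /** Total compute time (seconds) predicted to be wasted idle in a single task duration. */
  static double idlenessCost(int proposedTaskCount, int currentTaskCount, double taskDurationSeconds, double avgPollToIdleRatio)
  {
    // Assumption from the comment: idleness scales linearly with task count, clamped to [0.05, 0.95].
    double predictedIdleRatio = ((double) proposedTaskCount / currentTaskCount) - (1 - avgPollToIdleRatio);
    predictedIdleRatio = Math.max(0.05, Math.min(0.95, predictedIdleRatio));
    return proposedTaskCount * taskDurationSeconds * predictedIdleRatio;
  }

  /** Weighted cost: a lower value means a better candidate task count. */
  static double cost(double lagCost, double idlenessCost, double lagWeight, double idleWeight)
  {
    return lagCost * lagWeight + idlenessCost * idleWeight;
  }
}

For a worked example with hypothetical numbers: an aggregate lag of 600,000 records, an average processing rate of 10,000 records/second per task, and 3 proposed tasks gives lagCost = 600000 / (3 * 10000) = 20 seconds.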
Regarding:
Use actual metrics instead of normalized or adaptive bounds.
If a supervisor once saw a lag of 100M, the adaptive ratio would make a lag of 1M seem very small (normalizedLag = 0.01 i.e. 1%). But in reality, a lag of 1M is bad too and needs to be given appropriate weight.
Always perform cost computation even if idleness is in the accepted range (0.2-0.6 in the PR).
This would help us validate the correctness of the formula against real clusters by verifying that the current task count gives minimal cost.
Implemented in the latest commit. Other than that, we need to discuss each of the other points :)
Thanks in advance for your input, appreciate it A LOT!
Thanks for sharing the insight on
We typically set a submit (start) delay of 20-30 mins after suspension/re-submit. This gives the scaler a chance to recover at its current task count before scaling (because oftentimes, scaling too frequently disrupts lag more than it helps). Instead, we opt to try to let the lag recover within 20 mins, and if there's a sustained decrease in read throughput (e.g. due to a new column) or an increase in write throughput, we allow the scaler to scale. The scale action period is typically much smaller, maybe 5-10 mins.
default Map<String, Map<String, Object>> getStats()
/**
 * Returns all stats from stream consumer. If {@code includeOnlyStreamConsumerStats} is true,
 * returns only stream platform stats, like Kafka metrics.
Suggested change:
- * returns only stream platform stats, like Kafka metrics.
+ * returns only stream consumer stats, like Kafka consumer metrics.
 * Calculates the average poll-idle-ratio metric across all active tasks.
 * This metric indicates how much time the consumer spends idle waiting for data.
 *
 * @return the average poll-idle-ratio across all tasks, or 1 (full idle) if no tasks or metrics are available
Suggested change:
- * @return the average poll-idle-ratio across all tasks, or 1 (full idle) if no tasks or metrics are available
+ * @return the average poll-idle-ratio across all tasks, or 0 (fully busy) if no tasks or metrics are available
On second thought, we might want to return -1 from here if we don't have any metrics available.
This would cause the auto-scaler to skip scaling rather than make a bad decision.
autoscalerExecutor.scheduleAtFixedRate(
    supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
    config.getScaleActionPeriodMillis(),
Based on the feedback from @jtuglu1 , I think we might need to reconsider this.
But rather than add a new config, I wonder if we can't improve this logic a bit.
Options:
- startDelay = Math.min(taskDuration, 30 mins). So we consider scaling up only after the current tasks have run for a bit.
- startDelay = 3 * config.getScaleActionPeriodMillis(). This could work too, and seems reasonable for most cases.
- Add a separate config scaleActionStartDelayMillis (so that we remain aligned with the existing behaviour of the lag-based auto-scaler), whose default value is 3 * config.getScaleActionPeriodMillis().
@jtuglu1 , @Fly-Style , which one do you guys prefer the most?
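A minimal sketch of option 2, reusing the names from the quoted snippet above (scaleTask stands in for the scheduled runnable built by the supervisor):

// Derive the start delay from the scale action period instead of exposing a separate config.
final long scaleActionPeriodMillis = config.getScaleActionPeriodMillis();
final long startDelayMillis = 3 * scaleActionPeriodMillis;
autoscalerExecutor.scheduleAtFixedRate(
    scaleTask,
    startDelayMillis,
    scaleActionPeriodMillis,
    TimeUnit.MILLISECONDS
);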
private static final EmittingLogger log = new EmittingLogger(CostBasedAutoScaler.class);

private static final int MAX_INCREASE_IN_PARTITIONS_PER_TASK = 2;
private static final int MIN_INCREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
Please rename based on usage. IIUC, this constant represents the "maximum" amount by which we can "decrease" the current value of num partitions per task.
Suggested change:
- private static final int MIN_INCREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
+ private static final int MAX_DECREASE_IN_PARTITIONS_PER_TASK = MAX_INCREASE_IN_PARTITIONS_PER_TASK * 2;
kfaraz left a comment:
Thanks for incorporating the feedback, @Fly-Style !
The auto-scaler looks like a good starting point.
I have left some comments which should be addressed in follow up PRs.
final int partitionCount = supervisor.getPartitionCount();

final Map<String, Map<String, Object>> taskStats = supervisor.getStats();
final double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
Moving averages over a longer time window (say 15 mins) might be more stable and thus more reliable.
If not available, then fallback to 5 minute, then 1 minute.
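A minimal sketch of the suggested fallback chain, reusing extractMovingAverage from the quoted snippet and assuming the fifteen- and five-minute meter names exist alongside ONE_MINUTE_NAME, with a negative value standing in for "window not available":

// Prefer the most stable (longest) window and fall back to shorter ones.
double movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.FIFTEEN_MINUTE_NAME);
if (movingAvgRate < 0) {
  movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.FIVE_MINUTE_NAME);
}
if (movingAvgRate < 0) {
  movingAvgRate = extractMovingAverage(taskStats, DropwizardRowIngestionMeters.ONE_MINUTE_NAME);
}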
} else {
  // Fallback: estimate processing rate based on idle ratio
  final double utilizationRatio = Math.max(0.01, 1.0 - pollIdleRatio);
  avgProcessingRate = config.getDefaultProcessingRate() * utilizationRatio;
It might be weird to have this be fed from a config (even as a fallback mechanism).
In the future, we can consider computing this based on the stats of previously completed tasks of this supervisor.
OR just use the last known processing rate.
if (result.isEmpty() || result.get(result.size() - 1) != taskCount) {
  result.add(taskCount);
}
Use a set to simplify this logic.
autoscalerExecutor.scheduleAtFixedRate(
    supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
    config.getScaleActionPeriodMillis(),
We need to revisit the start delay.
  return -1;
}

// If idle is already in the ideal range [0.2, 0.6], optimal utilization has been achieved.
We may not always want to skip scaling even if idleness is in the accepted range.
For example, if current idleness is 0.5 and there is no lag, a cluster admin might prefer to scale down the tasks, so that idleness is more like 0.2 or so. They should be allowed to control this via the idleWeight.
For the initial testing of this auto-scaler, let's remove this guardrail.
  }
}

emitter.emit(metricBuilder.setMetric(AVG_LAG_METRIC, metrics.getAvgPartitionLag()));
This is already emitted as a metric.
What would be more useful is the computed terms lagCost and idleCost.
Getting these out as metrics would enable users to choose better values for lagWeight and idleWeight.
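A minimal sketch of emitting the two cost terms, following the same emitter pattern as the quoted snippet (the metric names are illustrative, not established Druid metric names):

// Emit the computed cost terms so operators can tune lagWeight and idleWeight.
emitter.emit(metricBuilder.setMetric("task/autoScaler/lagCost", lagCost));
emitter.emit(metricBuilder.setMetric("task/autoScaler/idleCost", idleCost));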
@Fly-Style , I have merged this PR. Please address the open comments in a follow-up PR.
Cost-Based Autoscaler for Seekable Stream Supervisors
Overview
Implements a cost-based autoscaling algorithm for seekable stream supervisor tasks that optimizes task count by balancing lag reduction against resource efficiency.
Note: this patch doesn't support autoscaling (down) during task rollover. Temporarily, it scales down in the same manner as it scales up.
Introduces WeightedCostFunction for cost-based autoscaling decisions. The function computes a cost score (in seconds) for each candidate task count, balancing lag recovery time against idle resource waste.
Key Design Decisions
Cost Formula
- Lag cost: aggregateLag / (taskCount × avgProcessingRate) — time to clear the backlog
- Idle cost: taskCount × taskDuration × predictedIdleRatio — wasted compute time
Idle Prediction Model
Uses capacity-based linear scaling:
more tasks → more idle time per task; fewer tasks → busier tasks.
Ideal Idle Range
Defines optimal utilization as an idle ratio within [0.2, 0.6].
Conservative Cold Start Behavior
When the processing rate is unavailable (cold start, new tasks), the autoscaler behaves conservatively; this prevents scaling decisions based on incomplete data.
Additionally, we add reading of the poll-idle-ratio-avg metric from the /rowStats task endpoint.