Conversation

@giuseppelillo (Contributor) commented Oct 20, 2025

Adds a cache for the Batch Coordinates. Cache entries are inserted by the broker after batches are committed to the Control Plane.
The cache is used by DelayedFetch to check whether it can complete. This replaces multiple network calls to the Control Plane with lookups in the in-memory cache.

In a high-throughput use case, the FindBatches queries performed by the brokers are reduced by two thirds, which causes Postgres CPU usage to drop by 50%.
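The commit-then-cache flow described above can be sketched as follows. This is a hypothetical illustration, not the PR's actual API: the class, record, and method names (`BatchCommitPath`, `Coordinate`, `mayComplete`) are made up, and the Control Plane RPC is stubbed with a `Function`. The point is the shape of the flow: the broker commits to the Control Plane first, records the returned coordinates locally, and DelayedFetch consults only the local cache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch; names are illustrative, not the PR's real classes.
class BatchCommitPath {
    record Coordinate(String topicPartition, long firstOffset, long highWaterMark) {}

    private final Map<String, Coordinate> cache = new ConcurrentHashMap<>();
    // Stands in for the real commit RPC to the Control Plane.
    private final Function<String, Coordinate> controlPlaneCommit;

    BatchCommitPath(Function<String, Coordinate> controlPlaneCommit) {
        this.controlPlaneCommit = controlPlaneCommit;
    }

    Coordinate commit(String topicPartition) {
        // Network call to the Control Plane happens first ...
        Coordinate committed = controlPlaneCommit.apply(topicPartition);
        if (committed != null) {
            // ... then the coordinates are inserted into the local in-memory cache.
            cache.put(topicPartition, committed);
        }
        return committed;
    }

    // DelayedFetch consults only the cache; a miss means "cannot confirm yet",
    // not an error, and triggers no network call.
    boolean mayComplete(String topicPartition, long fetchOffset) {
        Coordinate c = cache.get(topicPartition);
        return c != null && c.highWaterMark() > fetchOffset;
    }
}
```

A miss is deliberately conservative: it only delays completion, so a cold or invalidated cache can never cause incorrect fetch results.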

@giuseppelillo marked this pull request as draft on October 20, 2025
@giuseppelillo force-pushed the giuseppelillo/bc-cache-v2 branch 8 times, most recently from f389d00 to a818653, on October 23, 2025
@giuseppelillo force-pushed the giuseppelillo/bc-cache-v2 branch 7 times, most recently from 99becc2 to b038ed4, on October 31, 2025
@giuseppelillo changed the title from "Batch Coordinate cache PoC" to "Batch Coordinate cachw" on October 31, 2025
@giuseppelillo force-pushed the giuseppelillo/bc-cache-v2 branch from b038ed4 to a5e6e6c on October 31, 2025
@giuseppelillo changed the title from "Batch Coordinate cachw" to "Batch Coordinate cache" on October 31, 2025
@giuseppelillo force-pushed the giuseppelillo/bc-cache-v2 branch 2 times, most recently from 113f200 to 059af4e, on November 3, 2025
@giuseppelillo force-pushed the giuseppelillo/bc-cache-v2 branch from 059af4e to 16d3d83 on November 3, 2025
@giuseppelillo marked this pull request as ready for review on November 3, 2025
Comment on lines +144 to +147
* If that fragment detects that the batch would create an integrity problem
* (such as a data gap, an overlap, or if the batch indicates the log start offset was
* increased), the entire cache entry for the requested TopicPartition will be invalidated
* and the batch insertion will be retried, creating a new log fragment.</p>
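The invalidation rule quoted in this Javadoc can be sketched as a small state machine over per-partition fragments. The sketch below is hypothetical (the `PartitionCache` and `Fragment` names are illustrative, not the PR's real classes) and collapses "invalidate, then retry the insertion" into a single step: a batch that does not start exactly at the fragment's high-water mark (a gap, an overlap, or a log-start-offset change) drops the stale entry and starts a fresh fragment from that batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the invalidation rule described in the Javadoc above;
// names are illustrative, not the PR's actual implementation.
class PartitionCache {
    // highWaterMark is exclusive: a fragment covers [firstOffset, highWaterMark).
    record Fragment(long firstOffset, long highWaterMark) {}

    private final Map<String, Fragment> fragments = new HashMap<>();

    void put(String partition, long batchFirstOffset, long batchLastOffset) {
        Fragment current = fragments.get(partition);
        if (current != null && batchFirstOffset == current.highWaterMark()) {
            // Contiguous append: extend the existing fragment.
            fragments.put(partition,
                    new Fragment(current.firstOffset(), batchLastOffset + 1));
        } else {
            // No entry, or a gap/overlap: invalidate the entry and retry,
            // which creates a new fragment starting at this batch.
            fragments.put(partition,
                    new Fragment(batchFirstOffset, batchLastOffset + 1));
        }
    }

    Fragment get(String partition) {
        return fragments.get(partition);
    }
}
```

This mirrors the semantics exercised in the `testMultipleCaches` test further down: a batch at offset 20 arriving after a fragment ending at 10 wipes the entry and starts a new fragment [20, 30).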
Member

How does it work when multiple brokers commit batches, i.e. in 3 AZ setup? Won't this mean constant invalidation on all of them?

Contributor Author (@giuseppelillo) Nov 4, 2025

This is essentially the scenario exercised by this test:

@Test
void testMultipleCaches() throws IOException {
    // Simulate a scenario where a single producer produces to the same partition by calling 2 different brokers.
    // Cache of the first broker
    var cache1 = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30), Clock.systemUTC());
    // Cache of the second broker
    var cache2 = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30), Clock.systemUTC());
    // Producer creates 5 batches
    var batch1 = createBatch(PARTITION_0, 0, 10, 0);
    var batch2 = createBatch(PARTITION_0, 10, 10, 0);
    var batch3 = createBatch(PARTITION_0, 20, 10, 0);
    var batch4 = createBatch(PARTITION_0, 40, 10, 0);
    var batch5 = createBatch(PARTITION_0, 50, 10, 0);
    // Produce first batch to broker 1
    cache1.put(batch1);
    // Produce second batch to broker 2
    cache2.put(batch2);
    // Produce third batch to broker 1.
    // Cache notices that a batch is missing and invalidates the entry for this partition
    cache1.put(batch3);
    // Produce fourth batch to broker 2.
    // Cache notices that a batch is missing and invalidates the entry for this partition
    cache2.put(batch4);
    // Produce fifth batch to broker 2, appending to the fourth batch because they're contiguous
    cache2.put(batch5);
    assertNull(cache1.get(PARTITION_0, 0));
    var logFragmentFromCache1 = cache1.get(PARTITION_0, 20);
    assertEquals(20, logFragmentFromCache1.firstOffset());
    assertEquals(30, logFragmentFromCache1.highWaterMark());
    assertNull(cache2.get(PARTITION_0, 0));
    var logFragmentFromCache2 = cache2.get(PARTITION_0, 40);
    assertEquals(40, logFragmentFromCache2.firstOffset());
    assertEquals(60, logFragmentFromCache2.highWaterMark());
    cache1.close();
    cache2.close();
}

The answer is yes: if a producer constantly round-robins produce requests across different brokers, it will lead to a high number of cache invalidations.
However, if the client is correctly set up with AZ affinity this will not be an issue, as the producer will always send batches for a specific topic-partition to the same broker.
If producers do not use AZ affinity, the best option would be to configure a very small TTL, or to disable the cache entirely.

Another possible case is that different producers in different AZs produce to the same topic-partition.
This can also increase the number of invalidations; however, the cache is still useful for DelayedFetch, as it can determine whether the bytes produced through that specific broker are enough for it to complete.
A synthetic benchmark using a cluster of 9 nodes (spread across 3 AZs) with 288 partitions and 30 producers did not highlight any issue with the number of cache invalidations.

Member

Another possible case is that different producers in different AZs will produce to the same topic-partition.

Yeah, I thought about this case mainly.

A synthetic benchmark using a cluster of 9 nodes (spread in 3 AZs) with 288 partitions and 30 producers did not highlight any issue with the amount of cache invalidations.

Ok, good, we can work with this.

Contributor Author

Some comparisons (cluster with 9 nodes in 3 AZs):

288 partitions, 30 producers: cache invalidations are very rare (1.6%)

9 partitions, 9 producers: cache invalidations are more frequent (20%)

So this cache makes more sense for high throughput and a high number of partitions; however, it is still sustainable even at lower levels of parallelism.

Member

I think all of this is worth briefly noting in the Javadoc, for example.

Comment on lines +1735 to +1737
if (logFragment == null) {
  FindBatchResponse.success(util.List.of(), -1, -1)
} else {
Member

We don't go to the control plane in case of misses, right?

Contributor Author

Right, requests to the Control Plane are made only when the DelayedFetch completes.

Member

Some other broker can append lots of batches without the cache in the local broker being aware of this. It becomes aware only when the local broker produces something to this partition and has to invalidate the cache due to offset mismatch. -- Do I understand this correctly?

Contributor Author

Yes, exactly. This means that in theory you can have stale entries up to the TTL (e.g. the last 5 seconds of the log are not visible to a broker because no append was done through it).
This is fine for a best-effort check of whether the DelayedFetch can be completed, but when the fetch is actually executed the "real" batches are retrieved from the Control Plane.
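The two-phase behaviour described here can be sketched as follows. This is a hypothetical illustration (the `FetchPath` class and its parameters are made up, and the Control Plane call is stubbed with a `Supplier`): the cache only answers "can this DelayedFetch complete?", while the batches actually served always come from the Control Plane, so a stale cache entry can delay a fetch but never serve wrong data.

```java
import java.util.List;
import java.util.function.Supplier;

// Hypothetical sketch of the best-effort completion check; names are
// illustrative, not the PR's actual code.
class FetchPath {
    static List<String> fetch(Long cachedHighWaterMark, long fetchOffset,
                              Supplier<List<String>> controlPlaneFindBatches) {
        // Cache miss or stale entry: do NOT call the Control Plane; the
        // DelayedFetch stays in purgatory until the cache (or a timeout)
        // says otherwise.
        if (cachedHighWaterMark == null || cachedHighWaterMark <= fetchOffset) {
            return List.of();
        }
        // The cache says enough data exists: only now pay for the network
        // call, and the returned coordinates are the authoritative ones.
        return controlPlaneFindBatches.get();
    }
}
```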

Member

It makes sense. I need to stop seeing this cache as very consistent with PG and long-living :)

final CacheBatchCoordinate cacheBatchCoordinate = commitBatchResponse.cacheBatchCoordinate();
if (cacheBatchCoordinate != null) {
    try {
        batchCoordinateCache.put(cacheBatchCoordinate);
Member

This will probably suffer from the same problem as producing to the same topic on different brokers, but caused by finishing several requests in parallel. Shall we maybe add a comment about this?

Contributor Author (@giuseppelillo) Nov 5, 2025

But only one file is committed at a time, right? So there are no parallel commits in the same broker.

ivanyu previously approved these changes Nov 5, 2025