Batch Coordinate cache #438
base: main
Conversation
```java
 * If that fragment detects that the batch would create an integrity problem
 * (such as a data gap, an overlap, or if the batch indicates the log start offset was
 * increased), the entire cache entry for the requested TopicPartition will be invalidated
 * and the batch insertion will be retried, creating a new log fragment.</p>
```
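A minimal sketch of that invalidate-and-retry flow, assuming simplified stand-in types (`BatchCoordinate` and `CachedFragment` here stand in for the PR's `CacheBatchCoordinate` and log fragment types; the real cache keys on `TopicPartition`):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: BatchCoordinate and CachedFragment are simplified stand-ins.
class InvalidateAndRetrySketch {

    record BatchCoordinate(String topicPartition, long firstOffset, long lastOffset, long logStartOffset) {}

    static final class CachedFragment {
        long firstOffset;
        long highWaterMark;   // one past the last cached offset
        long logStartOffset;

        CachedFragment(BatchCoordinate b) {
            firstOffset = b.firstOffset();
            highWaterMark = b.lastOffset() + 1;
            logStartOffset = b.logStartOffset();
        }
    }

    private final Map<String, CachedFragment> fragments = new ConcurrentHashMap<>();

    void put(BatchCoordinate batch) {
        CachedFragment f = fragments.get(batch.topicPartition());
        if (f != null) {
            boolean gap = batch.firstOffset() > f.highWaterMark;
            boolean overlap = batch.firstOffset() < f.highWaterMark;
            boolean logStartMoved = batch.logStartOffset() > f.logStartOffset;
            if (gap || overlap || logStartMoved) {
                // Integrity problem: invalidate the whole entry for this partition...
                fragments.remove(batch.topicPartition());
                f = null;
            }
        }
        if (f == null) {
            // ...and retry the insertion, creating a new log fragment.
            fragments.put(batch.topicPartition(), new CachedFragment(batch));
        } else {
            // Contiguous append: extend the fragment.
            f.highWaterMark = batch.lastOffset() + 1;
        }
    }
}
```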
How does it work when multiple brokers commit batches, i.e. in a 3-AZ setup? Won't this mean constant invalidation on all of them?
This is basically the scenario that is tested in this test:
inkless/storage/inkless/src/test/java/io/aiven/inkless/cache/CaffeineBatchCoordinateCacheTest.java
Lines 266 to 307 in 16d3d83
```java
@Test
void testMultipleCaches() throws IOException {
    // Simulate a scenario where a single producer produces to the same partition by calling 2 different brokers.
    // Cache of the first broker
    var cache1 = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30), Clock.systemUTC());
    // Cache of the second broker
    var cache2 = new CaffeineBatchCoordinateCache(Duration.ofSeconds(30), Clock.systemUTC());
    // Producer creates 5 batches
    var batch1 = createBatch(PARTITION_0, 0, 10, 0);
    var batch2 = createBatch(PARTITION_0, 10, 10, 0);
    var batch3 = createBatch(PARTITION_0, 20, 10, 0);
    var batch4 = createBatch(PARTITION_0, 40, 10, 0);
    var batch5 = createBatch(PARTITION_0, 50, 10, 0);
    // Produce first batch to broker 1
    cache1.put(batch1);
    // Produce second batch to broker 2
    cache2.put(batch2);
    // Produce third batch to broker 1.
    // Cache notices that a batch is missing and invalidates the entry for this partition
    cache1.put(batch3);
    // Produce fourth batch to broker 2.
    // Cache notices that a batch is missing and invalidates the entry for this partition
    cache2.put(batch4);
    // Produce fifth batch to broker 2, appending to the fourth batch because they're contiguous
    cache2.put(batch5);
    assertNull(cache1.get(PARTITION_0, 0));
    var logFragmentFromCache1 = cache1.get(PARTITION_0, 20);
    assertEquals(20, logFragmentFromCache1.firstOffset());
    assertEquals(30, logFragmentFromCache1.highWaterMark());
    assertNull(cache2.get(PARTITION_0, 0));
    var logFragmentFromCache2 = cache2.get(PARTITION_0, 40);
    assertEquals(40, logFragmentFromCache2.firstOffset());
    assertEquals(60, logFragmentFromCache2.highWaterMark());
    cache1.close();
    cache2.close();
}
```
The answer is yes: if a producer constantly round-robins produce requests to different brokers, this will lead to a high number of cache invalidations.
However, if the client is correctly set up with AZ affinity this will not be an issue, as the producer will always send batches for a specific topic-partition to the same broker.
If producers do not use AZ affinity, the best option is to configure a very small TTL, or to disable the cache entirely.
Another possible case is that different producers in different AZs produce to the same topic-partition.
This can also increase the number of invalidations; however, the cache will still be useful for DelayedFetch, as it can tell whether the bytes produced through that specific broker are enough for the fetch to complete.
A synthetic benchmark using a cluster of 9 nodes (spread across 3 AZs) with 288 partitions and 30 producers did not highlight any issue with the number of cache invalidations.
> Another possible case is that different producers in different AZs produce to the same topic-partition.

Yeah, I thought about this case mainly.

> A synthetic benchmark using a cluster of 9 nodes (spread across 3 AZs) with 288 partitions and 30 producers did not highlight any issue with the number of cache invalidations.

OK, good, we can work with this.
Some comparisons (cluster with 9 nodes in 3 AZs):
- 288 partitions, 30 producers: cache invalidations are very rare (1.6%)
- 9 partitions, 9 producers: cache invalidations are more frequent (20%)

So this cache makes the most sense for high throughput and a high number of partitions, but it is still sustainable at lower levels of parallelism.
I think all of this is worth briefly noting in the Javadoc, for example in
storage/inkless/src/main/java/io/aiven/inkless/cache/StaleLogFragmentException.java
```java
if (logFragment == null) {
    FindBatchResponse.success(util.List.of(), -1, -1)
} else {
```
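For context, a hedged Java rendering of this branch (the excerpt itself appears to be Scala broker code); the enclosing method and the hit-path accessors `batches()` and `logStartOffset()` are assumptions, while `get(...)` and `highWaterMark()` match the test shown earlier:

```java
// Hedged rendering of the lookup path; method name and hit-path
// accessors are assumptions, not the PR's exact code.
FindBatchResponse findInCache(final TopicPartition topicPartition, final long fetchOffset) {
    final var logFragment = batchCoordinateCache.get(topicPartition, fetchOffset);
    if (logFragment == null) {
        // Miss: answer with an empty batch list and -1 sentinels for the
        // log start offset and high watermark.
        return FindBatchResponse.success(java.util.List.of(), -1, -1);
    }
    // Hit: answer from the cached fragment.
    return FindBatchResponse.success(
        logFragment.batches(), logFragment.logStartOffset(), logFragment.highWaterMark());
}
```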
We don't go to the control plane in case of misses, right?
Right, requests to the Control Plane are made only when the DelayedFetch completes.
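A rough sketch of the best-effort check this enables; the method name and parameters are hypothetical, while `get(...)` and `highWaterMark()` are as in the test above:

```java
// Hedged sketch: use the cache as a best-effort signal for whether a
// DelayedFetch has enough new data to complete. The real check may also
// weigh fetch.min.bytes; that detail is omitted here.
boolean mayCompleteDelayedFetch(final TopicPartition topicPartition, final long fetchOffset) {
    final var fragment = batchCoordinateCache.get(topicPartition, fetchOffset);
    if (fragment == null) {
        // Miss: we simply don't know; keep waiting. No Control Plane call is made here.
        return false;
    }
    // Hit: complete once the cached fragment shows data past the fetch offset;
    // the authoritative batches are then fetched from the Control Plane.
    return fragment.highWaterMark() > fetchOffset;
}
```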
Some other broker can append lots of batches without the cache on the local broker being aware of it. It becomes aware only when the local broker produces something to this partition and has to invalidate the cache due to an offset mismatch. Do I understand this correctly?
Yes, exactly. This means that in theory you can have stale entries up to the TTL (e.g. the last 5 seconds of the log are not visible to a broker because no append went through it).
This is fine as a best-effort check for whether the DelayedFetch can be completed, but when the fetch is actually performed the "real" batches are retrieved from the Control Plane.
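That TTL bound falls out directly of an expire-after-write cache; a minimal Caffeine sketch with the 30-second TTL the test uses (key and value types are simplified stand-ins; the real cache keys on `TopicPartition`):

```java
import java.time.Duration;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

// Minimal sketch of the TTL bound discussed above.
class TtlBoundSketch {
    final Cache<String, Object> fragments = Caffeine.newBuilder()
            // expireAfterWrite is what bounds staleness: an entry written 30s
            // ago and never refreshed by a local produce simply expires, so a
            // broker can lag at most ~TTL behind the real log.
            .expireAfterWrite(Duration.ofSeconds(30))
            .build();
}
```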
It makes sense. I need to stop thinking of this cache as strongly consistent with PG and long-lived :)
Resolved (outdated): storage/inkless/src/main/java/io/aiven/inkless/cache/BatchCoordinateCache.java
Resolved (outdated): storage/inkless/src/main/java/io/aiven/inkless/cache/CacheBatchCoordinate.java
```java
final CacheBatchCoordinate cacheBatchCoordinate = commitBatchResponse.cacheBatchCoordinate();
if (cacheBatchCoordinate != null) {
    try {
        batchCoordinateCache.put(cacheBatchCoordinate);
```
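A hedged completion of this excerpt: `StaleLogFragmentException` is part of this PR, but the catch clause and the `invalidate(...)` call are assumptions about how a failed insert might be handled, given that the cache is best-effort:

```java
// Hedged completion; the catch body is an assumption, since a failed
// cache insert must never fail the produce path.
final CacheBatchCoordinate cacheBatchCoordinate = commitBatchResponse.cacheBatchCoordinate();
if (cacheBatchCoordinate != null) {
    try {
        batchCoordinateCache.put(cacheBatchCoordinate);
    } catch (final StaleLogFragmentException e) {
        // Best effort: drop the partition's entry and move on; the Control
        // Plane remains the source of truth for batch coordinates.
        batchCoordinateCache.invalidate(cacheBatchCoordinate.topicPartition());
    }
}
```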
This will probably suffer from the same problem as producing to the same topic on different brokers, but caused by finishing several requests in parallel. Shall we maybe add a comment about this?
But only one file is committed at a time, right? So there are no parallel commits in the same broker.
Adds a cache for the Batch Coordinates. Cache entries are inserted by the broker after batches are committed to the Control Plane.
The cache is used by DelayedFetch to check if it can complete or not. This spares multiple network calls to the Control Plane, replacing them with checks performed on the in-memory cache.
In a high-throughput use case, FindBatches queries performed by the brokers are reduced by two-thirds, which causes Postgres CPU usage to drop by 50%.
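From the test and excerpts above, the cache surface looks roughly like this (a sketch inferred from usage, not the PR's actual interface file; `CacheBatchCoordinate` is the PR's type, declared here as an opaque placeholder):

```java
import java.io.Closeable;
import org.apache.kafka.common.TopicPartition;

// Placeholder for storage/inkless/.../CacheBatchCoordinate.java.
interface CacheBatchCoordinate {}

// Inferred sketch of the cache surface, based on how the test exercises it.
interface BatchCoordinateCache extends Closeable {

    // Insert a batch committed to the Control Plane; may invalidate and
    // restart the partition's fragment on a gap, overlap, or moved log start offset.
    void put(CacheBatchCoordinate batch);

    // Return the cached log fragment covering the given offset, or null on a miss.
    LogFragment get(TopicPartition topicPartition, long offset);

    // Minimal view of a fragment, as exercised by the test's assertions.
    interface LogFragment {
        long firstOffset();
        long highWaterMark();
    }
}
```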
