Add idempotency support to XADD via IDMPAUTO and IDMP parameters #14615
base: unstable
server.db[j].blocking_keys = dictCreate(&keylistDictType);
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
server.db[j].stream_claim_pending_keys = dictCreate(&objectKeyPointerValueDictType);
server.db[j].stream_idmp_keys = dictCreate(&objectKeyPointerValueDictType);
I don't see stream_idmp_keys being updated when the key is deleted or defragged. Am I missing something?
The key will be deleted in handleExpiredIdmpEntries (t_stream.c:5661).
What if the key was defragged? That is, when the keys in db->keys are defragged, the key stored here is not updated at the same time.
The key in stream_idmp_keys comes from the command's argv argument, so I don't think we're defragging it. Note the call to trackStreamIdmpEntries(c, c->argv[1]);
I see.
I wonder why the key isn't stored as an sds? That would save some memory.
Overview
This PR introduces idempotency support to Redis Streams' XADD command. The optional IDMPAUTO and IDMP parameters, each of which takes a producer ID, let the server deduplicate repeated message submissions automatically. This removes the need for external deduplication logic and prevents duplicate entries in streams.
Problem Statement
Current Redis Streams implementations lack built-in idempotency mechanisms, forcing developers to implement external deduplication:
This lack of native idempotency support creates reliability challenges in distributed systems where at-least-once delivery semantics are required but exactly-once processing is desired.
Solution
Extends XADD with optional idempotency parameters that include producer identification:
Producer ID (pid)
Idempotency Modes
IDMPAUTO pid (Automatic Idempotency):
iid = XOR(XXH128(fieldName || fieldValue)) for all fields
IDMP pid iid (Manual Idempotency):
Both modes can only be specified when entry ID is * (auto-generated).
Deduplication Logic
When XADD is called with idempotency parameters:
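For IDMPAUTO, the first step is deriving the iid from the entry's content. The following is a minimal sketch of the XOR-combining idea only, not the PR's implementation: it substitutes 64-bit FNV-1a for XXH128 so that it compiles without an external library, and the helper name auto_iid is hypothetical. It shows one property of the formula, namely that XOR-ing per-field hashes makes the result independent of field order.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* 64-bit FNV-1a, used here as a stand-in for XXH128 (assumption: the real
 * code uses xxHash). The state is passed in so that hashing the name and
 * then the value equals hashing their concatenation. */
static uint64_t fnv1a(uint64_t h, const char *s, size_t len) {
    for (size_t i = 0; i < len; i++) {
        h ^= (unsigned char)s[i];
        h *= 0x100000001b3ULL;              /* FNV-1a 64-bit prime */
    }
    return h;
}

/* Hypothetical helper: XOR together one hash per (name, value) pair.
 * fields[] is laid out as name0, value0, name1, value1, ... */
uint64_t auto_iid(const char **fields, size_t npairs) {
    uint64_t iid = 0;
    for (size_t i = 0; i < npairs; i++) {
        uint64_t h = 0xcbf29ce484222325ULL; /* FNV-1a 64-bit offset basis */
        h = fnv1a(h, fields[2 * i], strlen(fields[2 * i]));
        h = fnv1a(h, fields[2 * i + 1], strlen(fields[2 * i + 1]));
        iid ^= h;                           /* XOR is commutative */
    }
    return iid;
}
```

Because XOR is commutative, submitting the same fields in a different order yields the same iid, while changing any single value changes it.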
IDMP Map: Per-Producer Time and Capacity-Based Expiration
Each producer with idempotency enabled maintains its own isolated IDMP map (iid → entry_id) with dual expiration criteria:
Time-based expiration (duration):
Capacity-based expiration (maxsize):
Configuration Commands
XIDMP CFGGET: Retrieve current configuration
XIDMP CFGSET: Configure expiration parameters
Default Configuration (when XIDMP CFGSET not called):
stream-idmp-duration and stream-idmp-maxsize
Response Behavior
On first submission (pid, iid) pair not in producer's map:
On duplicate submission (pid, iid) pair exists in producer's map:
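The two cases can be illustrated with a toy model. This sketch assumes, based on the iid → entry_id map described earlier, that a duplicate submission returns the ID of the entry stored on first submission. All names (toyStream, toy_xadd_idmp, and so on) are hypothetical, and a linear scan stands in for the rax and dict lookups.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_PRODUCERS 8
#define MAX_ENTRIES   64

typedef struct { uint64_t ms, seq; } streamID;
typedef struct { char iid[32]; streamID id; } idmpSlot;

/* One isolated iid -> entry ID map per producer. */
typedef struct {
    char pid[32];
    idmpSlot slots[MAX_ENTRIES];
    int used;
} producerMap;

/* Must be zero-initialized before first use. */
typedef struct {
    producerMap producers[MAX_PRODUCERS];
    int nproducers;
    uint64_t next_seq;
} toyStream;

/* Find the producer's map, creating it on first use. */
static producerMap *get_producer(toyStream *s, const char *pid) {
    for (int i = 0; i < s->nproducers; i++)
        if (strcmp(s->producers[i].pid, pid) == 0) return &s->producers[i];
    if (s->nproducers == MAX_PRODUCERS) return NULL;
    producerMap *p = &s->producers[s->nproducers++];
    strncpy(p->pid, pid, sizeof(p->pid) - 1);
    p->pid[sizeof(p->pid) - 1] = '\0';
    p->used = 0;
    return p;
}

/* Returns 1 if a new entry was added, 0 if (pid, iid) was a duplicate,
 * -1 if the toy tables are full. *out receives the entry ID. */
int toy_xadd_idmp(toyStream *s, const char *pid, const char *iid,
                  uint64_t now_ms, streamID *out) {
    producerMap *p = get_producer(s, pid);
    if (p == NULL) return -1;
    for (int i = 0; i < p->used; i++) {
        if (strcmp(p->slots[i].iid, iid) == 0) {
            *out = p->slots[i].id;   /* duplicate: hand back the stored ID */
            return 0;
        }
    }
    if (p->used == MAX_ENTRIES) return -1;
    idmpSlot *slot = &p->slots[p->used++];
    strncpy(slot->iid, iid, sizeof(slot->iid) - 1);
    slot->iid[sizeof(slot->iid) - 1] = '\0';
    slot->id.ms = now_ms;
    slot->id.seq = s->next_seq++;
    *out = slot->id;
    return 1;
}
```

Because each producer has its own map, the same iid submitted under two different pids is treated as two distinct entries.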
Stream Metadata
XINFO STREAM extended with idempotency metrics:
Key Benefits
Performance
Comprehensive benchmarks demonstrate that idempotency features introduce minimal overhead to XADD operations while maintaining predictable memory usage across various message sizes.
Benchmark Methodology
Test Configuration:
MAXSIZE 10000 (tracking up to 10K idempotent IDs per producer)
Test Commands:
XADD test * f __data__ (baseline, no idempotency)
XADD test IDMP <pid> <iid> * f __data__ (manual idempotency)
XADD test IDMPAUTO <pid> * f __data__ (automatic content-based idempotency)
Throughput Performance
Memory Overhead
Key Performance Insights
Throughput Impact:
Memory Efficiency:
Performance Characteristics by Data Size:
Technical Implementation
Per-Producer IDMP Architecture
Each stream maintains a radix tree (rax) mapping producer IDs (pid) to independent idmpProducer structures:
Design Rationale:
pid → idmpProducer* for O(log n) producer lookup
IDMP Map Data Structure
Each producer's IDMP map uses a hybrid data structure combining a hash table with an ordered linked list.
Core Data Structure: idmpEntry
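A sketch of what such a structure could look like, assembled from the fields this description names (next, streamID id, iid_len, iid[]). The field order, the exact types, and the constructor name idmpEntryCreate are assumptions, not taken from the PR's source.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

typedef struct streamID {
    uint64_t ms;    /* millisecond timestamp */
    uint64_t seq;   /* sequence number within that millisecond */
} streamID;

typedef struct idmpEntry {
    struct idmpEntry *next; /* link in the insertion-ordered list */
    streamID id;            /* ID of the stream entry this iid maps to */
    size_t iid_len;         /* length of iid in bytes */
    char iid[];             /* flexible array member: iid stored inline */
} idmpEntry;

/* Hypothetical constructor: a single allocation holds both the fixed
 * header and the variable-length iid bytes. */
idmpEntry *idmpEntryCreate(const char *iid, size_t iid_len, streamID id) {
    idmpEntry *e = malloc(sizeof(*e) + iid_len);
    if (e == NULL) return NULL;
    e->next = NULL;
    e->id = id;
    e->iid_len = iid_len;
    memcpy(e->iid, iid, iid_len);
    return e;
}
```

On a typical 64-bit platform the fixed part of this layout is 8 + 16 + 8 = 32 bytes, with the iid bytes following in the same allocation.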
Design Rationale:
Flexible Array Member (FAM): The iid[] flexible array member enables inline storage of variable-length idempotent IDs directly within the structure, eliminating separate heap allocations and reducing memory fragmentation. This design is particularly beneficial since:
List Pointer: The next pointer maintains the insertion-ordered linked list structure, enabling efficient traversal for time-based expiration and capacity-based eviction operations.
Embedded Stream ID: The streamID id field serves dual purposes:
Dual-Index Architecture (Per Producer)
Each producer's IDMP map maintains two concurrent access patterns using a single set of idmpEntry structures:
Hash Table (dict):
iid → idmpEntry*
Insertion-Ordered Linked List:
next field in idmpEntry
next field in each idmpEntry connects entries in insertion order
Relationship Between Structures:
next pointers to form the ordered list
Natural Time Ordering
New messages are naturally ordered by insertion time due to Redis Streams' ID structure:
streamID.ms and streamID.seq)
* (auto-generated IDs)
This natural ordering eliminates the need for explicit timestamp storage: the streamID.ms field already contains the entry creation timestamp, and the linked list's insertion order inherently represents time-based ordering.
Expiration Operations
Time-Based Expiration (duration):
Executed periodically via Redis cron job (every second):
streamID.ms field of each entry
current_time_ms - streamID.ms > duration_ms
Capacity-Based Eviction (maxsize):
Executed inline during XADD operations (per producer):
Combined Expiration Strategy:
This dual approach ensures:
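Both expiration paths described above can be sketched on a single insertion-ordered list. This is a simplified model under stated assumptions: the names idmpQueue, idmp_push, and idmp_expire are hypothetical, a bare ms field stands in for streamID.ms, and the matching hash-table removal that a real implementation would also need is omitted.

```c
#include <stddef.h>
#include <stdint.h>
#include <stdlib.h>

typedef struct node {
    struct node *next;
    uint64_t ms;            /* stands in for streamID.ms */
} node;

typedef struct {
    node *head, *tail;      /* head is the oldest entry, tail the newest */
    size_t count;
    size_t maxsize;         /* capacity limit */
    uint64_t duration_ms;   /* time-to-live */
} idmpQueue;

/* Remove the oldest entry in O(1). The caller ensures the list is non-empty. */
static void pop_head(idmpQueue *q) {
    node *n = q->head;
    q->head = n->next;
    if (q->head == NULL) q->tail = NULL;
    q->count--;
    free(n);
}

/* Inline path: append the new entry, then evict from the head while the
 * producer is over capacity. Returns 0 on success, -1 on allocation failure. */
int idmp_push(idmpQueue *q, uint64_t ms) {
    node *n = malloc(sizeof(*n));
    if (n == NULL) return -1;
    n->next = NULL;
    n->ms = ms;
    if (q->tail) q->tail->next = n; else q->head = n;
    q->tail = n;
    q->count++;
    while (q->count > q->maxsize) pop_head(q);
    return 0;
}

/* Cron path: entries are time-ordered, so scanning stops at the first
 * entry that is still fresh. Returns the number of entries removed. */
size_t idmp_expire(idmpQueue *q, uint64_t now_ms) {
    size_t removed = 0;
    while (q->head && now_ms > q->head->ms &&
           now_ms - q->head->ms > q->duration_ms) {
        pop_head(q);
        removed++;
    }
    return removed;
}
```

Since the list is ordered by insertion time, the time-based pass touches only the k expired entries plus one fresh entry, and capacity eviction always removes from the head.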
Memory Efficiency
Per-Producer Overhead:
Each producer adds:
Per-Entry Memory Breakdown:
Each idempotency entry requires storage in both the Redis dict (hash table) and the idmpEntry structure:
next pointer for linked list: 8 bytes
streamID (millisecond timestamp and sequence number): 16 bytes
iid_len field: 8 bytes
Total Per-Entry Overhead:
IDMPAUTO pid mode (16-byte idempotent IDs):
IDMP pid iid mode (variable-length idempotent IDs):
Total Memory Example:
For a stream with 5 producers, each tracking 1,000 entries with IDMPAUTO:
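As a partial worked figure, the idmpEntry portion alone can be computed from the sizes listed above (8-byte next pointer, 16-byte streamID, 8-byte iid_len) plus the 16-byte IDMPAUTO iid. This deliberately leaves out the dict entry overhead, the per-producer overhead, and any allocator rounding, none of which are itemized here, so the true total is higher.

```latex
\begin{align*}
\text{idmpEntry size} &= 8 + 16 + 8 + 16 = 48 \text{ bytes} \\
\text{idmpEntry total} &= 5 \times 1000 \times 48 = 240{,}000 \text{ bytes} \approx 234 \text{ KiB}
\end{align*}
```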
This hybrid architecture provides optimal performance for both lookup operations (O(1) via dict per producer) and expiration operations (O(1) for capacity-based, O(k) for time-based per producer), while maintaining predictable memory overhead based on iid length choice and number of active producers. The reuse of streamID.ms for time-based expiration eliminates the need for separate timestamp storage, and the cron-based background expiration ensures timely cleanup without impacting write path performance.