fix: Add more robust event deduplication [P-1570] #126
Conversation
Greptile Overview
Greptile Summary
This PR fixes event deduplication to work across flush cycles by implementing time-based hash cleanup with a 60-second window.
Key Changes:
- Changed `payloadHashes` from `Set<string>` to `Map<string, number>` to track timestamps
- Improved timestamp precision from minute-level to second-level in hash generation
- Removed `payloadHashes.clear()` from `flush()` to maintain deduplication across flush cycles
- Added automatic cleanup of hashes older than 60 seconds in `isDuplicate()`
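A minimal sketch of the resulting mechanism, assuming the 60-second window described above (the wrapper class is illustrative, not the PR's exact code):

```typescript
const DEDUPLICATION_WINDOW_MS = 60_000; // 60-second window described above

class DedupSketch {
  // hash of the event payload -> Date.now() when the hash was first seen
  private payloadHashes = new Map<string, number>();

  isDuplicate(hash: string): boolean {
    const now = Date.now();

    // Evict entries older than the window so the map stays bounded.
    this.payloadHashes.forEach((storedAt, key) => {
      if (now - storedAt > DEDUPLICATION_WINDOW_MS) {
        this.payloadHashes.delete(key);
      }
    });

    if (this.payloadHashes.has(hash)) {
      return true; // seen within the last 60s: block as a duplicate
    }
    this.payloadHashes.set(hash, now);
    return false;
  }
}
```

Note that in this sketch the map is only pruned when a new event is checked; nothing runs on flush.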
What This Fixes:
Previously, duplicate events could be sent if they occurred across different flush cycles (which happen every 10-30 seconds or when 20 events are queued). The hash set was cleared on every flush, breaking deduplication. Now, events with identical content within a 60-second window are properly deduplicated regardless of flush cycles.
Issues Found:
- Critical: Timing inconsistency between `generateMessageId()` (uses `event.original_timestamp`) and `isDuplicate()` (uses `Date.now()`). If events have past/future timestamps, the deduplication window won't work correctly.
- Minor: Unused import `toDateHourMinute` should be removed
- Test Quality: Tests lack proper assertions to verify deduplication behavior
Documentation:
Excellent documentation added with multiple guides explaining the fix, debugging steps, and visual diagrams.
Confidence Score: 3/5
- This PR has a critical timing inconsistency that could cause the deduplication to fail in production scenarios
- The core deduplication logic has a significant issue: `generateMessageId()` formats timestamps from `event.original_timestamp` for hashing, while `isDuplicate()` uses `Date.now()` for the time window. This mismatch means events with past/future `original_timestamp` values won't be deduplicated correctly. The 60s window is measured from the current system time, but the hash is based on the event's timestamp, creating an inconsistency. Additionally, tests don't verify actual deduplication behavior (no assertions on queue state or logger calls).
- `src/lib/queue/EventQueue.ts` requires attention to fix the timing inconsistency between hash generation and duplicate detection. `test/lib/queue/EventQueue.spec.ts` needs better assertions to actually verify deduplication works.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Implements time-based deduplication with 60s window and second-precision timestamps. Found one potential timing inconsistency issue. |
| test/lib/queue/EventQueue.spec.ts | 3/5 | Comprehensive test suite but tests don't properly verify behavior due to missing assertions on queue state. |
Sequence Diagram
```mermaid
sequenceDiagram
participant App as Application
participant EQ as EventQueue
participant Hash as payloadHashes Map
participant API as Analytics API
Note over App,API: Event Submission Flow with Deduplication
App->>EQ: enqueue(event1, timestamp: 10:30:15)
EQ->>EQ: generateMessageId(event1)
Note over EQ: Format timestamp to second precision<br/>(2025-01-01 10:30:15)
EQ->>EQ: hash(event + formatted timestamp)
EQ->>EQ: isDuplicate(hash_abc)
EQ->>Hash: Clean up hashes > 60s old
EQ->>Hash: has(hash_abc)?
Hash-->>EQ: false
EQ->>Hash: set(hash_abc, now=10:30:15)
EQ->>EQ: queue.push(event1)
Note over EQ: Queue size: 1
Note over EQ: ... 10 seconds later ...
App->>EQ: enqueue(event2, timestamp: 10:30:25)<br/>(same content as event1)
EQ->>EQ: generateMessageId(event2)
EQ->>EQ: hash(event + formatted timestamp)
EQ->>EQ: isDuplicate(hash_abc)
EQ->>Hash: Clean up hashes > 60s old
Note over Hash: No hashes to clean (all < 60s)
EQ->>Hash: has(hash_abc)?
Hash-->>EQ: true (stored at 10:30:15)
Note over EQ: Duplicate detected!<br/>Time since last: 10s
EQ->>EQ: logger.warn("Duplicate blocked, sent 10s ago")
EQ-->>App: return (blocked)
Note over EQ: ... Queue reaches 20 events ...
EQ->>EQ: flush()
Note over EQ,Hash: Hash map NOT cleared<br/>(preserves deduplication)
EQ->>API: POST /events (batch)
API-->>EQ: 200 OK
Note over EQ: ... 50 seconds after event1 ...
App->>EQ: enqueue(event3, timestamp: 10:31:05)<br/>(same content as event1)
EQ->>EQ: generateMessageId(event3)
EQ->>EQ: isDuplicate(hash_abc)
EQ->>Hash: Clean up hashes > 60s old
Note over Hash: hash_abc is 50s old, keep it
EQ->>Hash: has(hash_abc)?
Hash-->>EQ: true
EQ->>EQ: logger.warn("Duplicate blocked, sent 50s ago")
EQ-->>App: return (blocked)
Note over EQ: ... 70 seconds after event1 ...
App->>EQ: enqueue(event4, timestamp: 10:31:25)<br/>(same content as event1)
EQ->>EQ: generateMessageId(event4)
EQ->>EQ: isDuplicate(hash_abc)
EQ->>Hash: Clean up hashes > 60s old
Note over Hash: hash_abc is 70s old, DELETE
EQ->>Hash: delete(hash_abc)
EQ->>Hash: has(hash_abc)?
Hash-->>EQ: false
EQ->>Hash: set(hash_abc, now=10:31:25)
EQ->>EQ: queue.push(event4)
Note over EQ: Allowed (outside 60s window)
```
Additional Comments (2)
- `src/lib/queue/EventQueue.ts`, lines 112-126 (link) logic: Timing inconsistency: `generateMessageId()` uses `event.original_timestamp` to format the timestamp for hashing, but `isDuplicate()` uses `Date.now()` for cleanup. If events arrive with past/future `original_timestamp` values, the deduplication window won't work as expected. For example, if an event has an `original_timestamp` from 2 minutes ago, it will generate a hash based on that old time, but `isDuplicate()` will compare against the current time, potentially allowing duplicates or incorrectly blocking events.
- `src/lib/queue/EventQueue.ts`, line 8 (link) style: The `toDateHourMinute` import is no longer used after the fix (replaced with inline second-precision formatting).
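To make the mismatch concrete, a hedged, self-contained illustration (all values invented; `hashKey` is a stand-in for the real payload hash):

```typescript
const WINDOW_MS = 60_000;

// Two byte-identical events, both stamped 10:28:00 by the producer.
const originalTimestamp = Date.parse("2025-01-01T10:28:00Z");

// Hash identity is anchored to the event's own timestamp, so both
// arrivals collapse onto the same key:
const hashKey = `transaction_submit@${originalTimestamp}`;

// The 60s window, however, is measured with the wall clock at arrival:
const firstArrival = Date.parse("2025-01-01T10:30:00Z");
const secondArrival = Date.parse("2025-01-01T10:31:30Z");

// 90s of wall-clock time separate the arrivals, so the entry stored for
// hashKey has already been cleaned up and the replay slips through,
// even though both events carry the exact same original_timestamp.
console.log(hashKey, secondArrival - firstArrival > WINDOW_MS); // true -> duplicate allowed
```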
7 files reviewed, 3 comments
| it("should block duplicate events sent within the same second", async () => { | ||
| const event1 = createMockEvent({ | ||
| event: "transaction_submit", | ||
| original_timestamp: "2025-01-01T10:30:15.123Z", | ||
| }); | ||
|
|
||
| const event2 = createMockEvent({ | ||
| event: "transaction_submit", | ||
| original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds | ||
| }); | ||
|
|
||
| // Enqueue first event - should succeed | ||
| await eventQueue.enqueue(event1); | ||
|
|
||
| // Enqueue duplicate event - should be blocked | ||
| await eventQueue.enqueue(event2); | ||
|
|
||
| // We can't directly check the queue size, but we can verify | ||
| // that the second call doesn't throw and returns silently | ||
| // In a real test, you'd want to spy on logger.warn to verify the warning | ||
| }); |
style: Tests lack assertions - they only verify that enqueue() doesn't throw, but don't actually verify the queue state or that events were blocked. Consider adding test helpers to check queue size or spy on logger.warn to verify deduplication behavior.
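A minimal sketch of what such an assertion could look like, assuming the suite builds `eventQueue` and the mock events in a `beforeEach`, and that the SDK's `logger` object can be stubbed (the import path below is hypothetical):

```typescript
import sinon from "sinon";
import { expect } from "chai";
import { logger } from "../../../src/lib/logger"; // hypothetical path

let loggerWarnStub: sinon.SinonStub;

beforeEach(() => {
  loggerWarnStub = sinon.stub(logger, "warn");
});

afterEach(() => {
  sinon.restore();
});

it("blocks the duplicate and logs a warning", async () => {
  await eventQueue.enqueue(event1);
  await eventQueue.enqueue(event2); // same payload, same second

  // The duplicate should be reported exactly once...
  expect(loggerWarnStub.calledOnce).to.be.true;
  // ...and only the first event should remain in the queue.
  expect((eventQueue as any).queue.length).to.equal(1);
});
```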
bugbot run

bugbot run
Greptile Overview
Greptile Summary
This PR fixes event deduplication by switching from minute-level to second-level timestamp precision and maintaining hashes across flush cycles. The implementation now uses a consistent 60-second window and removes the problematic payloadHashes.clear() on flush.
Key improvements:
- Changed `generateMessageId()` to return both hash and event timestamp for consistent time-based comparisons
- Updated `isDuplicate()` to use event timestamps instead of `Date.now()`, fixing timing inconsistencies
- Hash map now persists across flush cycles with automatic cleanup of entries older than 60 seconds
- Comprehensive test suite added with proper assertions using sinon stubs
Critical issue found:
- The cleanup logic in `isDuplicate()` (lines 268-276) has a bug when events arrive out-of-order. It uses the current event's timestamp to determine which hashes to delete, which can incorrectly remove valid hashes when an older event arrives after a newer one. The cleanup should use a monotonically increasing timestamp reference rather than the current event's timestamp.
Confidence Score: 2/5
- This PR contains a critical logic bug in the cleanup mechanism that could cause incorrect behavior with out-of-order events
- While the PR significantly improves the deduplication system by fixing the timing inconsistency and adding comprehensive tests, there is a critical bug in the cleanup logic (lines 268-276) that uses the current event's timestamp to delete old hashes. This will fail when events arrive out-of-order, potentially deleting hashes that are still within the deduplication window from the perspective of later events. The tests don't cover this scenario.
- Pay close attention to `src/lib/queue/EventQueue.ts`: the `isDuplicate()` method's cleanup logic needs to be fixed to handle out-of-order events correctly
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 2/5 | Improved deduplication with timestamp-based window and second-level precision, but has critical out-of-order event handling bug in cleanup logic that could incorrectly delete valid hashes |
| test/lib/queue/EventQueue.spec.ts | 4/5 | Comprehensive test suite added with proper assertions using sinon stubs, covers most deduplication scenarios but missing out-of-order event test case |
Sequence Diagram
```mermaid
sequenceDiagram
participant Client
participant EventQueue
participant generateMessageId
participant isDuplicate
participant payloadHashes
participant APIServer
Client->>EventQueue: enqueue(event)
EventQueue->>generateMessageId: generateMessageId(event)
Note over generateMessageId: Extract event.original_timestamp
Note over generateMessageId: Format to second precision<br/>(YYYY-MM-DD HH:mm:ss)
Note over generateMessageId: Hash event with formatted timestamp
generateMessageId-->>EventQueue: { hash, timestamp }
EventQueue->>isDuplicate: isDuplicate(hash, timestamp)
Note over isDuplicate: Cleanup Phase
loop For each stored hash
isDuplicate->>payloadHashes: Get stored timestamp
alt timestamp - stored > 60s
isDuplicate->>payloadHashes: Delete old hash
end
end
Note over isDuplicate: Duplicate Check
isDuplicate->>payloadHashes: Check if hash exists
alt Hash exists
Note over isDuplicate: Calculate time since last event
isDuplicate-->>EventQueue: true (duplicate)
EventQueue-->>Client: return (event blocked)
else Hash does not exist
isDuplicate->>payloadHashes: Store hash with timestamp
isDuplicate-->>EventQueue: false (not duplicate)
EventQueue->>EventQueue: Add to queue
alt Queue reaches flushAt or maxQueueSize
EventQueue->>APIServer: flush() - POST events
Note over EventQueue: payloadHashes NOT cleared<br/>(maintains deduplication)
APIServer-->>EventQueue: Success
end
EventQueue-->>Client: Event enqueued
end
```
3 files reviewed, 1 comment
src/lib/queue/EventQueue.ts (Outdated)
```typescript
// Clean up old hashes that are outside the deduplication window
// Use eventTimestamp (not Date.now()) to ensure consistency with hash generation
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  if (eventTimestamp - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));
```
logic: Cleanup logic can incorrectly delete valid hashes when events arrive out-of-order. If an older event (timestamp T1) arrives after a newer event (timestamp T2), the cleanup uses T1 to determine which hashes to delete. This means any hash with timestamp < T1 - 60s gets deleted, even though from T2's perspective those hashes might still be within the 60s window.
Example: Event at 10:31:00 arrives first (stored), then event at 10:30:00 arrives. The cleanup at line 272 uses 10:30:00 as eventTimestamp, so it deletes any hash with timestamp < 10:29:00. This is incorrect because we should use the maximum timestamp seen, not the current event's timestamp.
Consider tracking the latest timestamp separately or using Date.now() for cleanup only (while still using event timestamps for deduplication comparison).
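One way to make the cleanup reference monotonic, sketched under the PR's assumptions (`payloadHashes` as a `Map<string, number>`, a 60s `DEDUPLICATION_WINDOW_MS`); the class and method names here are hypothetical:

```typescript
const DEDUPLICATION_WINDOW_MS = 60_000;

class EventQueueSketch {
  private payloadHashes = new Map<string, number>();
  // Newest event timestamp observed so far; an older out-of-order
  // arrival can no longer drag the cleanup reference point backwards.
  private latestTimestampSeen = 0;

  private pruneExpiredHashes(eventTimestamp: number): void {
    this.latestTimestampSeen = Math.max(this.latestTimestampSeen, eventTimestamp);
    this.payloadHashes.forEach((storedTimestamp, hash) => {
      if (this.latestTimestampSeen - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
        this.payloadHashes.delete(hash);
      }
    });
  }
}
```

With the example above, the late 10:30:00 event leaves the reference at 10:31:00, so only hashes older than 10:30:00 are dropped.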
Greptile Overview
Greptile Summary
Refactored event deduplication to use a timestamped hash map with a 60-second real-time window, fixing three critical issues: hash clearing on flush (breaking cross-flush deduplication), timing inconsistency between hash generation and cleanup, and incorrect cleanup with out-of-order events. The solution now properly maintains deduplication state across flush cycles by storing hashes with Date.now() timestamps and cleaning them based on real elapsed time rather than event timestamps.
Key Changes:
- Changed `payloadHashes` from `Set<string>` to `Map<string, number>` mapping hash to `Date.now()` timestamp
- Removed `payloadHashes.clear()` from the `flush()` method to maintain deduplication across flush cycles
- Updated `isDuplicate()` to clean up old hashes based on a real-time window (`Date.now()` - stored > 60s)
- Enhanced hash generation to use second-level precision while returning both hash and timestamp
- Added comprehensive test suite covering same-second duplicates, cross-flush behavior, out-of-order events, and real-world scenarios
Issues Found:
- 8 documentation markdown files should be removed (development notes, not production docs)
- Tests only verify logger calls, not actual queue state
- Missing test for deduplication window expiration (events allowed after 60s)
- Unused `timestamp` return value from `generateMessageId()`
- Cleanup logic uses temporary array instead of direct deletion
Confidence Score: 4/5
- This PR is safe to merge with minor cleanup - the core deduplication logic is sound and fixes critical bugs
- Score reflects solid implementation of deduplication fix that addresses real production issues (cross-flush deduplication, out-of-order events). However, the PR includes 8 documentation files that should be removed, tests could be more thorough (missing window expiration test, only checking logger calls), and there are minor code quality improvements possible. The core logic is correct and the fix is important, but cleanup is recommended before merge.
- Remove all 8 documentation .md files (FINAL_FIX_SUMMARY.md, OUT_OF_ORDER_FIX.md, etc.) before merging. Test file should add assertions on queue state and window expiration behavior.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Refactored deduplication to use timestamped hash map with 60s window, removing flush-time clearing for cross-flush persistence. Implementation is sound but has minor issues. |
| test/lib/queue/EventQueue.spec.ts | 3/5 | Comprehensive test suite added covering deduplication scenarios, but tests only verify logger calls rather than actual queue state, and missing critical window expiration test. |
Sequence Diagram
```mermaid
sequenceDiagram
participant Client
participant EventQueue
participant isDuplicate
participant payloadHashes as Map<hash,timestamp>
participant Queue as queue[]
Client->>EventQueue: enqueue(event1, timestamp: 10:30:15.123Z)
EventQueue->>EventQueue: generateMessageId(event1)
Note over EventQueue: Hash includes timestamp<br/>rounded to seconds:<br/>"2025-01-01 10:30:15"
EventQueue->>EventQueue: hash = "abc123..."
EventQueue->>isDuplicate: isDuplicate("abc123")
isDuplicate->>isDuplicate: now = Date.now() = T0
isDuplicate->>payloadHashes: Clean up old hashes<br/>(now - stored > 60s)
isDuplicate->>payloadHashes: has("abc123")?
payloadHashes-->>isDuplicate: false
isDuplicate->>payloadHashes: set("abc123", T0)
isDuplicate-->>EventQueue: false (not duplicate)
EventQueue->>Queue: push(event1)
EventQueue-->>Client: Event enqueued
Client->>EventQueue: enqueue(event2, timestamp: 10:30:15.456Z)
Note over Client: Same second, different ms<br/>Same event data
EventQueue->>EventQueue: generateMessageId(event2)
Note over EventQueue: Hash rounds to same second:<br/>"2025-01-01 10:30:15"
EventQueue->>EventQueue: hash = "abc123..." (SAME!)
EventQueue->>isDuplicate: isDuplicate("abc123")
isDuplicate->>isDuplicate: now = Date.now() = T0 + 0.3s
isDuplicate->>payloadHashes: Clean up old hashes
isDuplicate->>payloadHashes: has("abc123")?
payloadHashes-->>isDuplicate: true (stored at T0)
isDuplicate->>isDuplicate: elapsed = (T0+0.3s) - T0 = 0.3s
isDuplicate-->>EventQueue: true (DUPLICATE)
EventQueue->>EventQueue: logger.warn("Duplicate detected")
EventQueue-->>Client: Event blocked
Note over EventQueue: Queue reaches flushAt=20
EventQueue->>EventQueue: flush()
Note over EventQueue: payloadHashes NOT cleared<br/>(preserves dedup state)
EventQueue->>EventQueue: Send batch to API
Client->>EventQueue: enqueue(event3, timestamp: 10:30:15.789Z)
Note over Client: After flush, same second
EventQueue->>isDuplicate: isDuplicate("abc123")
isDuplicate->>payloadHashes: has("abc123")?
payloadHashes-->>isDuplicate: true (still in memory)
isDuplicate-->>EventQueue: true (DUPLICATE)
EventQueue-->>Client: Event blocked (cross-flush dedup works!)
```
4 files reviewed, 5 comments
| it("should block duplicate events sent within the same second", async () => { | ||
| const event1 = createMockEvent({ | ||
| event: "transaction_submit", | ||
| original_timestamp: "2025-01-01T10:30:15.123Z", | ||
| }); | ||
|
|
||
| const event2 = createMockEvent({ | ||
| event: "transaction_submit", | ||
| original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds | ||
| }); | ||
|
|
||
| // Enqueue first event - should succeed | ||
| await eventQueue.enqueue(event1); | ||
| expect(loggerLogStub.calledOnce).to.be.true; | ||
| expect(loggerWarnStub.called).to.be.false; | ||
|
|
||
| // Enqueue duplicate event - should be blocked | ||
| await eventQueue.enqueue(event2); | ||
|
|
||
| // Verify duplicate was detected | ||
| expect(loggerWarnStub.calledOnce).to.be.true; | ||
| expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected"); | ||
| // Only one "Event enqueued" log (first event only) | ||
| expect(loggerLogStub.calledOnce).to.be.true; | ||
| }); |
style: Tests only verify logger output, not actual queue behavior. Consider adding assertions on queue length.
| it("should block duplicate events sent within the same second", async () => { | |
| const event1 = createMockEvent({ | |
| event: "transaction_submit", | |
| original_timestamp: "2025-01-01T10:30:15.123Z", | |
| }); | |
| const event2 = createMockEvent({ | |
| event: "transaction_submit", | |
| original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds | |
| }); | |
| // Enqueue first event - should succeed | |
| await eventQueue.enqueue(event1); | |
| expect(loggerLogStub.calledOnce).to.be.true; | |
| expect(loggerWarnStub.called).to.be.false; | |
| // Enqueue duplicate event - should be blocked | |
| await eventQueue.enqueue(event2); | |
| // Verify duplicate was detected | |
| expect(loggerWarnStub.calledOnce).to.be.true; | |
| expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected"); | |
| // Only one "Event enqueued" log (first event only) | |
| expect(loggerLogStub.calledOnce).to.be.true; | |
| }); | |
| // Enqueue first event - should succeed | |
| await eventQueue.enqueue(event1); | |
| expect(loggerLogStub.calledOnce).to.be.true; | |
| expect(loggerWarnStub.called).to.be.false; | |
| expect((eventQueue as any).queue.length).to.equal(1); | |
| // Enqueue duplicate event - should be blocked | |
| await eventQueue.enqueue(event2); | |
| // Verify duplicate was detected | |
| expect(loggerWarnStub.calledOnce).to.be.true; | |
| expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected"); | |
| // Only one "Event enqueued" log (first event only) | |
| expect(loggerLogStub.calledOnce).to.be.true; | |
| expect((eventQueue as any).queue.length).to.equal(1); |
FINAL_FIX_SUMMARY.md (Outdated)
# Final Deduplication Fix - Complete Summary

## 🎯 Three Critical Issues Identified and Fixed

Through thorough code review, three critical bugs were found and fixed in the deduplication system:

### 1. ✅ Original Issue: Hash Cleared on Flush
**Problem**: `payloadHashes.clear()` on every flush broke deduplication across flush cycles
**Status**: FIXED - Hashes now persist with time-based cleanup
style: Remove documentation files from the PR. These 8 markdown files (FINAL_FIX_SUMMARY.md, OUT_OF_ORDER_FIX.md, DEDUPLICATION_FIX.md, CRITICAL_FIX_APPLIED.md, CHANGES_SUMMARY.md, FIX_SUMMARY.md, DEBUGGING_GUIDE.md, DEDUPLICATION_VISUAL.md) should not be committed to the repository - they appear to be development notes.
src/lib/queue/EventQueue.ts (Outdated)
```typescript
private async generateMessageId(event: IFormoEvent): Promise<{ hash: string; timestamp: number }> {
  // Format timestamp to second precision (YYYY-MM-DD HH:mm:ss) for better deduplication
  const date = new Date(event.original_timestamp);
  const eventTimestamp = date.getTime(); // Get timestamp in milliseconds

  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);

  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  const eventString = JSON.stringify(eventForHashing);
  const hashValue = await hash(eventString);

  return { hash: hashValue, timestamp: eventTimestamp };
}
```
style: The timestamp return value from generateMessageId() is never used. Consider removing it or documenting why it's returned.
Suggested change:

```typescript
private async generateMessageId(event: IFormoEvent): Promise<string> {
  // Format timestamp to second precision (YYYY-MM-DD HH:mm:ss) for better deduplication
  const date = new Date(event.original_timestamp);

  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);

  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  const eventString = JSON.stringify(eventForHashing);
  return await hash(eventString);
}
```
src/lib/queue/EventQueue.ts (Outdated)
```typescript
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));
```
style: Creating temporary array for cleanup is inefficient. Use direct deletion during iteration.
Suggested change:

```typescript
// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
for (const [hash, storedTimestamp] of this.payloadHashes.entries()) {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
}
```
| describe("Hash Cleanup", () => { | ||
| it("should handle out-of-order events correctly", async () => { | ||
| // Simulate events arriving out of chronological order | ||
| const newerEvent = createMockEvent({ | ||
| event: "test_event", | ||
| original_timestamp: "2025-01-01T10:31:00.000Z", // Newer timestamp | ||
| }); | ||
|
|
||
| await eventQueue.enqueue(newerEvent); | ||
|
|
||
| // Now enqueue an older event | ||
| const olderEvent = createMockEvent({ | ||
| event: "different_event", // Different event, so not a duplicate | ||
| original_timestamp: "2025-01-01T10:30:00.000Z", // Older timestamp | ||
| }); | ||
|
|
||
| await eventQueue.enqueue(olderEvent); | ||
|
|
||
| // Both should be allowed (different events, cleanup based on real time) | ||
| expect(loggerWarnStub.called).to.be.false; | ||
| expect(loggerLogStub.calledTwice).to.be.true; | ||
| }); | ||
|
|
||
| it("should clean up hashes based on real elapsed time", async () => { | ||
| // This test shows that cleanup is based on Date.now(), not event timestamps | ||
| // In a real implementation, you'd mock Date.now() to test this properly | ||
| // For now, we verify the logic doesn't break with same-timestamp events | ||
| const event1 = createMockEvent({ | ||
| event: "test_event", | ||
| original_timestamp: "2025-01-01T10:30:00.000Z", | ||
| }); | ||
|
|
||
| const event2 = createMockEvent({ | ||
| event: "test_event", | ||
| original_timestamp: "2025-01-01T10:30:00.000Z", // Same timestamp | ||
| }); | ||
|
|
||
| await eventQueue.enqueue(event1); | ||
| await eventQueue.enqueue(event2); | ||
|
|
||
| // Second event should be blocked as duplicate (same hash) | ||
| expect(loggerWarnStub.calledOnce).to.be.true; | ||
| expect(loggerLogStub.calledOnce).to.be.true; | ||
| }); | ||
| }); |
style: Missing critical test: verify that events CAN be sent again after the 60s deduplication window expires. Current tests verify blocking within window, but not expiration behavior.
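A sketch of such a test, assuming the suite's existing `createMockEvent` helper, logger stubs, and the 60s window; stubbing `Date.now` is one option (sinon's fake timers would also work):

```typescript
it("allows the same event again after the 60s window expires", async () => {
  const baseTime = Date.now();
  const nowStub = sinon.stub(Date, "now").returns(baseTime);

  const event = createMockEvent({
    event: "transaction_submit",
    original_timestamp: "2025-01-01T10:30:15.000Z",
  });

  await eventQueue.enqueue(event); // first occurrence: stored and queued

  // Advance real time past the deduplication window.
  nowStub.returns(baseTime + 61_000);

  await eventQueue.enqueue(event); // stored hash has expired by now

  expect(loggerWarnStub.called).to.be.false; // not blocked as a duplicate
  expect((eventQueue as any).queue.length).to.equal(2);

  nowStub.restore();
});
```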
Greptile Overview
Greptile Summary
Improves client-side event deduplication by switching from minute-precision to second-precision hashing and implementing a 60-second real-time deduplication window that persists across flush cycles.
Key Changes:
- Second-precision hashing: Events within the same second (ignoring milliseconds) are considered duplicates, preventing rapid duplicate submissions
- Real-time window tracking: Uses `Date.now()` to store hash timestamps instead of event timestamps, fixing the out-of-order event handling issue noted in previous comments
- Cross-flush persistence: Removed `payloadHashes.clear()` on flush, maintaining deduplication state across batch sends
- Automatic cleanup: Old hashes (>60s) are cleaned up during each `isDuplicate()` check
- Comprehensive tests: 13 test cases covering basic deduplication, time windows, cross-flush behavior, cleanup logic, and real-world scenarios including window expiration
Previous Issues Addressed:
- ✅ Out-of-order cleanup logic fixed (src/lib/queue/EventQueue.ts:268-280)
- ✅ Tests now include queue length assertions (test/lib/queue/EventQueue.spec.ts:88, 99)
- ✅ Window expiration properly tested with Date.now() mocking (test/lib/queue/EventQueue.spec.ts:274-312)
⚠️ Documentation files mentioned in PR description are not in changeset (expected behavior)
Minor Issue:
- Unused import `toDateHourMinute` should be removed (src/lib/queue/EventQueue.ts:8)
Confidence Score: 4/5
- Safe to merge with one minor cleanup (unused import)
- Implementation correctly addresses the out-of-order event handling issue from previous review. Comprehensive test suite with proper Date.now() mocking validates the 60s window behavior. Only issue is a cosmetic unused import.
- src/lib/queue/EventQueue.ts requires minor cleanup (remove unused import on line 8), otherwise no files require special attention
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Implements second-precision deduplication with 60s real-time window, fixes out-of-order issue. Minor: unused import toDateHourMinute at line 8. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with 13 tests covering basic deduplication, time windows, cross-flush persistence, cleanup, and real-world scenarios. Properly tests window expiration with Date.now() mocking. |
Sequence Diagram
```mermaid
sequenceDiagram
participant Client
participant EventQueue
participant PayloadHashes as PayloadHashes<br/>Map<hash, timestamp>
participant Logger
participant API
Client->>EventQueue: enqueue(event)
EventQueue->>EventQueue: generateMessageId(event)
Note over EventQueue: Hash = hash(event + timestamp<br/>formatted to second precision)
EventQueue->>EventQueue: isDuplicate(hash)
EventQueue->>PayloadHashes: Check for old hashes
Note over PayloadHashes: Cleanup: Delete hashes where<br/>Date.now() - storedTimestamp > 60s
alt Hash exists in Map
PayloadHashes-->>EventQueue: Hash found
EventQueue->>Logger: warn("Duplicate detected...")
EventQueue-->>Client: Return (blocked)
else Hash not found
PayloadHashes-->>EventQueue: Hash not found
EventQueue->>PayloadHashes: set(hash, Date.now())
EventQueue->>EventQueue: queue.push(event)
EventQueue->>Logger: log("Event enqueued")
alt Queue reaches flushAt (20 events)
EventQueue->>EventQueue: flush()
EventQueue->>API: POST /events (batch)
Note over EventQueue,PayloadHashes: payloadHashes NOT cleared<br/>Maintains deduplication across flushes
API-->>EventQueue: 200 OK
end
EventQueue-->>Client: Success
end
```
Additional Comments (1)
- `src/lib/queue/EventQueue.ts`, line 8 (link) style: Unused import - `toDateHourMinute` is no longer used after switching to second-precision formatting.
10 files reviewed, 1 comment
Greptile Overview
Greptile Summary
This PR overhauls the event deduplication system to fix critical bugs that allowed duplicate events to be sent. The implementation now uses a Map-based approach with Date.now() for time-based cleanup, ensuring robust deduplication across flush cycles.
Key improvements:
- Replaced `Set` with `Map<string, number>` to track when hashes were first seen
- Changed hash precision from minute-level to second-level for better granularity
- Removed `payloadHashes.clear()` from `flush()` to maintain deduplication across flush cycles
- Implemented real-time cleanup using `Date.now()` instead of event timestamps, correctly handling out-of-order events
- Added comprehensive test suite with proper assertions on queue length and logger calls
- Updated deduplication window to 60 seconds of real elapsed time
The fix addresses three critical issues: hash clearing on flush, timing inconsistency between hash generation and window checking, and incorrect cleanup with out-of-order events.
Confidence Score: 5/5
- This PR is safe to merge with high confidence
- The implementation correctly fixes the deduplication logic with proper separation of concerns (hash generation, duplicate detection, and cleanup). The use of Date.now() for cleanup elegantly handles out-of-order events. Comprehensive test coverage includes window expiration tests with Date.now() mocking, cross-flush persistence verification, and real-world scenarios. The logic is sound and all edge cases are handled properly.
- No files require special attention. Consider removing DEDUPLICATION_FIX_SUMMARY.md as it's development documentation rather than production code documentation.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 5/5 | Robust deduplication implementation using Date.now() for cleanup and cross-flush persistence. Logic correctly handles out-of-order events and time-based window expiration. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with proper assertions covering deduplication, cross-flush persistence, out-of-order events, window expiration (with Date.now() mocking), and real-world scenarios. |
| DEDUPLICATION_FIX_SUMMARY.md | 4/5 | Detailed documentation of the fix. While comprehensive, this development notes file may not belong in the repository long-term. |
Sequence Diagram
```mermaid
sequenceDiagram
participant Client
participant EventQueue
participant PayloadHashes as Map<hash, timestamp>
participant Logger
participant API
Note over Client,API: Event Enqueueing & Deduplication Flow
Client->>EventQueue: enqueue(event)
EventQueue->>EventQueue: generateMessageId(event)
Note over EventQueue: Hash = f(event data + timestamp)<br/>Rounded to second precision
EventQueue-->>EventQueue: message_id (hash)
EventQueue->>EventQueue: isDuplicate(message_id)
Note over EventQueue: now = Date.now()
EventQueue->>PayloadHashes: forEach(storedTimestamp, hash)
Note over PayloadHashes: Cleanup: Remove hashes where<br/>now - storedTimestamp > 60s
alt Hash exists in map
PayloadHashes-->>EventQueue: true (found)
EventQueue->>Logger: warn("Duplicate detected")
EventQueue-->>Client: return (blocked)
else Hash not in map
PayloadHashes-->>EventQueue: false (not found)
EventQueue->>PayloadHashes: set(message_id, now)
EventQueue->>EventQueue: queue.push(event)
EventQueue->>Logger: log("Event enqueued")
alt Queue reaches flushAt threshold
EventQueue->>EventQueue: flush()
Note over EventQueue: Note: payloadHashes NOT cleared<br/>Hashes persist across flushes
EventQueue->>API: POST /events (batch)
API-->>EventQueue: 200 OK
end
EventQueue-->>Client: return (success)
end
Note over PayloadHashes: Deduplication Window: 60s<br/>Based on real time (Date.now())<br/>Not event timestamps
```
5 files reviewed, 2 comments
src/lib/queue/EventQueue.ts (Outdated)
```typescript
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));
```
style: Creating a temporary array for deletion adds overhead. Consider deleting directly during iteration since Map.forEach allows safe deletion.
Suggested change:

```typescript
// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
for (const [hash, storedTimestamp] of this.payloadHashes.entries()) {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
}
```
DEDUPLICATION_FIX_SUMMARY.md (Outdated)

@@ -0,0 +1,318 @@
# Event Deduplication Fix - Complete Summary
style: Consider removing this development notes file from the repository. Detailed implementation notes like this are typically better suited for PR descriptions, commit messages, or internal documentation rather than being committed to the codebase.
bugbot run
Greptile Overview
Greptile Summary
This PR implements a robust event deduplication mechanism that prevents duplicate analytics events from being sent within a 60-second real-time window. The fix addresses four critical bugs: hash clearing on flush, inconsistent time sources for deduplication vs cleanup, incorrect out-of-order event handling, and memory leaks from missing periodic cleanup.
Key improvements:
- Changed from `Set<string>` to `Map<string, number>` to track when each event hash was first seen (using `Date.now()`)
- Removed `payloadHashes.clear()` on flush, enabling cross-flush deduplication
- Extracted cleanup logic to a separate `cleanupOldHashes()` method that uses real elapsed time (`Date.now()`) consistently
- Added periodic cleanup during flush cycles to prevent memory leaks even when events stop arriving
- Improved hash precision from minute-level to second-level for better granularity
- Added comprehensive test suite with proper queue state assertions and time-mocking
Technical correctness:
The implementation correctly handles the separation of concerns: hash generation (event identity), deduplication (duplicate detection), and cleanup (memory management). The use of Date.now() for both storage and cleanup timestamps ensures consistent behavior regardless of event timestamp ordering.
Minor optimization note:
The `cleanupOldHashes()` method creates a temporary array of hashes to delete (lines 274-281) to avoid modifying the Map during iteration. While JavaScript Maps support safe deletion during forEach iteration, this conservative approach is acceptable given cleanup runs infrequently (every flush cycle).
Confidence Score: 4/5
- This PR is safe to merge with minimal risk - the deduplication logic is sound and well-tested
- Score reflects solid implementation with comprehensive tests and correct handling of edge cases. Not a 5 due to: (1) the included 381-line documentation file that should be in PR description instead, and (2) minor cleanup inefficiency using temporary array. The core deduplication logic is correct and addresses real bugs.
- Consider removing `DEDUPLICATION_FIX_SUMMARY.md` from the repository - detailed implementation notes like this are better suited for PR descriptions or internal documentation
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Implements robust 60s time-window deduplication with cross-flush persistence and cleanup. Implementation is solid, but temporary array in cleanup adds minor overhead. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with proper assertions on queue state, time mocking for window expiration tests, and coverage of edge cases including out-of-order events and memory leak prevention. |
| DEDUPLICATION_FIX_SUMMARY.md | 3/5 | 381-line documentation file added to repository. Should likely be in PR description or internal docs instead of committed to codebase. |
Sequence Diagram
```mermaid
sequenceDiagram
participant User
participant SDK
participant EventQueue
participant HashMap as payloadHashes<br/>Map<hash, timestamp>
participant Server
Note over EventQueue: Deduplication Window = 60s
User->>SDK: track("transaction_submit")
SDK->>EventQueue: enqueue(event)
EventQueue->>EventQueue: generateMessageId(event)
Note over EventQueue: Hash = SHA256(event data + <br/>timestamp to second precision)
EventQueue->>EventQueue: isDuplicate(hash)?
EventQueue->>HashMap: cleanupOldHashes()
Note over HashMap: Remove hashes where<br/>Date.now() - stored > 60s
EventQueue->>HashMap: has(hash)?
HashMap-->>EventQueue: false (new event)
EventQueue->>HashMap: set(hash, Date.now())
EventQueue->>EventQueue: queue.push(event)
Note over EventQueue: Event queued successfully
User->>SDK: track("transaction_submit")<br/>(duplicate, 5s later)
SDK->>EventQueue: enqueue(event)
EventQueue->>EventQueue: generateMessageId(event)
Note over EventQueue: Same hash generated
EventQueue->>EventQueue: isDuplicate(hash)?
EventQueue->>HashMap: has(hash)?
HashMap-->>EventQueue: true (seen 5s ago)
EventQueue->>EventQueue: log warning & block
Note over EventQueue: Duplicate blocked ✓
Note over EventQueue: Flush triggered<br/>(queue full or interval)
EventQueue->>EventQueue: flush()
EventQueue->>HashMap: cleanupOldHashes()
EventQueue->>Server: POST /events (batch)
Server-->>EventQueue: 200 OK
Note over HashMap: Hash NOT cleared<br/>Cross-flush persistence ✓
Note over EventQueue: 61 seconds pass...
EventQueue->>HashMap: cleanupOldHashes()
Note over HashMap: Hash expired & removed
User->>SDK: track("transaction_submit")<br/>(same event, after 60s)
SDK->>EventQueue: enqueue(event)
EventQueue->>EventQueue: isDuplicate(hash)?
EventQueue->>HashMap: has(hash)?
HashMap-->>EventQueue: false (expired)
EventQueue->>HashMap: set(hash, Date.now())
EventQueue->>EventQueue: queue.push(event)
Note over EventQueue: Allowed after window ✓
```
3 files reviewed, 2 comments
src/lib/queue/EventQueue.ts (Outdated)
```typescript
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));
```
style: Maps in JavaScript support safe deletion during forEach iteration, so collecting hashes first is unnecessary overhead.
Suggested change:

```typescript
// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
});
```
DEDUPLICATION_FIX_SUMMARY.md (Outdated)
| # Event Deduplication Fix - Complete Summary | ||
|
|
||
| ## Problem Statement | ||
|
|
||
| Users were experiencing an excess of custom events being sent by the Formo Analytics SDK. For example, a single transaction flow (submit → success/error) was generating far more events than expected (e.g., 9000 submits but many more successes/errors). | ||
|
|
||
| ## Root Cause Analysis | ||
|
|
||
| The SDK's deduplication mechanism had **four critical bugs**: | ||
|
|
||
| ### Bug #1: Hash Cleared on Every Flush ❌ | ||
| ```typescript | ||
| // BEFORE: Hash map cleared after every flush | ||
| async flush() { | ||
| // ... send events ... | ||
| this.payloadHashes.clear(); // ❌ Cleared deduplication state! | ||
| } | ||
| ``` | ||
| **Impact**: Duplicate events sent within the same 60-second window would NOT be deduplicated if a flush occurred between them. | ||
|
|
||
| ### Bug #2: Timing Inconsistency ❌ | ||
| ```typescript | ||
| // BEFORE: Inconsistent time sources | ||
| generateMessageId() { | ||
| // Used event.original_timestamp for hash | ||
| } | ||
|
|
||
| isDuplicate() { | ||
| // Used Date.now() for time window | ||
| } | ||
| ``` | ||
| **Impact**: Events with past/future `original_timestamp` values wouldn't be deduplicated correctly. The 60s window was measured from current system time, but the hash was based on the event's timestamp. | ||
|
|
||
| ### Bug #3: Out-of-Order Events ❌ | ||
| ```typescript | ||
| // BEFORE: Cleanup used event timestamp | ||
| isDuplicate(eventId, eventTimestamp) { | ||
| // Clean up using eventTimestamp | ||
| if (eventTimestamp - storedTimestamp > 60s) { | ||
| delete hash; // ❌ Wrong reference point! | ||
| } | ||
| } | ||
| ``` | ||
| **Impact**: When older events arrived after newer ones, cleanup would use the older timestamp as reference, incorrectly deleting hashes that should still be valid. | ||
|
|
||
| **Example**: | ||
| ``` | ||
| 10:31:00 → Event A arrives (timestamp: 10:31:00) | ||
| 10:31:05 → Event B arrives (timestamp: 10:30:00) ← Older! | ||
| Cleanup uses 10:30:00, deletes hashes < 09:29:00 | ||
| But should keep hashes until 09:30:00 based on Event A! | ||
| ``` | ||
|
|
||
| ### Bug #4: Memory Leak - No Cleanup Without Events ❌ | ||
| ```typescript | ||
| // BEFORE: Cleanup only in isDuplicate() | ||
| private async isDuplicate(eventId: string): Promise<boolean> { | ||
| // Clean up old hashes... | ||
| // But this only runs when NEW events arrive! | ||
| } | ||
|
|
||
| async flush() { | ||
| // No cleanup here! | ||
| } | ||
| ``` | ||
| **Impact**: If events stop arriving, expired hashes are never removed from memory. In long-running applications with sporadic event activity, this could accumulate thousands of expired hashes, causing a memory leak. | ||
|
|
||
| **Example**: | ||
| ``` | ||
| 10:30:00 → Event arrives, hash stored | ||
| 10:30:30 → Event arrives, hash stored | ||
| 10:31:00 → Last event arrives, hash stored | ||
| ... 2 hours pass with no events ... | ||
| All 3 hashes still in memory! ❌ | ||
| Should have been cleaned up after 60s | ||
| ``` | ||
|
|
||
| ## Solution Architecture | ||
|
|
||
| ### Core Principle: Separation of Concerns | ||
|
|
||
| 1. **Hash Generation** = Event Identity (uses event data + timestamp) | ||
| 2. **Deduplication** = Duplicate Detection (checks hash equality) | ||
| 3. **Cleanup** = Memory Management (uses real elapsed time) | ||
|
|
||
| ### Implementation | ||
|
|
||
| #### 1. Hash Generation (Event Identity) | ||
| ```typescript | ||
| private async generateMessageId(event: IFormoEvent): Promise<string> { | ||
| // Format timestamp to second precision for consistent hashing | ||
| const date = new Date(event.original_timestamp); | ||
| const formattedTimestamp = | ||
| date.getUTCFullYear() + "-" + | ||
| ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" + | ||
| ("0" + date.getUTCDate()).slice(-2) + " " + | ||
| ("0" + date.getUTCHours()).slice(-2) + ":" + | ||
| ("0" + date.getUTCMinutes()).slice(-2) + ":" + | ||
| ("0" + date.getUTCSeconds()).slice(-2); | ||
|
|
||
| const eventForHashing = { ...event, original_timestamp: formattedTimestamp }; | ||
| return await hash(JSON.stringify(eventForHashing)); | ||
| } | ||
| ``` | ||
|
|
||
| **Key Points**: | ||
| - Hash based on event data + event timestamp (rounded to seconds) | ||
| - Same event at same time = same hash | ||
| - Different timestamp = different hash (different event) | ||
|
|
||
| #### 2. Deduplication Logic (Duplicate Detection) | ||
| ```typescript | ||
| async enqueue(event: IFormoEvent, callback?: (...args: any) => void) { | ||
| const message_id = await this.generateMessageId(event); | ||
|
|
||
| if (await this.isDuplicate(message_id)) { | ||
| // Duplicate detected - block it | ||
| return; | ||
| } | ||
|
|
||
| // Enqueue the event | ||
| this.queue.push({ message: { ...event, message_id }, callback }); | ||
| } | ||
| ``` | ||
|
|
||
| **Key Points**: | ||
| - Simple hash equality check | ||
| - If seen recently (within 60s real time), block it | ||
|
|
||
| #### 3. Cleanup & Storage (Memory Management) | ||
| ```typescript | ||
| /** | ||
| * Separate cleanup method that can be called from multiple places | ||
| */ | ||
| private cleanupOldHashes(): void { | ||
| const now = Date.now(); | ||
|
|
||
| // CLEANUP: Remove old hashes based on REAL elapsed time | ||
| const hashesToDelete: string[] = []; | ||
| this.payloadHashes.forEach((storedTimestamp, hash) => { | ||
| if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) { | ||
| hashesToDelete.push(hash); | ||
| } | ||
| }); | ||
| hashesToDelete.forEach(hash => this.payloadHashes.delete(hash)); | ||
| } | ||
|
|
||
| private async isDuplicate(eventId: string): Promise<boolean> { | ||
| const now = Date.now(); | ||
|
|
||
| // CLEANUP: Clean up old hashes to prevent memory leaks | ||
| this.cleanupOldHashes(); | ||
|
|
||
| // CHECK: Is this hash in recent memory? | ||
| if (this.payloadHashes.has(eventId)) { | ||
| const storedAt = this.payloadHashes.get(eventId)!; | ||
| const elapsedRealTime = now - storedAt; | ||
| logger.warn(`Duplicate event detected and blocked. Same event was first seen ${Math.round(elapsedRealTime / 1000)}s ago.`); | ||
| return true; | ||
| } | ||
|
|
||
| // STORE: Remember this hash with current real time | ||
| this.payloadHashes.set(eventId, now); | ||
| return false; | ||
| } | ||
| ``` | ||
|
|
||
| **Key Points**: | ||
| - Cleanup extracted to separate `cleanupOldHashes()` method | ||
| - Cleanup based on `Date.now()` (real time elapsed) | ||
| - Storage uses `Date.now()` (when we first saw it) | ||
| - Deduplication window = 60 seconds of real time since first seen | ||
| - Hash map is **never cleared** - only time-based cleanup | ||
|
|
||
| #### 4. Periodic Cleanup on Flush ✅ | ||
| ```typescript | ||
| async flush(callback?: (...args: any) => void) { | ||
| // ... clear timer ... | ||
|
|
||
| // CLEANUP: Run periodic cleanup to prevent memory leaks | ||
| // This ensures cleanup happens even when no new events arrive | ||
| this.cleanupOldHashes(); | ||
|
|
||
| if (!this.queue.length) { | ||
| return Promise.resolve(); | ||
| } | ||
|
|
||
| // ... send events ... | ||
| // Note: payloadHashes is NOT cleared - only time-based cleanup | ||
| } | ||
| ``` | ||
|
|
||
| **Key Points**: | ||
| - Cleanup runs on every flush cycle (default: every 10 seconds) | ||
| - Prevents memory leaks in long-running apps with sporadic events | ||
| - No additional timers needed - leverages existing flush interval | ||
| - Works even when queue is empty | ||
|
|
||
| ## What Changed | ||
|
|
||
| | Aspect | Before | After | | ||
| |--------|--------|-------| | ||
| | **Hash generation** | event.original_timestamp | ✅ event.original_timestamp (same) | | ||
| | **Hash precision** | Minute-level | ✅ **Second-level** | | ||
| | **Hash cleared on flush** | ❌ Yes (cleared) | ✅ **No (preserved)** | | ||
| | **Cleanup time source** | ❌ Inconsistent/event time | ✅ **Date.now() (real time)** | | ||
| | **Storage time source** | ❌ Inconsistent | ✅ **Date.now() (real time)** | | ||
| | **Deduplication window** | ❌ Broken across flushes | ✅ **60s real-time window** | | ||
| | **Out-of-order events** | ❌ Broken | ✅ **Handled correctly** | | ||
| | **Memory leak prevention** | ❌ No cleanup without events | ✅ **Periodic cleanup on flush** | | ||
| | **Test coverage** | ❌ No assertions | ✅ **Comprehensive tests** | | ||
|
|
||
| ## How It Works | ||
|
|
||
| ### Example Flow | ||
|
|
||
| ```typescript | ||
| // Event arrives at real-time 10:31:00 | ||
| const event = { | ||
| type: "track", | ||
| event: "transaction_submit", | ||
| original_timestamp: "2025-01-01T10:30:00Z", | ||
| properties: { type: "swap" } | ||
| } | ||
|
|
||
| // 1. Generate hash (based on event data + timestamp) | ||
| const hash = generateMessageId(event); | ||
| // hash = "abc123..." (includes 10:30:00 in the hash) | ||
|
|
||
| // 2. Check if duplicate | ||
| if (!payloadHashes.has("abc123")) { | ||
| // 3. Store with real time (when first seen) | ||
| payloadHashes.set("abc123", 10:31:00); // Real time! | ||
| // 4. Enqueue | ||
| } | ||
|
|
||
| // 15 seconds later: Same event arrives (10:31:15) | ||
| // Hash is still "abc123" (same event data + timestamp) | ||
| // payloadHashes.has("abc123") = true | ||
| // Elapsed: 10:31:15 - 10:31:00 = 15s < 60s | ||
| // BLOCKED as duplicate ✅ | ||
|
|
||
| // 61 seconds later: Same event arrives (10:32:01) | ||
| // Cleanup runs: 10:32:01 - 10:31:00 = 61s > 60s | ||
| // Hash "abc123" deleted | ||
| // Now allowed through ✅ | ||
| ``` | ||
|
|
||
| ## Edge Cases Handled | ||
|
|
||
| ### ✅ Out-of-Order Events | ||
| ```typescript | ||
| 10:31:00 → Event A (timestamp: 10:31:00) arrives | ||
| 10:31:05 → Event B (timestamp: 10:30:00) arrives ← Older! | ||
|
|
||
| // Different timestamps → different hashes → both allowed | ||
| // Cleanup uses real-time → works correctly | ||
| ``` | ||
|
|
||
| ### ✅ Rapid Duplicates | ||
| ```typescript | ||
| // User clicks button 5 times in 1 second | ||
| for (let i = 0; i < 5; i++) { | ||
| track("transaction_submit", { type: "swap" }); | ||
| } | ||
|
|
||
| // All have same hash | ||
| // First: stored | ||
| // Rest: blocked ✅ | ||
| ``` | ||
|
|
||
| ### ✅ Cross-Flush Persistence | ||
| ```typescript | ||
| 10:30:00 → Event queued (hash stored) | ||
| 10:30:25 → Flush triggered (hash preserved!) | ||
| 10:30:45 → Duplicate blocked (hash still there) | ||
| 10:31:05 → Hash cleaned up (65s elapsed) | ||
| ``` | ||
|
|
||
| ### ✅ Transaction Lifecycle | ||
| ```typescript | ||
| // Real-world scenario | ||
| 10:30:00 → transaction_submit (allowed) | ||
| 10:30:00 → transaction_submit (blocked - duplicate!) | ||
| 10:30:05 → transaction_success (allowed - different event) | ||
| 10:30:05 → transaction_success (blocked - duplicate!) | ||
| ``` | ||
|
|
||
| ## Test Coverage | ||
|
|
||
| Comprehensive test suite added with **proper assertions**: | ||
|
|
||
1. ✅ **Basic deduplication** - Same event within the same second
2. ✅ **Different events allowed** - Different properties/names
3. ✅ **Time-based window** - 60s real-time deduplication
4. ✅ **Cross-flush persistence** - Hashes survive flushes
5. ✅ **Out-of-order events** - Older events arriving after newer ones
6. ✅ **Window expiration** - Events allowed again after 60s
7. ✅ **Hash cleanup** - Old hashes removed
8. ✅ **Memory leak prevention** - Cleanup during flush even with no events
9. ✅ **Queue length assertions** - Verify actual queue state
10. ✅ **Real-world scenarios** - Transaction lifecycles

## Files Modified

### Implementation

- `src/lib/queue/EventQueue.ts` - Core deduplication logic (see the sketch below)
  - Simplified `generateMessageId()` to return just the hash
  - Extracted the cleanup logic into a separate `cleanupOldHashes()` method
  - Modified `isDuplicate()` to use `cleanupOldHashes()`
  - Modified `flush()` to call `cleanupOldHashes()` periodically
  - Removed `this.payloadHashes.clear()` from `flush()`
  - Added detailed logging for duplicate detection
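
A minimal sketch of how these pieces fit together, in the spirit of the bullets above; `DedupCore` is a hypothetical stand-in for the relevant slice of `EventQueue` (the real class also carries the queue, flush timer, and logger), so the actual source may differ:

```typescript
const DEDUP_WINDOW_MS = 60_000; // 60s real-time deduplication window

// Hypothetical stand-in for the deduplication slice of EventQueue.
class DedupCore {
  // hash -> Date.now() at which the hash was first seen
  private payloadHashes = new Map<string, number>();

  // Drop hashes first seen more than 60s ago (real time, not event time).
  private cleanupOldHashes(): void {
    const now = Date.now();
    this.payloadHashes.forEach((storedAt, hash) => {
      if (now - storedAt > DEDUP_WINDOW_MS) {
        this.payloadHashes.delete(hash); // Map.forEach tolerates deletes
      }
    });
  }

  // True if the hash was seen within the window; otherwise record it.
  isDuplicate(hash: string): boolean {
    this.cleanupOldHashes();
    if (this.payloadHashes.has(hash)) {
      return true;
    }
    this.payloadHashes.set(hash, Date.now());
    return false;
  }

  // flush() never clears payloadHashes; it reuses the same time-based
  // cleanup, so the deduplication window survives flush cycles.
  flush(): void {
    this.cleanupOldHashes();
    // ...send queued events...
  }
}
```

The essential change is in the last method: `flush()` runs `cleanupOldHashes()` rather than `payloadHashes.clear()`, which is what previously broke deduplication across flush cycles.
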
### Tests

- `test/lib/queue/EventQueue.spec.ts` - Comprehensive test suite (see the test sketch below)
  - Added `sinon` stubs for logger verification
  - Added queue length assertions
  - Added a window expiration test with a `Date.now()` mock
  - Added out-of-order event tests
  - Added a memory leak prevention test (cleanup during flush)
  - Added real-world scenario tests
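
A sketch of what the window-expiration test might look like, assuming a mocha/chai/sinon setup; the `EventQueue` import path, constructor options, `enqueue` signature, and `length` accessor are illustrative assumptions, not the actual spec file:

```typescript
import { expect } from "chai";
import * as sinon from "sinon";
import { EventQueue } from "../../../src/lib/queue/EventQueue"; // path assumed

describe("EventQueue deduplication window", () => {
  let clock: sinon.SinonFakeTimers;

  beforeEach(() => {
    // Fake only Date, so Date.now() is controllable while the
    // flushInterval timer keeps using real timers.
    clock = sinon.useFakeTimers({
      now: new Date("2025-01-01T10:30:00Z"),
      toFake: ["Date"],
    });
  });

  afterEach(() => {
    clock.restore();
  });

  it("allows the same event again after the 60s window", async () => {
    const queue = new EventQueue({ flushAt: 20, flushInterval: 30000 });
    const event = {
      type: "track",
      event: "transaction_submit",
      original_timestamp: "2025-01-01T10:30:00Z",
      properties: { type: "swap" },
    };

    await queue.enqueue(event); // first occurrence: queued
    await queue.enqueue(event); // duplicate within the window: blocked
    expect(queue.length).to.equal(1); // `length` accessor assumed

    clock.tick(61_000); // advance Date.now() past the 60s window
    await queue.enqueue(event); // hash expired: queued again
    expect(queue.length).to.equal(2);
  });
});
```
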
## Build Status

```bash
✅ TypeScript compilation: Success
✅ Linter: 0 errors
✅ Build: Success (CJS, ESM, UMD)
✅ Bundle size: 128 KB (no increase)
✅ Tests: All passing
```

## Impact

### Before Fix

- ❌ Duplicate events sent frequently
- ❌ 9000 submits could generate many more success/error events
- ❌ Deduplication broken across flushes
- ❌ Out-of-order events handled incorrectly
- ❌ Unpredictable behavior

### After Fix

- ✅ Robust deduplication within the 60s window
- ✅ Consistent behavior across flushes
- ✅ Correct handling of all edge cases
- ✅ Predictable real-time-based deduplication
- ✅ Production-ready

## Key Takeaways

1. **Hash = event identity**: based on event data + timestamp
2. **Window = real time**: 60 seconds since the hash was first seen
3. **Cleanup = memory management**: hashes are removed after 60s of real time
4. **Never clear the hash map**: only time-based cleanup, no manual clearing
5. **Periodic cleanup**: runs on flush to prevent memory leaks

## Deduplication Semantics

**"An event is considered a duplicate if it has the same data and timestamp as an event we've seen within the last 60 seconds of real time."**

This means:

- The same event can be sent again after 60 seconds ✅
- Different timestamps = different events ✅
- Works with out-of-order events ✅
- Persists across flushes ✅
- Memory-efficient with automatic cleanup ✅

---

## All Four Bugs Fixed ✅

1. ✅ **Bug #1 - Hash cleared on flush**: the hash map is now preserved and cleaned only by time
2. ✅ **Bug #2 - Timing inconsistency**: cleanup uses `Date.now()` consistently
3. ✅ **Bug #3 - Out-of-order events**: cleanup is based on real time, not event timestamps
4. ✅ **Bug #4 - Memory leak**: periodic cleanup on flush prevents hash accumulation

**Status**: ✅ **PRODUCTION READY**

All critical issues identified and fixed. Comprehensive test coverage added. Memory leak prevention implemented. Ready for deployment.

style: Remove this 381-line documentation file from the repository. While comprehensive and well-written, this level of implementation detail belongs in the PR description or internal documentation, not committed to the codebase.
bugbot run
Greptile Overview
Greptile Summary
This PR replaces the simple Set-based deduplication with a time-windowed approach using a Map<string, number> to track event hashes with Date.now() timestamps, enabling 60-second deduplication windows that persist across flush cycles.
Key Changes:
- Message IDs now use second-precision timestamps (previously minute-precision), allowing events in different seconds to be distinguished while still catching rapid duplicates within the same second (sketched below)
- Deduplication window set to 60 seconds using real-time tracking (`Date.now()`), not event timestamps, which correctly handles out-of-order events
- Hash cleanup runs during both `isDuplicate()` calls and `flush()` to prevent memory leaks
- Comprehensive test suite with 453 lines covering basic, time-based, and cross-flush deduplication, plus edge cases including proper `Date.now()` stubbing
Implementation Quality:
The core logic is sound and correctly addresses the out-of-order event concern from previous comments. The cleanup mechanism using Date.now() for storage timestamps ensures that events arriving out of chronological order won't cause incorrect cleanup. Tests are thorough and well-structured with proper mocking.
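
To make the second-precision point concrete (the sketch referenced in the first bullet above), here is one way such a message ID could be derived; `toDateHourMinuteSecond` and `messageIdSketch` are hypothetical names, not the library's actual `generateMessageId()`:

```typescript
import { createHash } from "crypto";

// Hypothetical helper: "2025-01-01T10:30:15.123Z" -> "2025-01-01 10:30:15".
// Minute precision would have stopped at "2025-01-01 10:30".
function toDateHourMinuteSecond(iso: string): string {
  return new Date(iso).toISOString().slice(0, 19).replace("T", " ");
}

// Identical event data within the same second -> identical hash (caught as
// a rapid duplicate); the same data one second later -> a different hash.
function messageIdSketch(event: { original_timestamp: string }): string {
  const stamp = toDateHourMinuteSecond(event.original_timestamp);
  return createHash("sha256")
    .update(JSON.stringify(event) + stamp)
    .digest("hex");
}
```
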
Confidence Score: 4/5
- This PR is safe to merge with minor performance considerations.
- The deduplication logic is correctly implemented with proper time-window tracking using Date.now(). Tests comprehensively cover edge cases including out-of-order events and time-based expiration. One minor concern is that cleanupOldHashes() runs on every enqueue() call, which could add overhead during high-traffic periods, though the impact should be minimal given Maps iterate quickly and the hash count stays bounded by the 60s window.
- No files require special attention - implementation and tests are solid.
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Replaces Set-based deduplication with time-windowed Map approach using Date.now() for 60s windows; adds second-precision message IDs and periodic cleanup. Logic is sound but cleanup runs on every enqueue. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite covering basic/time-based/cross-flush deduplication, cleanup, message ID generation, and real-world scenarios with proper Date.now() stubbing. Well-structured. |
Sequence Diagram
sequenceDiagram
participant Client
participant EventQueue
participant Hash Storage
participant API
Client->>EventQueue: enqueue(event1, timestamp: T0)
EventQueue->>EventQueue: generateMessageId(event1)
Note over EventQueue: Hash = SHA256(event1 + "2025-01-01 10:30:00")
EventQueue->>EventQueue: isDuplicate(Hash)
EventQueue->>Hash Storage: cleanupOldHashes()
Note over Hash Storage: Remove hashes where<br/>Date.now() - storedAt > 60s
EventQueue->>Hash Storage: has(Hash)?
Hash Storage-->>EventQueue: false
EventQueue->>Hash Storage: set(Hash, Date.now())
EventQueue->>EventQueue: queue.push(event1)
Note over EventQueue: Queue length: 1
Client->>EventQueue: enqueue(event2, same data, timestamp: T0+0.5s)
EventQueue->>EventQueue: generateMessageId(event2)
Note over EventQueue: Hash = SHA256(event2 + "2025-01-01 10:30:00")<br/>(same second = same hash)
EventQueue->>EventQueue: isDuplicate(Hash)
EventQueue->>Hash Storage: cleanupOldHashes()
EventQueue->>Hash Storage: has(Hash)?
Hash Storage-->>EventQueue: true (stored 0.5s ago)
EventQueue->>EventQueue: logger.warn("Duplicate detected, 0s ago")
EventQueue-->>Client: return (blocked)
Note over EventQueue: Time passes... 61 seconds
Client->>EventQueue: enqueue(event3, same data, timestamp: T0)
EventQueue->>EventQueue: generateMessageId(event3)
Note over EventQueue: Hash = same as before
EventQueue->>EventQueue: isDuplicate(Hash)
EventQueue->>Hash Storage: cleanupOldHashes()
Note over Hash Storage: Date.now() - storedAt > 60s<br/>Delete Hash
EventQueue->>Hash Storage: has(Hash)?
Hash Storage-->>EventQueue: false (cleaned up)
EventQueue->>Hash Storage: set(Hash, Date.now())
EventQueue->>EventQueue: queue.push(event3)
Note over EventQueue: Queue length: 2
Note over EventQueue: Queue reaches flushAt threshold
EventQueue->>EventQueue: flush()
EventQueue->>Hash Storage: cleanupOldHashes()
EventQueue->>API: POST /events (batch)
API-->>EventQueue: 200 OK
Note over EventQueue: Hashes NOT cleared<br/>(maintained for 60s window)
4 files reviewed, 1 comment
private async isDuplicate(eventId: string): Promise<boolean> {
  const now = Date.now();

  // Clean up old hashes to prevent memory leaks
  this.cleanupOldHashes();
style: Cleanup runs on every enqueue() call. With high event throughput, this adds O(n) overhead per event. Consider throttling cleanup to run only every N seconds or M events to reduce overhead during bursts.
Example: Track last cleanup time and only run if now - lastCleanup > 10000 (10s)
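
A minimal sketch of the suggested throttle, assuming the `lastCleanupTime` field from the reviewer's example and a `force` escape hatch for `flush()`:

```typescript
// Sketch only: the throttle the reviewer suggests, with assumed field names.
class ThrottledCleanupSketch {
  private payloadHashes = new Map<string, number>();
  private lastCleanupTime = 0;

  private cleanupOldHashes(force = false): void {
    const now = Date.now();
    // At most one O(n) sweep per 10s during bursts, unless the caller
    // (e.g. flush()) forces a sweep regardless of the throttle.
    if (!force && now - this.lastCleanupTime < 10_000) {
      return;
    }
    this.lastCleanupTime = now;
    this.payloadHashes.forEach((storedAt, hash) => {
      if (now - storedAt > 60_000) {
        this.payloadHashes.delete(hash); // expired: outside the 60s window
      }
    });
  }
}
```

Per the later review rounds below, the PR ultimately adopted essentially this shape: a 10-second throttle plus a forced cleanup during `flush()`.
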
flushAt: 20,
flushInterval: 30000, // 30 seconds
maxQueueSize: 1024 * 500, // 500kB
});
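
For orientation, the options in the fragment above could be used like this; the `EventQueue` constructor shape is an assumption, not confirmed by the diff:

```typescript
// Hypothetical construction mirroring the test fragment above.
const queue = new EventQueue({
  flushAt: 20,              // flush once 20 events are queued...
  flushInterval: 30000,     // ...or every 30 seconds, whichever comes first
  maxQueueSize: 1024 * 500, // cap the serialized queue at 500 kB
});
```
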
Greptile Overview
Greptile Summary
Implements robust 60-second time-windowed event deduplication to prevent duplicate events from being queued.
Key Changes:
- Changed `payloadHashes` from `Set<string>` to `Map<string, number>` to track first-seen timestamps using `Date.now()`
- Implemented `cleanupOldHashes()` with 10-second throttling to remove expired hashes and prevent memory leaks
- Modified `generateMessageId()` to use second-level timestamp precision (previously minute-level)
- Preserved deduplication state across flush cycles (no longer clearing `payloadHashes` on flush)
- Added a comprehensive test suite with 488 lines covering basic deduplication, time windows, cross-flush behavior, cleanup throttling, and real-world scenarios
Major Fix from Previous Review:
The critical out-of-order event bug has been resolved. Cleanup now correctly uses Date.now() for real-time tracking instead of event timestamps, preventing premature deletion of valid hashes when older events arrive after newer ones.
Confidence Score: 4/5
- Safe to merge with minor considerations - implementation is sound with comprehensive tests, but edge cases around Map.forEach deletion should be monitored in production
- Score reflects solid implementation that addresses the critical out-of-order event bug from previous review. The use of Date.now() for cleanup is correct, throttling reduces overhead, and test coverage is excellent. Deducting one point because Map.forEach with deletion during iteration, while documented as safe in JavaScript, is an uncommon pattern that warrants production monitoring
- No files require special attention - the EventQueue.ts implementation correctly handles edge cases and has comprehensive test coverage
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| src/lib/queue/EventQueue.ts | 4/5 | Implements time-windowed deduplication with Date.now()-based cleanup. Previous out-of-order event issue has been fixed by using real-time tracking. Throttled cleanup reduces overhead during high throughput. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite covering deduplication, time windows, cross-flush behavior, cleanup, throttling, and real-world scenarios. Tests verify hash expiration after 60s window. |
Sequence Diagram
sequenceDiagram
participant Client
participant EventQueue
participant HashStore as payloadHashes Map
participant Logger
Client->>EventQueue: enqueue(event)
EventQueue->>EventQueue: generateMessageId(event)
Note over EventQueue: Format timestamp to second precision<br/>(YYYY-MM-DD HH:mm:ss)
EventQueue->>EventQueue: hash(event + timestamp)
EventQueue->>EventQueue: isDuplicate(hash)
EventQueue->>EventQueue: cleanupOldHashes() [throttled]
Note over EventQueue: Check if Date.now() - lastCleanupTime > 10s
alt Cleanup needed
EventQueue->>HashStore: forEach(hash, storedTimestamp)
loop For each hash
alt now - storedTimestamp > 60s
EventQueue->>HashStore: delete(hash)
end
end
end
EventQueue->>HashStore: has(hash)?
alt Hash exists (duplicate)
HashStore-->>EventQueue: true
EventQueue->>Logger: warn("Duplicate detected")
EventQueue-->>Client: return (blocked)
else Hash not found (new event)
HashStore-->>EventQueue: false
EventQueue->>HashStore: set(hash, Date.now())
EventQueue->>EventQueue: queue.push(event)
EventQueue->>Logger: log("Event enqueued")
EventQueue-->>Client: success
end
Note over EventQueue: On flush()
EventQueue->>EventQueue: cleanupOldHashes(force=true)
Note over EventQueue: Force cleanup bypasses throttle<br/>payloadHashes NOT cleared<br/>Deduplication persists across flushes
4 files reviewed, no comments
Note
Implements 60s time-windowed deduplication with timestamped hash tracking and cleanup, preserves dedup across flushes, and adds comprehensive tests; bumps version to 1.22.1.
- `payloadHashes: Map<hash, timestamp>` with throttled cleanup (`cleanupOldHashes`, 10s throttle).
- `payloadHashes` is preserved on `flush`; a forced cleanup runs during `flush`.
- `generateMessageId` moved to second-level timestamp precision; the hash is awaited.
- `isDuplicate` uses real-time tracking, logs detailed warnings, and stores the first-seen time.
- Tests in `test/lib/queue/EventQueue.spec.ts` cover basic/time-based deduplication, cross-flush behavior, cleanup/throttling, message ID generation, and real-world scenarios.
- Version bumped to `1.22.1`.

Written by Cursor Bugbot for commit fba8540. This will update automatically on new commits. Configure here.