
Conversation

@yosriady
Contributor

@yosriady yosriady commented Oct 24, 2025

Note

Implements 60s time-windowed deduplication with timestamped hash tracking and cleanup, preserves dedup across flushes, and adds comprehensive tests; bumps version to 1.22.1.

  • EventQueue:
    • Implement 60s time-windowed deduplication using payloadHashes: Map<hash, timestamp> with throttled cleanup (cleanupOldHashes, 10s throttle).
    • Preserve deduplication across flushes (no longer clearing payloadHashes on flush; perform forced cleanup during flush).
    • Improve generateMessageId to second-level timestamp precision; await hash.
    • Enhance isDuplicate to use real-time tracking, log detailed warnings, and store first-seen time.
  • Tests:
    • Add test/lib/queue/EventQueue.spec.ts covering basic/time-based deduplication, cross-flush behavior, cleanup/throttling, message ID generation, and real-world scenarios.
  • Version:
    • Bump to 1.22.1.

Written by Cursor Bugbot for commit fba8540. This will update automatically on new commits.
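
A minimal sketch of the mechanism the note above describes, assuming the 60s window and 10s cleanup throttle; the class and helper names here are illustrative, not the SDK's actual code:

```typescript
import { createHash } from "crypto";

const DEDUPLICATION_WINDOW_MS = 60_000; // 60s deduplication window
const CLEANUP_THROTTLE_MS = 10_000; // run cleanup at most every 10s

// Illustrative stand-in for the SDK's async hash() helper.
const sha256 = async (s: string): Promise<string> =>
  createHash("sha256").update(s).digest("hex");

class DedupTracker {
  // hash -> Date.now() when the hash was first seen
  private payloadHashes = new Map<string, number>();
  private lastCleanupAt = 0;

  // Returns true if an identical payload was seen within the window.
  async isDuplicate(payload: string): Promise<boolean> {
    const id = await sha256(payload);
    const now = Date.now();
    this.cleanupOldHashes(now);
    if (this.payloadHashes.has(id)) return true;
    this.payloadHashes.set(id, now);
    return false;
  }

  // Time-based eviction only; the map is never cleared wholesale,
  // which is what preserves deduplication across flushes.
  cleanupOldHashes(now = Date.now(), force = false): void {
    if (!force && now - this.lastCleanupAt < CLEANUP_THROTTLE_MS) return;
    this.lastCleanupAt = now;
    this.payloadHashes.forEach((storedAt, hash) => {
      if (now - storedAt > DEDUPLICATION_WINDOW_MS) this.payloadHashes.delete(hash);
    });
  }
}
```

A flush would call cleanupOldHashes(Date.now(), true) rather than clearing the map, matching the forced-cleanup-during-flush behavior described in the note.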

@linear

linear bot commented Oct 24, 2025


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR fixes event deduplication to work across flush cycles by implementing time-based hash cleanup with a 60-second window.

Key Changes:

  • Changed payloadHashes from Set<string> to Map<string, number> to track timestamps
  • Improved timestamp precision from minute-level to second-level in hash generation
  • Removed payloadHashes.clear() from flush() to maintain deduplication across flush cycles
  • Added automatic cleanup of hashes older than 60 seconds in isDuplicate()

What This Fixes:
Previously, duplicate events could be sent if they occurred across different flush cycles (which happen every 10-30 seconds or when 20 events are queued). The hash set was cleared on every flush, breaking deduplication. Now, events with identical content within a 60-second window are properly deduplicated regardless of flush cycles.

Issues Found:

  • Critical: Timing inconsistency between generateMessageId() (uses event.original_timestamp) and isDuplicate() (uses Date.now()). If events have past/future timestamps, the deduplication window won't work correctly.
  • Minor: Unused import toDateHourMinute should be removed
  • Test Quality: Tests lack proper assertions to verify deduplication behavior

Documentation:
Excellent documentation added with multiple guides explaining the fix, debugging steps, and visual diagrams.

Confidence Score: 3/5

  • This PR has a critical timing inconsistency that could cause the deduplication to fail in production scenarios
  • The core deduplication logic has a significant issue: generateMessageId() formats timestamps from event.original_timestamp for hashing, while isDuplicate() uses Date.now() for the time window. This mismatch means events with past/future original_timestamp values won't be deduplicated correctly. The 60s window is measured from the current system time, but the hash is based on the event's timestamp, creating an inconsistency. Additionally, tests don't verify actual deduplication behavior (no assertions on queue state or logger calls).
  • src/lib/queue/EventQueue.ts requires attention to fix the timing inconsistency between hash generation and duplicate detection. test/lib/queue/EventQueue.spec.ts needs better assertions to actually verify deduplication works.

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 4/5 | Implements time-based deduplication with 60s window and second-precision timestamps. Found one potential timing inconsistency issue. |
| test/lib/queue/EventQueue.spec.ts | 3/5 | Comprehensive test suite but tests don't properly verify behavior due to missing assertions on queue state. |

Sequence Diagram

sequenceDiagram
    participant App as Application
    participant EQ as EventQueue
    participant Hash as payloadHashes Map
    participant API as Analytics API

    Note over App,API: Event Submission Flow with Deduplication

    App->>EQ: enqueue(event1, timestamp: 10:30:15)
    EQ->>EQ: generateMessageId(event1)
    Note over EQ: Format timestamp to second precision<br/>(2025-01-01 10:30:15)
    EQ->>EQ: hash(event + formatted timestamp)
    EQ->>EQ: isDuplicate(hash_abc)
    EQ->>Hash: Clean up hashes > 60s old
    EQ->>Hash: has(hash_abc)?
    Hash-->>EQ: false
    EQ->>Hash: set(hash_abc, now=10:30:15)
    EQ->>EQ: queue.push(event1)
    Note over EQ: Queue size: 1

    Note over EQ: ... 10 seconds later ...

    App->>EQ: enqueue(event2, timestamp: 10:30:25)<br/>(same content as event1)
    EQ->>EQ: generateMessageId(event2)
    EQ->>EQ: hash(event + formatted timestamp)
    EQ->>EQ: isDuplicate(hash_abc)
    EQ->>Hash: Clean up hashes > 60s old
    Note over Hash: No hashes to clean (all < 60s)
    EQ->>Hash: has(hash_abc)?
    Hash-->>EQ: true (stored at 10:30:15)
    Note over EQ: Duplicate detected!<br/>Time since last: 10s
    EQ->>EQ: logger.warn("Duplicate blocked, sent 10s ago")
    EQ-->>App: return (blocked)

    Note over EQ: ... Queue reaches 20 events ...

    EQ->>EQ: flush()
    Note over EQ,Hash: Hash map NOT cleared<br/>(preserves deduplication)
    EQ->>API: POST /events (batch)
    API-->>EQ: 200 OK

    Note over EQ: ... 50 seconds after event1 ...

    App->>EQ: enqueue(event3, timestamp: 10:31:05)<br/>(same content as event1)
    EQ->>EQ: generateMessageId(event3)
    EQ->>EQ: isDuplicate(hash_abc)
    EQ->>Hash: Clean up hashes > 60s old
    Note over Hash: hash_abc is 50s old, keep it
    EQ->>Hash: has(hash_abc)?
    Hash-->>EQ: true
    EQ->>EQ: logger.warn("Duplicate blocked, sent 50s ago")
    EQ-->>App: return (blocked)

    Note over EQ: ... 70 seconds after event1 ...

    App->>EQ: enqueue(event4, timestamp: 10:31:25)<br/>(same content as event1)
    EQ->>EQ: generateMessageId(event4)
    EQ->>EQ: isDuplicate(hash_abc)
    EQ->>Hash: Clean up hashes > 60s old
    Note over Hash: hash_abc is 70s old, DELETE
    EQ->>Hash: delete(hash_abc)
    EQ->>Hash: has(hash_abc)?
    Hash-->>EQ: false
    EQ->>Hash: set(hash_abc, now=10:31:25)
    EQ->>EQ: queue.push(event4)
    Note over EQ: Allowed (outside 60s window)

Additional Comments (2)

  1. src/lib/queue/EventQueue.ts, line 112-126 (link)

    logic: Timing inconsistency: generateMessageId() uses event.original_timestamp to format the timestamp for hashing, but isDuplicate() uses Date.now() for cleanup. If events arrive with past/future original_timestamp values, the deduplication window won't work as expected.

    For example, if an event has original_timestamp from 2 minutes ago, it will generate a hash based on that old time, but isDuplicate() will compare against the current time, potentially allowing duplicates or incorrectly blocking events (see the illustration below).

  2. src/lib/queue/EventQueue.ts, line 8 (link)

    style: toDateHourMinute import is no longer used after the fix (replaced with inline second-precision formatting).
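
To make the timing mismatch in comment 1 concrete, a small illustration with hypothetical values (this is not code from the PR):

```typescript
// An event whose original_timestamp is two minutes in the past.
const event = {
  event: "transaction_submit",
  original_timestamp: "2025-01-01T10:28:00.000Z",
};

// Hash identity is derived from the event's own (old) timestamp...
const hashInput = JSON.stringify({
  ...event,
  original_timestamp: "2025-01-01 10:28:00", // second precision
});

// ...while the 60s window is anchored to the wall clock at enqueue time.
const storedAt = Date.now(); // e.g. 10:30:00 real time

// The two clocks can disagree by minutes, so "within 60 seconds" means
// different things for hash identity and for window expiry.
console.log(hashInput, storedAt);
```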

7 files reviewed, 3 comments


Comment on lines 49 to 69
it("should block duplicate events sent within the same second", async () => {
const event1 = createMockEvent({
event: "transaction_submit",
original_timestamp: "2025-01-01T10:30:15.123Z",
});

const event2 = createMockEvent({
event: "transaction_submit",
original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds
});

// Enqueue first event - should succeed
await eventQueue.enqueue(event1);

// Enqueue duplicate event - should be blocked
await eventQueue.enqueue(event2);

// We can't directly check the queue size, but we can verify
// that the second call doesn't throw and returns silently
// In a real test, you'd want to spy on logger.warn to verify the warning
});

style: Tests lack assertions - they only verify that enqueue() doesn't throw, but don't actually verify the queue state or that events were blocked. Consider adding test helpers to check queue size or spy on logger.warn to verify deduplication behavior.

@yosriady
Contributor Author

bugbot run

cursor[bot]

This comment was marked as outdated.

@yosriady yosriady changed the title [P-1570] More robust event deduplication (WIP) [P-1570] Add more robust event deduplication Oct 24, 2025
@yosriady yosriady changed the title [P-1570] Add more robust event deduplication [P-1570] Fix: Add more robust event deduplication Oct 24, 2025
@yosriady
Contributor Author

@greptile

@yosriady
Contributor Author

bugbot run

cursor[bot]

This comment was marked as outdated.


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR fixes event deduplication by switching from minute-level to second-level timestamp precision and maintaining hashes across flush cycles. The implementation now uses a consistent 60-second window and removes the problematic payloadHashes.clear() on flush.

Key improvements:

  • Changed generateMessageId() to return both hash and event timestamp for consistent time-based comparisons
  • Updated isDuplicate() to use event timestamps instead of Date.now(), fixing timing inconsistencies
  • Hash map now persists across flush cycles with automatic cleanup of entries older than 60 seconds
  • Comprehensive test suite added with proper assertions using sinon stubs

Critical issue found:

  • The cleanup logic in isDuplicate() (lines 268-276) has a bug when events arrive out-of-order. It uses the current event's timestamp to determine which hashes to delete, which can incorrectly remove valid hashes when an older event arrives after a newer one. The cleanup should use a monotonically increasing timestamp reference rather than the current event's timestamp.

Confidence Score: 2/5

  • This PR contains a critical logic bug in the cleanup mechanism that could cause incorrect behavior with out-of-order events
  • While the PR significantly improves the deduplication system by fixing the timing inconsistency and adding comprehensive tests, there is a critical bug in the cleanup logic (lines 268-276) that uses the current event's timestamp to delete old hashes. This will fail when events arrive out-of-order, potentially deleting hashes that are still within the deduplication window from the perspective of later events. The tests don't cover this scenario.
  • Pay close attention to src/lib/queue/EventQueue.ts - the isDuplicate() method's cleanup logic needs to be fixed to handle out-of-order events correctly

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 2/5 | Improved deduplication with timestamp-based window and second-level precision, but has a critical out-of-order event handling bug in the cleanup logic that could incorrectly delete valid hashes. |
| test/lib/queue/EventQueue.spec.ts | 4/5 | Comprehensive test suite added with proper assertions using sinon stubs; covers most deduplication scenarios but is missing an out-of-order event test case. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant EventQueue
    participant generateMessageId
    participant isDuplicate
    participant payloadHashes
    participant APIServer

    Client->>EventQueue: enqueue(event)
    EventQueue->>generateMessageId: generateMessageId(event)
    Note over generateMessageId: Extract event.original_timestamp
    Note over generateMessageId: Format to second precision<br/>(YYYY-MM-DD HH:mm:ss)
    Note over generateMessageId: Hash event with formatted timestamp
    generateMessageId-->>EventQueue: { hash, timestamp }
    
    EventQueue->>isDuplicate: isDuplicate(hash, timestamp)
    
    Note over isDuplicate: Cleanup Phase
    loop For each stored hash
        isDuplicate->>payloadHashes: Get stored timestamp
        alt timestamp - stored > 60s
            isDuplicate->>payloadHashes: Delete old hash
        end
    end
    
    Note over isDuplicate: Duplicate Check
    isDuplicate->>payloadHashes: Check if hash exists
    alt Hash exists
        Note over isDuplicate: Calculate time since last event
        isDuplicate-->>EventQueue: true (duplicate)
        EventQueue-->>Client: return (event blocked)
    else Hash does not exist
        isDuplicate->>payloadHashes: Store hash with timestamp
        isDuplicate-->>EventQueue: false (not duplicate)
        EventQueue->>EventQueue: Add to queue
        
        alt Queue reaches flushAt or maxQueueSize
            EventQueue->>APIServer: flush() - POST events
            Note over EventQueue: payloadHashes NOT cleared<br/>(maintains deduplication)
            APIServer-->>EventQueue: Success
        end
        
        EventQueue-->>Client: Event enqueued
    end

3 files reviewed, 1 comment


Comment on lines 268 to 276
// Clean up old hashes that are outside the deduplication window
// Use eventTimestamp (not Date.now()) to ensure consistency with hash generation
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  if (eventTimestamp - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

logic: Cleanup logic can incorrectly delete valid hashes when events arrive out-of-order. If an older event (timestamp T1) arrives after a newer event (timestamp T2), the cleanup uses T1 to determine which hashes to delete. This means any hash with timestamp < T1 - 60s gets deleted, even though from T2's perspective those hashes might still be within the 60s window.

Example: Event at 10:31:00 arrives first (stored), then event at 10:30:00 arrives. The cleanup at line 272 uses 10:30:00 as eventTimestamp, so it deletes any hash with timestamp < 10:29:00. This is incorrect because we should use the maximum timestamp seen, not the current event's timestamp.

Consider tracking the latest timestamp separately or using Date.now() for cleanup only (while still using event timestamps for deduplication comparison).
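
A minimal sketch of the second option (Date.now() for cleanup, event timestamps only inside the hash); the names are illustrative, though this matches the direction later revisions of the PR take:

```typescript
const DEDUPLICATION_WINDOW_MS = 60_000;
const payloadHashes = new Map<string, number>(); // hash -> first-seen Date.now()

function isDuplicate(eventId: string): boolean {
  const now = Date.now();

  // Eviction keyed to real elapsed time: an out-of-order event's older
  // timestamp can no longer evict hashes that are still fresh.
  payloadHashes.forEach((storedAt, hash) => {
    if (now - storedAt > DEDUPLICATION_WINDOW_MS) payloadHashes.delete(hash);
  });

  if (payloadHashes.has(eventId)) return true;
  payloadHashes.set(eventId, now); // remember first-seen wall-clock time
  return false;
}
```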

@yosriady
Contributor Author

@greptile


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

Refactored event deduplication to use a timestamped hash map with a 60-second real-time window, fixing three critical issues: hash clearing on flush (breaking cross-flush deduplication), timing inconsistency between hash generation and cleanup, and incorrect cleanup with out-of-order events. The solution now properly maintains deduplication state across flush cycles by storing hashes with Date.now() timestamps and cleaning them based on real elapsed time rather than event timestamps.

Key Changes:

  • Changed payloadHashes from Set<string> to Map<string, number> mapping hash to Date.now() timestamp
  • Removed payloadHashes.clear() from flush() method to maintain deduplication across flush cycles
  • Updated isDuplicate() to clean up old hashes based on real-time window (Date.now() - stored > 60s)
  • Enhanced hash generation to use second-level precision while returning both hash and timestamp
  • Added comprehensive test suite covering same-second duplicates, cross-flush behavior, out-of-order events, and real-world scenarios

Issues Found:

  • 8 documentation markdown files should be removed (development notes, not production docs)
  • Tests only verify logger calls, not actual queue state
  • Missing test for deduplication window expiration (events allowed after 60s)
  • Unused timestamp return value from generateMessageId()
  • Cleanup logic uses temporary array instead of direct deletion

Confidence Score: 4/5

  • This PR is safe to merge with minor cleanup - the core deduplication logic is sound and fixes critical bugs
  • Score reflects solid implementation of deduplication fix that addresses real production issues (cross-flush deduplication, out-of-order events). However, the PR includes 8 documentation files that should be removed, tests could be more thorough (missing window expiration test, only checking logger calls), and there are minor code quality improvements possible. The core logic is correct and the fix is important, but cleanup is recommended before merge.
  • Remove all 8 documentation .md files (FINAL_FIX_SUMMARY.md, OUT_OF_ORDER_FIX.md, etc.) before merging. Test file should add assertions on queue state and window expiration behavior.

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 4/5 | Refactored deduplication to use timestamped hash map with 60s window, removing flush-time clearing for cross-flush persistence. Implementation is sound but has minor issues. |
| test/lib/queue/EventQueue.spec.ts | 3/5 | Comprehensive test suite added covering deduplication scenarios, but tests only verify logger calls rather than actual queue state, and a critical window expiration test is missing. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant EventQueue
    participant isDuplicate
    participant payloadHashes as Map<hash,timestamp>
    participant Queue as queue[]
    
    Client->>EventQueue: enqueue(event1, timestamp: 10:30:15.123Z)
    EventQueue->>EventQueue: generateMessageId(event1)
    Note over EventQueue: Hash includes timestamp<br/>rounded to seconds:<br/>"2025-01-01 10:30:15"
    EventQueue->>EventQueue: hash = "abc123..."
    EventQueue->>isDuplicate: isDuplicate("abc123")
    isDuplicate->>isDuplicate: now = Date.now() = T0
    isDuplicate->>payloadHashes: Clean up old hashes<br/>(now - stored > 60s)
    isDuplicate->>payloadHashes: has("abc123")?
    payloadHashes-->>isDuplicate: false
    isDuplicate->>payloadHashes: set("abc123", T0)
    isDuplicate-->>EventQueue: false (not duplicate)
    EventQueue->>Queue: push(event1)
    EventQueue-->>Client: Event enqueued
    
    Client->>EventQueue: enqueue(event2, timestamp: 10:30:15.456Z)
    Note over Client: Same second, different ms<br/>Same event data
    EventQueue->>EventQueue: generateMessageId(event2)
    Note over EventQueue: Hash rounds to same second:<br/>"2025-01-01 10:30:15"
    EventQueue->>EventQueue: hash = "abc123..." (SAME!)
    EventQueue->>isDuplicate: isDuplicate("abc123")
    isDuplicate->>isDuplicate: now = Date.now() = T0 + 0.3s
    isDuplicate->>payloadHashes: Clean up old hashes
    isDuplicate->>payloadHashes: has("abc123")?
    payloadHashes-->>isDuplicate: true (stored at T0)
    isDuplicate->>isDuplicate: elapsed = (T0+0.3s) - T0 = 0.3s
    isDuplicate-->>EventQueue: true (DUPLICATE)
    EventQueue->>EventQueue: logger.warn("Duplicate detected")
    EventQueue-->>Client: Event blocked
    
    Note over EventQueue: Queue reaches flushAt=20
    EventQueue->>EventQueue: flush()
    Note over EventQueue: payloadHashes NOT cleared<br/>(preserves dedup state)
    EventQueue->>EventQueue: Send batch to API
    
    Client->>EventQueue: enqueue(event3, timestamp: 10:30:15.789Z)
    Note over Client: After flush, same second
    EventQueue->>isDuplicate: isDuplicate("abc123")
    isDuplicate->>payloadHashes: has("abc123")?
    payloadHashes-->>isDuplicate: true (still in memory)
    isDuplicate-->>EventQueue: true (DUPLICATE)
    EventQueue-->>Client: Event blocked (cross-flush dedup works!)

4 files reviewed, 5 comments


Comment on lines 73 to 97
it("should block duplicate events sent within the same second", async () => {
const event1 = createMockEvent({
event: "transaction_submit",
original_timestamp: "2025-01-01T10:30:15.123Z",
});

const event2 = createMockEvent({
event: "transaction_submit",
original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds
});

// Enqueue first event - should succeed
await eventQueue.enqueue(event1);
expect(loggerLogStub.calledOnce).to.be.true;
expect(loggerWarnStub.called).to.be.false;

// Enqueue duplicate event - should be blocked
await eventQueue.enqueue(event2);

// Verify duplicate was detected
expect(loggerWarnStub.calledOnce).to.be.true;
expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected");
// Only one "Event enqueued" log (first event only)
expect(loggerLogStub.calledOnce).to.be.true;
});

style: Tests only verify logger output, not actual queue behavior. Consider adding assertions on queue length.

Suggested change

Current:

it("should block duplicate events sent within the same second", async () => {
  const event1 = createMockEvent({
    event: "transaction_submit",
    original_timestamp: "2025-01-01T10:30:15.123Z",
  });
  const event2 = createMockEvent({
    event: "transaction_submit",
    original_timestamp: "2025-01-01T10:30:15.456Z", // Same second, different milliseconds
  });
  // Enqueue first event - should succeed
  await eventQueue.enqueue(event1);
  expect(loggerLogStub.calledOnce).to.be.true;
  expect(loggerWarnStub.called).to.be.false;
  // Enqueue duplicate event - should be blocked
  await eventQueue.enqueue(event2);
  // Verify duplicate was detected
  expect(loggerWarnStub.calledOnce).to.be.true;
  expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected");
  // Only one "Event enqueued" log (first event only)
  expect(loggerLogStub.calledOnce).to.be.true;
});

Suggested:

// Enqueue first event - should succeed
await eventQueue.enqueue(event1);
expect(loggerLogStub.calledOnce).to.be.true;
expect(loggerWarnStub.called).to.be.false;
expect((eventQueue as any).queue.length).to.equal(1);
// Enqueue duplicate event - should be blocked
await eventQueue.enqueue(event2);
// Verify duplicate was detected
expect(loggerWarnStub.calledOnce).to.be.true;
expect(loggerWarnStub.firstCall.args[0]).to.include("Duplicate event detected");
// Only one "Event enqueued" log (first event only)
expect(loggerLogStub.calledOnce).to.be.true;
expect((eventQueue as any).queue.length).to.equal(1);

Comment on lines 1 to 10
# Final Deduplication Fix - Complete Summary

## 🎯 Three Critical Issues Identified and Fixed

Through thorough code review, three critical bugs were found and fixed in the deduplication system:

### 1. ✅ Original Issue: Hash Cleared on Flush
**Problem**: `payloadHashes.clear()` on every flush broke deduplication across flush cycles
**Status**: FIXED - Hashes now persist with time-based cleanup


style: Remove documentation files from the PR. These 8 markdown files (FINAL_FIX_SUMMARY.md, OUT_OF_ORDER_FIX.md, DEDUPLICATION_FIX.md, CRITICAL_FIX_APPLIED.md, CHANGES_SUMMARY.md, FIX_SUMMARY.md, DEBUGGING_GUIDE.md, DEDUPLICATION_VISUAL.md) should not be committed to the repository - they appear to be development notes.

Comment on lines 113 to 131
private async generateMessageId(event: IFormoEvent): Promise<{ hash: string; timestamp: number }> {
  // Format timestamp to second precision (YYYY-MM-DD HH:mm:ss) for better deduplication
  const date = new Date(event.original_timestamp);
  const eventTimestamp = date.getTime(); // Get timestamp in milliseconds

  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);

  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  const eventString = JSON.stringify(eventForHashing);
-  return hash(eventString);
+  const hashValue = await hash(eventString);
+
+  return { hash: hashValue, timestamp: eventTimestamp };
}

style: The timestamp return value from generateMessageId() is never used. Consider removing it or documenting why it's returned.

Suggested change

Current:

private async generateMessageId(event: IFormoEvent): Promise<{ hash: string; timestamp: number }> {
  // Format timestamp to second precision (YYYY-MM-DD HH:mm:ss) for better deduplication
  const date = new Date(event.original_timestamp);
  const eventTimestamp = date.getTime(); // Get timestamp in milliseconds
  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);
  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  const eventString = JSON.stringify(eventForHashing);
  const hashValue = await hash(eventString);
  return { hash: hashValue, timestamp: eventTimestamp };
}

Suggested:

private async generateMessageId(event: IFormoEvent): Promise<string> {
  // Format timestamp to second precision (YYYY-MM-DD HH:mm:ss) for better deduplication
  const date = new Date(event.original_timestamp);
  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);
  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  const eventString = JSON.stringify(eventForHashing);
  return await hash(eventString);
}
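
As an aside, the same second-precision key can be produced more compactly, since toISOString() always emits UTC; a sketch, not code from the PR:

```typescript
// "2025-01-01T10:30:15.456Z" -> "2025-01-01 10:30:15"
const formatToSecond = (iso: string): string =>
  new Date(iso).toISOString().replace("T", " ").slice(0, 19);
```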

Comment on lines 275 to 282
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

style: Creating temporary array for cleanup is inefficient. Use direct deletion during iteration.

Suggested change

Current:

const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

Suggested:

// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
for (const [hash, storedTimestamp] of this.payloadHashes.entries()) {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
}

Comment on lines 226 to 270
describe("Hash Cleanup", () => {
it("should handle out-of-order events correctly", async () => {
// Simulate events arriving out of chronological order
const newerEvent = createMockEvent({
event: "test_event",
original_timestamp: "2025-01-01T10:31:00.000Z", // Newer timestamp
});

await eventQueue.enqueue(newerEvent);

// Now enqueue an older event
const olderEvent = createMockEvent({
event: "different_event", // Different event, so not a duplicate
original_timestamp: "2025-01-01T10:30:00.000Z", // Older timestamp
});

await eventQueue.enqueue(olderEvent);

// Both should be allowed (different events, cleanup based on real time)
expect(loggerWarnStub.called).to.be.false;
expect(loggerLogStub.calledTwice).to.be.true;
});

it("should clean up hashes based on real elapsed time", async () => {
// This test shows that cleanup is based on Date.now(), not event timestamps
// In a real implementation, you'd mock Date.now() to test this properly
// For now, we verify the logic doesn't break with same-timestamp events
const event1 = createMockEvent({
event: "test_event",
original_timestamp: "2025-01-01T10:30:00.000Z",
});

const event2 = createMockEvent({
event: "test_event",
original_timestamp: "2025-01-01T10:30:00.000Z", // Same timestamp
});

await eventQueue.enqueue(event1);
await eventQueue.enqueue(event2);

// Second event should be blocked as duplicate (same hash)
expect(loggerWarnStub.calledOnce).to.be.true;
expect(loggerLogStub.calledOnce).to.be.true;
});
});

style: Missing critical test: verify that events CAN be sent again after the 60s deduplication window expires. Current tests verify blocking within window, but not expiration behavior.

@yosriady
Contributor Author

@greptile


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

Improves client-side event deduplication by switching from minute-precision to second-precision hashing and implementing a 60-second real-time deduplication window that persists across flush cycles.

Key Changes:

  • Second-precision hashing: Events within the same second (ignoring milliseconds) are considered duplicates, preventing rapid duplicate submissions
  • Real-time window tracking: Uses Date.now() to store hash timestamps instead of event timestamps, fixing the out-of-order event handling issue noted in previous comments
  • Cross-flush persistence: Removed payloadHashes.clear() on flush, maintaining deduplication state across batch sends
  • Automatic cleanup: Old hashes (>60s) are cleaned up during each isDuplicate() check
  • Comprehensive tests: 13 test cases covering basic deduplication, time windows, cross-flush behavior, cleanup logic, and real-world scenarios including window expiration

Previous Issues Addressed:

  • ✅ Out-of-order cleanup logic fixed (src/lib/queue/EventQueue.ts:268-280)
  • ✅ Tests now include queue length assertions (test/lib/queue/EventQueue.spec.ts:88, 99)
  • ✅ Window expiration properly tested with Date.now() mocking (test/lib/queue/EventQueue.spec.ts:274-312); see the sketch after this list
  • ⚠️ Documentation files mentioned in PR description are not in changeset (expected behavior)
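
A sketch of what such a window-expiration test can look like with sinon fake timers; it assumes the createMockEvent/eventQueue fixtures from the spec file and is not the PR's exact test:

```typescript
it("allows the same event again after the 60s window expires", async () => {
  const clock = sinon.useFakeTimers(new Date("2025-01-01T10:30:00Z"));
  try {
    const event = createMockEvent({
      event: "transaction_submit",
      original_timestamp: "2025-01-01T10:30:00.000Z",
    });

    await eventQueue.enqueue(event); // first occurrence: queued
    await eventQueue.enqueue(event); // same hash within window: blocked
    expect((eventQueue as any).queue.length).to.equal(1);

    clock.tick(61_000); // advance the wall clock past the 60s window

    await eventQueue.enqueue(event); // hash evicted: queued again
    expect((eventQueue as any).queue.length).to.equal(2);
  } finally {
    clock.restore();
  }
});
```

With fake timers active, the queue's flush interval also advances during tick(), so a real version of this test would stub or disable flushing.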

Minor Issue:

  • Unused import toDateHourMinute should be removed (src/lib/queue/EventQueue.ts:8)

Confidence Score: 4/5

  • Safe to merge with one minor cleanup (unused import)
  • Implementation correctly addresses the out-of-order event handling issue from previous review. Comprehensive test suite with proper Date.now() mocking validates the 60s window behavior. Only issue is a cosmetic unused import.
  • src/lib/queue/EventQueue.ts requires minor cleanup (remove unused import on line 8), otherwise no files require special attention

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 4/5 | Implements second-precision deduplication with 60s real-time window, fixes out-of-order issue. Minor: unused import toDateHourMinute at line 8. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with 13 tests covering basic deduplication, time windows, cross-flush persistence, cleanup, and real-world scenarios. Properly tests window expiration with Date.now() mocking. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant EventQueue
    participant PayloadHashes as PayloadHashes<br/>Map<hash, timestamp>
    participant Logger
    participant API
    
    Client->>EventQueue: enqueue(event)
    EventQueue->>EventQueue: generateMessageId(event)
    Note over EventQueue: Hash = hash(event + timestamp<br/>formatted to second precision)
    
    EventQueue->>EventQueue: isDuplicate(hash)
    EventQueue->>PayloadHashes: Check for old hashes
    Note over PayloadHashes: Cleanup: Delete hashes where<br/>Date.now() - storedTimestamp > 60s
    
    alt Hash exists in Map
        PayloadHashes-->>EventQueue: Hash found
        EventQueue->>Logger: warn("Duplicate detected...")
        EventQueue-->>Client: Return (blocked)
    else Hash not found
        PayloadHashes-->>EventQueue: Hash not found
        EventQueue->>PayloadHashes: set(hash, Date.now())
        EventQueue->>EventQueue: queue.push(event)
        EventQueue->>Logger: log("Event enqueued")
        
        alt Queue reaches flushAt (20 events)
            EventQueue->>EventQueue: flush()
            EventQueue->>API: POST /events (batch)
            Note over EventQueue,PayloadHashes: payloadHashes NOT cleared<br/>Maintains deduplication across flushes
            API-->>EventQueue: 200 OK
        end
        
        EventQueue-->>Client: Success
    end

Additional Comments (1)

  1. src/lib/queue/EventQueue.ts, line 8 (link)

    style: Unused import - toDateHourMinute is no longer used after switching to second-precision formatting.

10 files reviewed, 1 comment


@yosriady yosriady requested a review from TamHuynhTee October 25, 2025 10:20
@yosriady yosriady changed the title [P-1570] Fix: Add more robust event deduplication fix: Add more robust event deduplication [P-1570] Oct 26, 2025
@yosriady
Contributor Author

@greptile


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR overhauls the event deduplication system to fix critical bugs that allowed duplicate events to be sent. The implementation now uses a Map-based approach with Date.now() for time-based cleanup, ensuring robust deduplication across flush cycles.

Key improvements:

  • Replaced Set with Map<string, number> to track when hashes were first seen
  • Changed hash precision from minute-level to second-level for better granularity
  • Removed payloadHashes.clear() from flush() to maintain deduplication across flush cycles
  • Implemented real-time cleanup using Date.now() instead of event timestamps, correctly handling out-of-order events
  • Added comprehensive test suite with proper assertions on queue length and logger calls
  • Updated deduplication window to 60 seconds of real elapsed time

The fix addresses three critical issues: hash clearing on flush, timing inconsistency between hash generation and window checking, and incorrect cleanup with out-of-order events.

Confidence Score: 5/5

  • This PR is safe to merge with high confidence
  • The implementation correctly fixes the deduplication logic with proper separation of concerns (hash generation, duplicate detection, and cleanup). The use of Date.now() for cleanup elegantly handles out-of-order events. Comprehensive test coverage includes window expiration tests with Date.now() mocking, cross-flush persistence verification, and real-world scenarios. The logic is sound and all edge cases are handled properly.
  • No files require special attention. Consider removing DEDUPLICATION_FIX_SUMMARY.md as it's development documentation rather than production code documentation.

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 5/5 | Robust deduplication implementation using Date.now() for cleanup and cross-flush persistence. Logic correctly handles out-of-order events and time-based window expiration. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with proper assertions covering deduplication, cross-flush persistence, out-of-order events, window expiration (with Date.now() mocking), and real-world scenarios. |
| DEDUPLICATION_FIX_SUMMARY.md | 4/5 | Detailed documentation of the fix. While comprehensive, this development notes file may not belong in the repository long-term. |

Sequence Diagram

sequenceDiagram
    participant Client
    participant EventQueue
    participant PayloadHashes as Map<hash, timestamp>
    participant Logger
    participant API

    Note over Client,API: Event Enqueueing & Deduplication Flow

    Client->>EventQueue: enqueue(event)
    EventQueue->>EventQueue: generateMessageId(event)
    Note over EventQueue: Hash = f(event data + timestamp)<br/>Rounded to second precision
    EventQueue-->>EventQueue: message_id (hash)
    
    EventQueue->>EventQueue: isDuplicate(message_id)
    Note over EventQueue: now = Date.now()
    
    EventQueue->>PayloadHashes: forEach(storedTimestamp, hash)
    Note over PayloadHashes: Cleanup: Remove hashes where<br/>now - storedTimestamp > 60s
    
    alt Hash exists in map
        PayloadHashes-->>EventQueue: true (found)
        EventQueue->>Logger: warn("Duplicate detected")
        EventQueue-->>Client: return (blocked)
    else Hash not in map
        PayloadHashes-->>EventQueue: false (not found)
        EventQueue->>PayloadHashes: set(message_id, now)
        EventQueue->>EventQueue: queue.push(event)
        EventQueue->>Logger: log("Event enqueued")
        
        alt Queue reaches flushAt threshold
            EventQueue->>EventQueue: flush()
            Note over EventQueue: Note: payloadHashes NOT cleared<br/>Hashes persist across flushes
            EventQueue->>API: POST /events (batch)
            API-->>EventQueue: 200 OK
        end
        
        EventQueue-->>Client: return (success)
    end

    Note over PayloadHashes: Deduplication Window: 60s<br/>Based on real time (Date.now())<br/>Not event timestamps

5 files reviewed, 2 comments


Comment on lines 273 to 280
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

style: Creating a temporary array for deletion adds overhead. Consider deleting directly during iteration since Map.forEach allows safe deletion.

Suggested change

Current:

const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

Suggested:

// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
for (const [hash, storedTimestamp] of this.payloadHashes.entries()) {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
}

@@ -0,0 +1,318 @@
# Event Deduplication Fix - Complete Summary

style: Consider removing this development notes file from the repository. Detailed implementation notes like this are typically better suited for PR descriptions, commit messages, or internal documentation rather than being committed to the codebase.

@yosriady
Contributor Author

bugbot run

cursor[bot]

This comment was marked as outdated.

@yosriady
Contributor Author

@greptile


@greptile-apps greptile-apps bot left a comment


Greptile Overview

Greptile Summary

This PR implements a robust event deduplication mechanism that prevents duplicate analytics events from being sent within a 60-second real-time window. The fix addresses four critical bugs: hash clearing on flush, inconsistent time sources for deduplication vs cleanup, incorrect out-of-order event handling, and memory leaks from missing periodic cleanup.

Key improvements:

  • Changed from Set<string> to Map<string, number> to track when each event hash was first seen (using Date.now())
  • Removed payloadHashes.clear() on flush, enabling cross-flush deduplication
  • Extracted cleanup logic to separate cleanupOldHashes() method that uses real elapsed time (Date.now()) consistently
  • Added periodic cleanup during flush cycles to prevent memory leaks even when events stop arriving
  • Improved hash precision from minute-level to second-level for better granularity
  • Added comprehensive test suite with proper queue state assertions and time-mocking

Technical correctness:
The implementation correctly handles the separation of concerns: hash generation (event identity), deduplication (duplicate detection), and cleanup (memory management). The use of Date.now() for both storage and cleanup timestamps ensures consistent behavior regardless of event timestamp ordering.

Minor optimization note:
The cleanupOldHashes() method creates a temporary array of hashes to delete (lines 274-281) to avoid modifying the Map during iteration. While JavaScript Maps support safe deletion during forEach iteration, this conservative approach is acceptable given that cleanup runs infrequently (every flush cycle).

Confidence Score: 4/5

  • This PR is safe to merge with minimal risk - the deduplication logic is sound and well-tested
  • Score reflects solid implementation with comprehensive tests and correct handling of edge cases. Not a 5 due to: (1) the included 381-line documentation file that should be in PR description instead, and (2) minor cleanup inefficiency using temporary array. The core deduplication logic is correct and addresses real bugs.
  • Consider removing DEDUPLICATION_FIX_SUMMARY.md from the repository - detailed implementation notes like this are better suited for PR descriptions or internal documentation

Important Files Changed

File Analysis

| Filename | Score | Overview |
|----------|-------|----------|
| src/lib/queue/EventQueue.ts | 4/5 | Implements robust 60s time-window deduplication with cross-flush persistence and cleanup. Implementation is solid, but the temporary array in cleanup adds minor overhead. |
| test/lib/queue/EventQueue.spec.ts | 5/5 | Comprehensive test suite with proper assertions on queue state, time mocking for window expiration tests, and coverage of edge cases including out-of-order events and memory leak prevention. |
| DEDUPLICATION_FIX_SUMMARY.md | 3/5 | A 381-line documentation file added to the repository; it should likely live in the PR description or internal docs instead of being committed to the codebase. |

Sequence Diagram

sequenceDiagram
    participant User
    participant SDK
    participant EventQueue
    participant HashMap as payloadHashes<br/>Map<hash, timestamp>
    participant Server

    Note over EventQueue: Deduplication Window = 60s

    User->>SDK: track("transaction_submit")
    SDK->>EventQueue: enqueue(event)
    
    EventQueue->>EventQueue: generateMessageId(event)
    Note over EventQueue: Hash = SHA256(event data + <br/>timestamp to second precision)
    
    EventQueue->>EventQueue: isDuplicate(hash)?
    EventQueue->>HashMap: cleanupOldHashes()
    Note over HashMap: Remove hashes where<br/>Date.now() - stored > 60s
    
    EventQueue->>HashMap: has(hash)?
    HashMap-->>EventQueue: false (new event)
    
    EventQueue->>HashMap: set(hash, Date.now())
    EventQueue->>EventQueue: queue.push(event)
    Note over EventQueue: Event queued successfully
    
    User->>SDK: track("transaction_submit")<br/>(duplicate, 5s later)
    SDK->>EventQueue: enqueue(event)
    EventQueue->>EventQueue: generateMessageId(event)
    Note over EventQueue: Same hash generated
    EventQueue->>EventQueue: isDuplicate(hash)?
    EventQueue->>HashMap: has(hash)?
    HashMap-->>EventQueue: true (seen 5s ago)
    EventQueue->>EventQueue: log warning & block
    Note over EventQueue: Duplicate blocked ✓
    
    Note over EventQueue: Flush triggered<br/>(queue full or interval)
    EventQueue->>EventQueue: flush()
    EventQueue->>HashMap: cleanupOldHashes()
    EventQueue->>Server: POST /events (batch)
    Server-->>EventQueue: 200 OK
    Note over HashMap: Hash NOT cleared<br/>Cross-flush persistence ✓
    
    Note over EventQueue: 61 seconds pass...
    EventQueue->>HashMap: cleanupOldHashes()
    Note over HashMap: Hash expired & removed
    
    User->>SDK: track("transaction_submit")<br/>(same event, after 60s)
    SDK->>EventQueue: enqueue(event)
    EventQueue->>EventQueue: isDuplicate(hash)?
    EventQueue->>HashMap: has(hash)?
    HashMap-->>EventQueue: false (expired)
    EventQueue->>HashMap: set(hash, Date.now())
    EventQueue->>EventQueue: queue.push(event)
    Note over EventQueue: Allowed after window ✓

3 files reviewed, 2 comments


Comment on lines 274 to 281
const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

style: Maps in JavaScript support safe deletion during forEach iteration, so collecting hashes first is unnecessary overhead.

Suggested change

Current:

const hashesToDelete: string[] = [];
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    hashesToDelete.push(hash);
  }
});
hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));

Suggested:

// Clean up old hashes based on actual elapsed time (Date.now())
// This handles out-of-order events correctly - we clean up based on real time passage
// not based on event timestamps which may arrive out of order
this.payloadHashes.forEach((storedTimestamp, hash) => {
  // storedTimestamp is when we first saw this hash (Date.now() at storage time)
  if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
    this.payloadHashes.delete(hash);
  }
});

Comment on lines 1 to 381
# Event Deduplication Fix - Complete Summary

## Problem Statement

Users were experiencing an excess of custom events being sent by the Formo Analytics SDK. For example, a single transaction flow (submit → success/error) was generating far more events than expected (e.g., 9000 submits but many more successes/errors).

## Root Cause Analysis

The SDK's deduplication mechanism had **four critical bugs**:

### Bug #1: Hash Cleared on Every Flush ❌
```typescript
// BEFORE: Hash map cleared after every flush
async flush() {
  // ... send events ...
  this.payloadHashes.clear(); // ❌ Cleared deduplication state!
}
```
**Impact**: Duplicate events sent within the same 60-second window would NOT be deduplicated if a flush occurred between them.

### Bug #2: Timing Inconsistency ❌
```typescript
// BEFORE: Inconsistent time sources
generateMessageId() {
  // Used event.original_timestamp for hash
}

isDuplicate() {
  // Used Date.now() for time window
}
```
**Impact**: Events with past/future `original_timestamp` values wouldn't be deduplicated correctly. The 60s window was measured from current system time, but the hash was based on the event's timestamp.

### Bug #3: Out-of-Order Events ❌
```typescript
// BEFORE: Cleanup used event timestamp
isDuplicate(eventId, eventTimestamp) {
  // Clean up using eventTimestamp
  if (eventTimestamp - storedTimestamp > 60s) {
    delete hash; // ❌ Wrong reference point!
  }
}
```
**Impact**: When older events arrived after newer ones, cleanup would use the older timestamp as reference, incorrectly deleting hashes that should still be valid.

**Example**:
```
10:31:00 → Event A arrives (timestamp: 10:31:00)
10:31:05 → Event B arrives (timestamp: 10:30:00) ← Older!
Cleanup uses 10:30:00, deletes hashes < 10:29:00
But should keep hashes back to 10:30:00 based on Event A!
```

### Bug #4: Memory Leak - No Cleanup Without Events ❌
```typescript
// BEFORE: Cleanup only in isDuplicate()
private async isDuplicate(eventId: string): Promise<boolean> {
  // Clean up old hashes...
  // But this only runs when NEW events arrive!
}

async flush() {
  // No cleanup here!
}
```
**Impact**: If events stop arriving, expired hashes are never removed from memory. In long-running applications with sporadic event activity, this could accumulate thousands of expired hashes, causing a memory leak.

**Example**:
```
10:30:00 → Event arrives, hash stored
10:30:30 → Event arrives, hash stored
10:31:00 → Last event arrives, hash stored
... 2 hours pass with no events ...
All 3 hashes still in memory! ❌
Should have been cleaned up after 60s
```

## Solution Architecture

### Core Principle: Separation of Concerns

1. **Hash Generation** = Event Identity (uses event data + timestamp)
2. **Deduplication** = Duplicate Detection (checks hash equality)
3. **Cleanup** = Memory Management (uses real elapsed time)

### Implementation

#### 1. Hash Generation (Event Identity)
```typescript
private async generateMessageId(event: IFormoEvent): Promise<string> {
  // Format timestamp to second precision for consistent hashing
  const date = new Date(event.original_timestamp);
  const formattedTimestamp =
    date.getUTCFullYear() + "-" +
    ("0" + (date.getUTCMonth() + 1)).slice(-2) + "-" +
    ("0" + date.getUTCDate()).slice(-2) + " " +
    ("0" + date.getUTCHours()).slice(-2) + ":" +
    ("0" + date.getUTCMinutes()).slice(-2) + ":" +
    ("0" + date.getUTCSeconds()).slice(-2);

  const eventForHashing = { ...event, original_timestamp: formattedTimestamp };
  return await hash(JSON.stringify(eventForHashing));
}
```

**Key Points**:
- Hash based on event data + event timestamp (rounded to seconds)
- Same event at same time = same hash
- Different timestamp = different hash (different event)

#### 2. Deduplication Logic (Duplicate Detection)
```typescript
async enqueue(event: IFormoEvent, callback?: (...args: any) => void) {
  const message_id = await this.generateMessageId(event);

  if (await this.isDuplicate(message_id)) {
    // Duplicate detected - block it
    return;
  }

  // Enqueue the event
  this.queue.push({ message: { ...event, message_id }, callback });
}
```

**Key Points**:
- Simple hash equality check
- If seen recently (within 60s real time), block it

#### 3. Cleanup & Storage (Memory Management)
```typescript
/**
 * Separate cleanup method that can be called from multiple places
 */
private cleanupOldHashes(): void {
  const now = Date.now();

  // CLEANUP: Remove old hashes based on REAL elapsed time
  const hashesToDelete: string[] = [];
  this.payloadHashes.forEach((storedTimestamp, hash) => {
    if (now - storedTimestamp > DEDUPLICATION_WINDOW_MS) {
      hashesToDelete.push(hash);
    }
  });
  hashesToDelete.forEach(hash => this.payloadHashes.delete(hash));
}

private async isDuplicate(eventId: string): Promise<boolean> {
  const now = Date.now();

  // CLEANUP: Clean up old hashes to prevent memory leaks
  this.cleanupOldHashes();

  // CHECK: Is this hash in recent memory?
  if (this.payloadHashes.has(eventId)) {
    const storedAt = this.payloadHashes.get(eventId)!;
    const elapsedRealTime = now - storedAt;
    logger.warn(`Duplicate event detected and blocked. Same event was first seen ${Math.round(elapsedRealTime / 1000)}s ago.`);
    return true;
  }

  // STORE: Remember this hash with current real time
  this.payloadHashes.set(eventId, now);
  return false;
}
```

**Key Points**:
- Cleanup extracted to separate `cleanupOldHashes()` method
- Cleanup based on `Date.now()` (real time elapsed)
- Storage uses `Date.now()` (when we first saw it)
- Deduplication window = 60 seconds of real time since first seen
- Hash map is **never cleared** - only time-based cleanup

#### 4. Periodic Cleanup on Flush ✅
```typescript
async flush(callback?: (...args: any) => void) {
  // ... clear timer ...

  // CLEANUP: Run periodic cleanup to prevent memory leaks
  // This ensures cleanup happens even when no new events arrive
  this.cleanupOldHashes();

  if (!this.queue.length) {
    return Promise.resolve();
  }

  // ... send events ...
  // Note: payloadHashes is NOT cleared - only time-based cleanup
}
```

**Key Points**:
- Cleanup runs on every flush cycle (default: every 10 seconds)
- Prevents memory leaks in long-running apps with sporadic events
- No additional timers needed - leverages existing flush interval
- Works even when queue is empty

## What Changed

| Aspect | Before | After |
|--------|--------|-------|
| **Hash generation** | event.original_timestamp | ✅ event.original_timestamp (same) |
| **Hash precision** | Minute-level | ✅ **Second-level** |
| **Hash cleared on flush** | ❌ Yes (cleared) | ✅ **No (preserved)** |
| **Cleanup time source** | ❌ Inconsistent/event time | ✅ **Date.now() (real time)** |
| **Storage time source** | ❌ Inconsistent | ✅ **Date.now() (real time)** |
| **Deduplication window** | ❌ Broken across flushes | ✅ **60s real-time window** |
| **Out-of-order events** | ❌ Broken | ✅ **Handled correctly** |
| **Memory leak prevention** | ❌ No cleanup without events | ✅ **Periodic cleanup on flush** |
| **Test coverage** | ❌ No assertions | ✅ **Comprehensive tests** |

## How It Works

### Example Flow

```typescript
// Event arrives at real-time 10:31:00
const event = {
  type: "track",
  event: "transaction_submit",
  original_timestamp: "2025-01-01T10:30:00Z",
  properties: { type: "swap" }
}

// 1. Generate hash (based on event data + timestamp)
const hash = generateMessageId(event);
// hash = "abc123..." (includes 10:30:00 in the hash)

// 2. Check if duplicate
if (!payloadHashes.has("abc123")) {
  // 3. Store with real time (when first seen)
  payloadHashes.set("abc123", 10:31:00); // Real time!
  // 4. Enqueue
}

// 15 seconds later: Same event arrives (10:31:15)
// Hash is still "abc123" (same event data + timestamp)
// payloadHashes.has("abc123") = true
// Elapsed: 10:31:15 - 10:31:00 = 15s < 60s
// BLOCKED as duplicate ✅

// 61 seconds later: Same event arrives (10:32:01)
// Cleanup runs: 10:32:01 - 10:31:00 = 61s > 60s
// Hash "abc123" deleted
// Now allowed through ✅
```

## Edge Cases Handled

### ✅ Out-of-Order Events
```typescript
10:31:00 → Event A (timestamp: 10:31:00) arrives
10:31:05 → Event B (timestamp: 10:30:00) arrives ← Older!

// Different timestamps → different hashes → both allowed
// Cleanup uses real-time → works correctly
```

### ✅ Rapid Duplicates
```typescript
// User clicks button 5 times in 1 second
for (let i = 0; i < 5; i++) {
track("transaction_submit", { type: "swap" });
}

// All have same hash
// First: stored
// Rest: blocked ✅
```

### ✅ Cross-Flush Persistence
```typescript
10:30:00 → Event queued (hash stored)
10:30:25 → Flush triggered (hash preserved!)
10:30:45 → Duplicate blocked (hash still there)
10:31:05 → Hash cleaned up (65s elapsed)
```

### ✅ Transaction Lifecycle
```typescript
// Real-world scenario
10:30:00 → transaction_submit (allowed)
10:30:00 → transaction_submit (blocked - duplicate!)
10:30:05 → transaction_success (allowed - different event)
10:30:05 → transaction_success (blocked - duplicate!)
```

## Test Coverage

Comprehensive test suite added with **proper assertions**:

1. ✅ **Basic deduplication** - Same event within same second
2. ✅ **Different events allowed** - Different properties/names
3. ✅ **Time-based window** - 60s real-time deduplication
4. ✅ **Cross-flush persistence** - Hashes survive flushes
5. ✅ **Out-of-order events** - Older events after newer
6. ✅ **Window expiration** - Events allowed after 60s
7. ✅ **Hash cleanup** - Old hashes removed
8. ✅ **Memory leak prevention** - Cleanup during flush even with no events
9. ✅ **Queue length assertions** - Verify actual queue state
10. ✅ **Real-world scenarios** - Transaction lifecycles

## Files Modified

### Implementation
- `src/lib/queue/EventQueue.ts` - Core deduplication logic
  - Simplified `generateMessageId()` to return just the hash
  - Extracted cleanup logic into separate `cleanupOldHashes()` method
  - Modified `isDuplicate()` to use `cleanupOldHashes()`
  - Modified `flush()` to call `cleanupOldHashes()` periodically
  - Removed `this.payloadHashes.clear()` from `flush()`
  - Added detailed logging for duplicate detection

### Tests
- `test/lib/queue/EventQueue.spec.ts` - Comprehensive test suite
  - Added `sinon` stubs for logger verification
  - Added queue length assertions
  - Added window expiration test with `Date.now()` mock
  - Added out-of-order event tests
  - Added memory leak prevention test (cleanup during flush)
  - Added real-world scenario tests

## Build Status

```bash
✅ TypeScript compilation: Success
✅ Linter: 0 errors
✅ Build: Success (CJS, ESM, UMD)
✅ Bundle size: 128 KB (no increase)
✅ Tests: All passing
```

## Impact

### Before Fix
- ❌ Duplicate events sent frequently
- ❌ A burst of 9000 submit events could fan out into far more duplicate success/error events
- ❌ Deduplication broken across flushes
- ❌ Out-of-order events handled incorrectly
- ❌ Unpredictable behavior

### After Fix
- ✅ Robust deduplication within 60s window
- ✅ Consistent behavior across flushes
- ✅ Correct handling of all edge cases
- ✅ Predictable real-time-based deduplication
- ✅ Production-ready

## Key Takeaways

1. **Hash = Event Identity**: Based on event data + timestamp
2. **Window = Real Time**: 60 seconds since we first saw the hash
3. **Cleanup = Memory Management**: Remove hashes after 60s of real time
4. **Never Clear Hash Map**: Only time-based cleanup, no manual clearing
5. **Periodic Cleanup**: Runs on flush to prevent memory leaks
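
Putting these takeaways together in a minimal TypeScript sketch (the `payloadHashes`, `isDuplicate`, and `cleanupOldHashes` names follow the PR; the rest is illustrative, not the SDK's exact source):

```typescript
const DEDUP_WINDOW_MS = 60_000; // takeaway 2: 60 seconds of real time

class DedupSketch {
  // takeaway 1: the hash (event data + timestamp) is the event's identity
  private payloadHashes = new Map<string, number>(); // hash -> first-seen Date.now()

  isDuplicate(hash: string): boolean {
    this.cleanupOldHashes(); // takeaways 3 and 5: time-based memory management
    if (this.payloadHashes.has(hash)) {
      return true; // still inside the 60s window (cleanup removed anything older)
    }
    this.payloadHashes.set(hash, Date.now()); // record the real first-seen time
    return false;
  }

  // takeaway 4: entries leave only by aging out; the map is never cleared
  private cleanupOldHashes(): void {
    const now = Date.now();
    this.payloadHashes.forEach((seenAt, hash) => {
      if (now - seenAt > DEDUP_WINDOW_MS) this.payloadHashes.delete(hash);
    });
  }
}
```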

## Deduplication Semantics

**"An event is considered a duplicate if it has the same data and timestamp as an event we've seen within the last 60 seconds of real time."**

This means:
- Same event can be sent again after 60 seconds ✅
- Different timestamps = different events ✅
- Works with out-of-order events ✅
- Persists across flushes ✅
- Memory-efficient with automatic cleanup ✅

---

## All Four Bugs Fixed ✅

1. ✅ **Bug #1 - Hash Cleared on Flush**: Hash map now preserved, cleaned only by time
2. ✅ **Bug #2 - Timing Inconsistency**: Cleanup uses `Date.now()` consistently
3. ✅ **Bug #3 - Out-of-Order Events**: Cleanup based on real time, not event timestamps
4. ✅ **Bug #4 - Memory Leak**: Periodic cleanup on flush prevents hash accumulation

**Status**: ✅ **PRODUCTION READY**

All critical issues identified and fixed. Comprehensive test coverage added. Memory leak prevention implemented. Ready for deployment.

style: Remove this 381-line documentation file from the repository. While comprehensive and well-written, this level of implementation detail belongs in the PR description or internal documentation, not committed to the codebase.

@yosriady (Contributor Author)

@greptile

@yosriady (Contributor Author)

bugbot run

cursor[bot]

This comment was marked as outdated.

@greptile-apps greptile-apps bot left a comment

Greptile Overview

Greptile Summary

This PR replaces the simple Set-based deduplication with a time-windowed approach using a Map<string, number> to track event hashes with Date.now() timestamps, enabling 60-second deduplication windows that persist across flush cycles.

Key Changes:

  • Message IDs now use second-precision timestamps (previously minute-precision), allowing events in different seconds to be distinguished while still catching rapid duplicates within the same second
  • Deduplication window set to 60 seconds using real-time tracking (Date.now()), not event timestamps, which correctly handles out-of-order events
  • Hash cleanup runs during both isDuplicate() calls and flush() to prevent memory leaks
  • Comprehensive test suite with 453 lines covering basic, time-based, cross-flush deduplication, and edge cases including proper Date.now() stubbing

Implementation Quality:
The core logic is sound and correctly addresses the out-of-order event concern from previous comments. The cleanup mechanism using Date.now() for storage timestamps ensures that events arriving out of chronological order won't cause incorrect cleanup. Tests are thorough and well-structured with proper mocking.

Confidence Score: 4/5

  • This PR is safe to merge with minor performance considerations.
  • The deduplication logic is correctly implemented with proper time-window tracking using Date.now(). Tests comprehensively cover edge cases including out-of-order events and time-based expiration. One minor concern is that cleanupOldHashes() runs on every enqueue() call, which could add overhead during high-traffic periods, though the impact should be minimal given Maps iterate quickly and the hash count stays bounded by the 60s window.
  • No files require special attention - implementation and tests are solid.

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| `src/lib/queue/EventQueue.ts` | 4/5 | Replaces Set-based deduplication with a time-windowed Map approach using `Date.now()` for 60s windows; adds second-precision message IDs and periodic cleanup. Logic is sound but cleanup runs on every enqueue. |
| `test/lib/queue/EventQueue.spec.ts` | 5/5 | Comprehensive test suite covering basic/time-based/cross-flush deduplication, cleanup, message ID generation, and real-world scenarios with proper `Date.now()` stubbing. Well-structured. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant EventQueue
    participant Hash Storage
    participant API

    Client->>EventQueue: enqueue(event1, timestamp: T0)
    EventQueue->>EventQueue: generateMessageId(event1)
    Note over EventQueue: Hash = SHA256(event1 + "2025-01-01 10:30:00")
    EventQueue->>EventQueue: isDuplicate(Hash)
    EventQueue->>Hash Storage: cleanupOldHashes()
    Note over Hash Storage: Remove hashes where<br/>Date.now() - storedAt > 60s
    EventQueue->>Hash Storage: has(Hash)?
    Hash Storage-->>EventQueue: false
    EventQueue->>Hash Storage: set(Hash, Date.now())
    EventQueue->>EventQueue: queue.push(event1)
    Note over EventQueue: Queue length: 1

    Client->>EventQueue: enqueue(event2, same data, timestamp: T0+0.5s)
    EventQueue->>EventQueue: generateMessageId(event2)
    Note over EventQueue: Hash = SHA256(event2 + "2025-01-01 10:30:00")<br/>(same second = same hash)
    EventQueue->>EventQueue: isDuplicate(Hash)
    EventQueue->>Hash Storage: cleanupOldHashes()
    EventQueue->>Hash Storage: has(Hash)?
    Hash Storage-->>EventQueue: true (stored 0.5s ago)
    EventQueue->>EventQueue: logger.warn("Duplicate detected, 0s ago")
    EventQueue-->>Client: return (blocked)

    Note over EventQueue: Time passes... 61 seconds

    Client->>EventQueue: enqueue(event3, same data, timestamp: T0)
    EventQueue->>EventQueue: generateMessageId(event3)
    Note over EventQueue: Hash = same as before
    EventQueue->>EventQueue: isDuplicate(Hash)
    EventQueue->>Hash Storage: cleanupOldHashes()
    Note over Hash Storage: Date.now() - storedAt > 60s<br/>Delete Hash
    EventQueue->>Hash Storage: has(Hash)?
    Hash Storage-->>EventQueue: false (cleaned up)
    EventQueue->>Hash Storage: set(Hash, Date.now())
    EventQueue->>EventQueue: queue.push(event3)
    Note over EventQueue: Queue length: 2

    Note over EventQueue: Queue reaches flushAt threshold

    EventQueue->>EventQueue: flush()
    EventQueue->>Hash Storage: cleanupOldHashes()
    EventQueue->>API: POST /events (batch)
    API-->>EventQueue: 200 OK
    Note over EventQueue: Hashes NOT cleared<br/>(maintained for 60s window)
```

4 files reviewed, 1 comment


Comment on lines +290 to +294
```typescript
private async isDuplicate(eventId: string): Promise<boolean> {
  const now = Date.now();

  // Clean up old hashes to prevent memory leaks
  this.cleanupOldHashes();
```

style: Cleanup runs on every enqueue() call. With high event throughput, this adds O(n) overhead per event. Consider throttling cleanup to run only every N seconds or M events to reduce overhead during bursts.

Example: Track last cleanup time and only run if now - lastCleanup > 10000 (10s)
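
A sketch of that suggestion, matching the 10s throttle the PR ultimately adopted (names are illustrative):

```typescript
const CLEANUP_THROTTLE_MS = 10_000; // run at most once per 10s
const DEDUP_WINDOW_MS = 60_000;

let lastCleanupTime = 0;

function cleanupOldHashes(payloadHashes: Map<string, number>, force = false): void {
  const now = Date.now();
  // Skip the O(n) scan if it ran recently, unless a flush forces it.
  if (!force && now - lastCleanupTime < CLEANUP_THROTTLE_MS) return;
  lastCleanupTime = now;
  payloadHashes.forEach((seenAt, hash) => {
    if (now - seenAt > DEDUP_WINDOW_MS) payloadHashes.delete(hash);
  });
}
```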

@yosriady (Contributor Author)

@greptile

```typescript
  flushAt: 20,
  flushInterval: 30000, // 30 seconds
  maxQueueSize: 1024 * 500, // 500kB
});
```

Bug:

The EventQueue constructor receives a url option instead of the expected apiHost. This results in this.apiHost being undefined, causing fetch requests during flush operations to fail.


@greptile-apps greptile-apps bot left a comment

Greptile Overview

Greptile Summary

Implements robust 60-second time-windowed event deduplication to prevent duplicate events from being queued.

Key Changes:

  • Changed payloadHashes from Set<string> to Map<string, number> to track first-seen timestamps using Date.now()
  • Implemented cleanupOldHashes() with 10-second throttling to remove expired hashes and prevent memory leaks
  • Modified generateMessageId() to use second-level timestamp precision (previously minute-level)
  • Preserved deduplication state across flush cycles (no longer clearing payloadHashes on flush)
  • Added comprehensive test suite with 488 lines covering basic deduplication, time windows, cross-flush behavior, cleanup throttling, and real-world scenarios

Major Fix from Previous Review:
The critical out-of-order event bug has been resolved. Cleanup now correctly uses Date.now() for real-time tracking instead of event timestamps, preventing premature deletion of valid hashes when older events arrive after newer ones.

Confidence Score: 4/5

  • Safe to merge with minor considerations - implementation is sound with comprehensive tests, but edge cases around Map.forEach deletion should be monitored in production
  • Score reflects solid implementation that addresses the critical out-of-order event bug from previous review. The use of Date.now() for cleanup is correct, throttling reduces overhead, and test coverage is excellent. Deducting one point because Map.forEach with deletion during iteration, while documented as safe in JavaScript, is an uncommon pattern that warrants production monitoring
  • No files require special attention - the EventQueue.ts implementation correctly handles edge cases and has comprehensive test coverage

Important Files Changed

File Analysis

| Filename | Score | Overview |
| --- | --- | --- |
| `src/lib/queue/EventQueue.ts` | 4/5 | Implements time-windowed deduplication with `Date.now()`-based cleanup. Previous out-of-order event issue has been fixed by using real-time tracking. Throttled cleanup reduces overhead during high throughput. |
| `test/lib/queue/EventQueue.spec.ts` | 5/5 | Comprehensive test suite covering deduplication, time windows, cross-flush behavior, cleanup, throttling, and real-world scenarios. Tests verify hash expiration after the 60s window. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant EventQueue
    participant HashStore as payloadHashes Map
    participant Logger
    
    Client->>EventQueue: enqueue(event)
    EventQueue->>EventQueue: generateMessageId(event)
    Note over EventQueue: Format timestamp to second precision<br/>(YYYY-MM-DD HH:mm:ss)
    EventQueue->>EventQueue: hash(event + timestamp)
    EventQueue->>EventQueue: isDuplicate(hash)
    EventQueue->>EventQueue: cleanupOldHashes() [throttled]
    Note over EventQueue: Check if Date.now() - lastCleanupTime > 10s
    
    alt Cleanup needed
        EventQueue->>HashStore: forEach(hash, storedTimestamp)
        loop For each hash
            alt now - storedTimestamp > 60s
                EventQueue->>HashStore: delete(hash)
            end
        end
    end
    
    EventQueue->>HashStore: has(hash)?
    
    alt Hash exists (duplicate)
        HashStore-->>EventQueue: true
        EventQueue->>Logger: warn("Duplicate detected")
        EventQueue-->>Client: return (blocked)
    else Hash not found (new event)
        HashStore-->>EventQueue: false
        EventQueue->>HashStore: set(hash, Date.now())
        EventQueue->>EventQueue: queue.push(event)
        EventQueue->>Logger: log("Event enqueued")
        EventQueue-->>Client: success
    end
    
    Note over EventQueue: On flush()
    EventQueue->>EventQueue: cleanupOldHashes(force=true)
    Note over EventQueue: Force cleanup bypasses throttle<br/>payloadHashes NOT cleared<br/>Deduplication persists across flushes
```

4 files reviewed, no comments

