feat(signals): Video-first session summarization #43805
base: master
Conversation
ee/hogai/videos/utils.py
Outdated
```python
# Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
return math.ceil(track.duration / 1000.0)
```
Q @sortafreel: I copied this verbatim from the existing code, but I'm not sure if it's actually intended to ceil here? Seems like if we wanted to skip, we'd floor. And either way, this should not be needed, as we have a more intentional SESSION_VIDEO_RENDERING_DELAY offset
23 files reviewed, 8 comments
```python
logger = structlog.get_logger(__name__)

raw_client = RawGenAIClient(api_key=settings.GEMINI_API_KEY)
```
style: Module-level client initialization creates shared state across all activity executions. In Temporal activities, this could lead to connection issues or thread-safety problems.
```suggestion
# Move client creation inside the activity function to avoid shared state issues
```
Prompt To Fix With AI
This is a comment left during a code review.
Path: posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py
Line: 22:22
Comment:
**style:** Module-level client initialization creates a shared state across all activity executions. In Temporal activities, this could lead to connection issues or thread safety problems.
```suggestion
# Move client creation inside the activity function to avoid shared state issues
```
How can I resolve this? If you propose a fix, please make it concise.
Hmm, fair
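For illustration, a minimal sketch of the suggested shape, creating the client per execution. The decorator and activity name follow this PR's naming; the import path for `RawGenAIClient` is not shown in the thread, so it's an assumption here:

```python
from django.conf import settings
from temporalio import activity

# NOTE: hypothetical import path; the review thread doesn't show where
# the RawGenAIClient wrapper actually lives in this PR
from ee.hogai.llm import RawGenAIClient


@activity.defn
async def upload_video_to_gemini_activity(inputs) -> None:
    # Create the client inside the activity so each execution gets its own
    # instance, avoiding shared state across Temporal activity executions
    raw_client = RawGenAIClient(api_key=settings.GEMINI_API_KEY)
    ...
```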
Resolved threads:
- posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py (outdated)
- posthog/temporal/ai/session_summary/activities/a4_consolidate_video_segments.py (outdated)
- posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py
- posthog/temporal/ai/session_summary/activities/a6_store_video_session_summary.py
- posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py
8 issues found across 25 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="posthog/temporal/ai/session_summary/activities/a5_embed_and_store_segments.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a5_embed_and_store_segments.py:18">
P2: Parameter `asset_id` is declared but never used in the function body. This is either dead code that should be removed, or indicates missing functionality where `asset_id` should be included in the metadata or used elsewhere.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py:48">
P1: Missing `tmp_file.flush()` after writing to temp file. Since the upload reads the file by path (not file handle), the data must be flushed to disk first, otherwise it may upload an incomplete or empty file.</violation>
<violation number="2" location="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py:62">
P1: Using blocking `time.sleep(0.5)` in an async function blocks the event loop. Use `await asyncio.sleep(0.5)` instead to allow other async operations to proceed during the wait.</violation>
</file>
<file name="ee/hogai/videos/utils.py">
<violation number="1" location="ee/hogai/videos/utils.py:16">
P1: `track.duration` can be `None` for some media files, which would cause a `TypeError` when dividing by 1000.0. Add a null check before using the duration value.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/summarize_session_group.py">
<violation number="1" location="posthog/temporal/ai/session_summary/summarize_session_group.py:406">
P1: Missing try/except block breaks error isolation. The `_run_summary` method's documented purpose is to 'avoid one activity failing the whole group' by returning exceptions instead of raising them. The removal of the try/except block means any exception will propagate and fail the entire group summarization, rather than being caught and returned for graceful handling in `_run_summaries`.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a1_export_session_video.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a1_export_session_video.py:105">
P1: Using `uuid.uuid4()` in the workflow ID makes it non-deterministic across activity retries. If this activity is retried (e.g., due to timeout), a new workflow will be spawned instead of resuming the existing one. Use a deterministic ID based on `exported_asset.id` instead.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py:279">
P1: `get_column_index` raises `ValueError` for missing columns, but this code expects it to return `None`. Sessions without exception events (which lack `$exception_types`, `$exception_values` columns) will cause this activity to crash. Wrap the call in try/except to handle missing columns gracefully.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/summarize_session.py">
<violation number="1" location="posthog/temporal/ai/session_summary/summarize_session.py:432">
P1: Videos shorter than 17 seconds (after 2s rendering delay) will produce zero segments and fail with a misleading error message. If `analysis_duration < SESSION_VIDEO_CHUNK_DURATION_S`, `num_segments` becomes 0. Consider ensuring at least 1 segment is created for any positive analysis duration.</violation>
</file>
Since this is your first cubic review, here's how it works:
- cubic automatically reviews your code and comments on bugs and improvements
- Teach cubic by replying to its comments. cubic learns from your replies and gets better over time
- Ask questions if you need clarification on any suggestion
Reply to cubic to teach it or ask questions. Re-run a review with @cubic-dev-ai review this PR
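As a side note on the last violation above (summarize_session.py:432), a minimal sketch of the suggested guard, assuming the `analysis_duration` and `SESSION_VIDEO_CHUNK_DURATION_S` names used in the report:

```python
import math

# Guarantee at least one segment for any positive analysis duration,
# so short videos don't fail with a misleading zero-segment error
num_segments = max(1, math.ceil(analysis_duration / SESSION_VIDEO_CHUNK_DURATION_S))
```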
```python
async def embed_and_store_segments_activity(
    inputs: VideoSummarySingleSessionInputs,
    segments: list[VideoSegmentOutput],
    asset_id: int,
```
P2: Parameter asset_id is declared but never used in the function body. This is either dead code that should be removed, or indicates missing functionality where asset_id should be included in the metadata or used elsewhere.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a5_embed_and_store_segments.py, line 18:
<comment>Parameter `asset_id` is declared but never used in the function body. This is either dead code that should be removed, or indicates missing functionality where `asset_id` should be included in the metadata or used elsewhere.</comment>
<file context>
@@ -0,0 +1,77 @@
+async def embed_and_store_segments_activity(
+ inputs: VideoSummarySingleSessionInputs,
+ segments: list[VideoSegmentOutput],
+ asset_id: int,
+) -> None:
+ """Generate embeddings for all segments and produce to Kafka for ClickHouse storage
</file context>
posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py (outdated, resolved)
```python
# Write video to temporary file for upload
with tempfile.NamedTemporaryFile() as tmp_file:
    tmp_file.write(video_bytes)
```
P1: Missing tmp_file.flush() after writing to temp file. Since the upload reads the file by path (not file handle), the data must be flushed to disk first, otherwise it may upload an incomplete or empty file.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py, line 48:
<comment>Missing `tmp_file.flush()` after writing to temp file. Since the upload reads the file by path (not file handle), the data must be flushed to disk first, otherwise it may upload an incomplete or empty file.</comment>
<file context>
@@ -0,0 +1,95 @@
+
+ # Write video to temporary file for upload
+ with tempfile.NamedTemporaryFile() as tmp_file:
+ tmp_file.write(video_bytes)
+ # Upload to Gemini
+ logger.info(
</file context>
I think it's a valid concern for large video files.
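A minimal sketch of the fix, assuming `video_bytes` and the Gemini `client` from the surrounding activity (the `.mp4` suffix and the simplified upload call are illustrative; the diff passes a `mime_type` config as well):

```python
import tempfile

with tempfile.NamedTemporaryFile(suffix=".mp4") as tmp_file:
    tmp_file.write(video_bytes)
    tmp_file.flush()  # ensure all bytes hit disk before the upload reads the path
    uploaded_file = client.files.upload(file=tmp_file.name)
```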
```python
for track in media_info.tracks:
    if track.track_type == "General":
        # Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
        return int(math.ceil(track.duration / 1000.0))
```
P1: track.duration can be None for some media files, which would cause a TypeError when dividing by 1000.0. Add a null check before using the duration value.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At ee/hogai/videos/utils.py, line 16:
<comment>`track.duration` can be `None` for some media files, which would cause a `TypeError` when dividing by 1000.0. Add a null check before using the duration value.</comment>
<file context>
@@ -0,0 +1,17 @@
+ for track in media_info.tracks:
+ if track.track_type == "General":
+ # Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
+ return int(math.ceil(track.duration / 1000.0))
+ raise ValueError("No General track found in video to extract duration from")
</file context>
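A sketch of the suggested null check in `get_video_duration_s`, assuming the pymediainfo `media_info` object from the file context above:

```python
import math

for track in media_info.tracks:
    if track.track_type == "General":
        if track.duration is None:
            # pymediainfo leaves duration unset for some media files
            raise ValueError("General track has no duration metadata")
        # Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
        return int(math.ceil(track.duration / 1000.0))
raise ValueError("No General track found in video to extract duration from")
```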
```python
await client.execute_workflow(
    VideoExportWorkflow.run,
    VideoExportInputs(exported_asset_id=exported_asset.id),
    id=f"session-video-summary-export_{inputs.session_id}_{uuid.uuid4()}",
```
P1: Using uuid.uuid4() in the workflow ID makes it non-deterministic across activity retries. If this activity is retried (e.g., due to timeout), a new workflow will be spawned instead of resuming the existing one. Use a deterministic ID based on exported_asset.id instead.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a1_export_session_video.py, line 105:
<comment>Using `uuid.uuid4()` in the workflow ID makes it non-deterministic across activity retries. If this activity is retried (e.g., due to timeout), a new workflow will be spawned instead of resuming the existing one. Use a deterministic ID based on `exported_asset.id` instead.</comment>
<file context>
@@ -0,0 +1,124 @@
+ await client.execute_workflow(
+ VideoExportWorkflow.run,
+ VideoExportInputs(exported_asset_id=exported_asset.id),
+ id=f"session-video-summary-export_{inputs.session_id}_{uuid.uuid4()}",
+ task_queue=settings.VIDEO_EXPORT_TASK_QUEUE,
+ retry_policy=RetryPolicy(maximum_attempts=int(TEMPORAL_WORKFLOW_MAX_ATTEMPTS)),
</file context>
It is deterministic, as it includes the session id.
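For reference, a sketch of the variant the reviewer is pointing at, keyed on `exported_asset.id` instead of a random suffix (names taken from the diff above; whether to drop the `uuid.uuid4()` part is the open question in this thread):

```python
# Deterministic across activity retries: the same session + asset always
# maps to the same workflow ID, so a retry resumes rather than respawns
workflow_id = f"session-video-summary-export_{inputs.session_id}_{exported_asset.id}"

await client.execute_workflow(
    VideoExportWorkflow.run,
    VideoExportInputs(exported_asset_id=exported_asset.id),
    id=workflow_id,
    task_queue=settings.VIDEO_EXPORT_TASK_QUEUE,
    retry_policy=RetryPolicy(maximum_attempts=int(TEMPORAL_WORKFLOW_MAX_ATTEMPTS)),
)
```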
```python
# Get indices for key columns
column_indices: dict[str, int | None] = {}
for col in key_columns:
    column_indices[col] = get_column_index(simplified_events_columns, col)
```
P1: get_column_index raises ValueError for missing columns, but this code expects it to return None. Sessions without exception events (which lack $exception_types, $exception_values columns) will cause this activity to crash. Wrap the call in try/except to handle missing columns gracefully.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py, line 279:
<comment>`get_column_index` raises `ValueError` for missing columns, but this code expects it to return `None`. Sessions without exception events (which lack `$exception_types`, `$exception_values` columns) will cause this activity to crash. Wrap the call in try/except to handle missing columns gracefully.</comment>
<file context>
@@ -0,0 +1,328 @@
+ # Get indices for key columns
+ column_indices: dict[str, int | None] = {}
+ for col in key_columns:
+ column_indices[col] = get_column_index(simplified_events_columns, col)
+
+ # Format events
</file context>
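A sketch of the suggested handling, with names from the diff; treating a missing column as `None` (rather than, say, skipping it) is an assumption:

```python
column_indices: dict[str, int | None] = {}
for col in key_columns:
    try:
        column_indices[col] = get_column_index(simplified_events_columns, col)
    except ValueError:
        # Optional columns like $exception_types may be absent for sessions
        # without exception events; record them as missing instead of crashing
        column_indices[col] = None
```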
7 issues found across 25 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them.
<file name="posthog/temporal/ai/session_summary/activities/a1_export_session_video.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a1_export_session_video.py:80">
P2: Missing string concatenation: The second f-string is a standalone expression and won't be concatenated to `msg`. The error message will be incomplete, missing the duration details. Wrap both strings in parentheses.</violation>
</file>
<file name="ee/hogai/videos/utils.py">
<violation number="1" location="ee/hogai/videos/utils.py:16">
P2: `track.duration` can be `None` in pymediainfo when duration metadata is unavailable, causing a `TypeError` on division. Consider checking for `None` before using the value.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a4_consolidate_video_segments.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a4_consolidate_video_segments.py:95">
P2: If the LLM returns `{"segments": null}`, `parsed.get("segments", [])` returns `None` (not the default), causing a `TypeError` when iterating. Use `or []` to handle both missing keys and null values.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py:48">
P1: Missing `tmp_file.flush()` after writing video bytes. When passing a temp file's name to another library for reading, you must flush the buffer first to ensure all data is written to disk. Without this, the Gemini upload may receive an empty or partial file.</violation>
<violation number="2" location="posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py:61">
P2: Polling loop has no timeout mechanism. If Gemini's file processing hangs indefinitely, this activity will block forever. Consider adding a maximum wait time or iteration count with a timeout error.</violation>
</file>
<file name="posthog/temporal/ai/session_summary/summarize_session_group.py">
<violation number="1" location="posthog/temporal/ai/session_summary/summarize_session_group.py:406">
P1: Missing error handling: The try/except block was removed, but the method signature still returns `None | Exception` and the caller (`_run_summaries`) expects exceptions to be returned, not raised. Without the try/except, any exception from `ensure_llm_single_session_summary` will propagate through the `TaskGroup`, causing all concurrent session summaries to be cancelled - defeating the stated purpose of "avoid one activity failing the whole group."</violation>
</file>
<file name="posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py">
<violation number="1" location="posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py:279">
P1: `get_column_index` raises `ValueError` when a column is not found, but this code expects it to return `None` for missing columns. Optional columns like `$exception_types`, `$exception_values` may not exist in all event schemas, causing this to crash.</violation>
</file>
```python
# Check if session is too short for summarization - note: this is different from the video duration, but probs close enough
if session_duration * 1000 < MIN_SESSION_DURATION_FOR_SUMMARY_MS:
    msg = f"Session {inputs.session_id} video is too short for summarization "
```
P2: Missing string concatenation: The second f-string is a standalone expression and won't be concatenated to msg. The error message will be incomplete, missing the duration details. Wrap both strings in parentheses.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a1_export_session_video.py, line 80:
<comment>Missing string concatenation: The second f-string is a standalone expression and won't be concatenated to `msg`. The error message will be incomplete, missing the duration details. Wrap both strings in parentheses.</comment>
<file context>
@@ -0,0 +1,130 @@
+
+ # Check if session is too short for summarization - note: this is different from the video duration, but probs close enough
+ if session_duration * 1000 < MIN_SESSION_DURATION_FOR_SUMMARY_MS:
+ msg = f"Session {inputs.session_id} video is too short for summarization "
+ f"({session_duration * 1000:.0f}ms < {MIN_SESSION_DURATION_FOR_SUMMARY_MS}ms)"
+ logger.error(msg, session_id=inputs.session_id, signals_type="session-summaries")
</file context>
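A sketch of the fix: parenthesizing makes the two adjacent f-strings one implicitly concatenated message instead of leaving the second as a dead expression:

```python
if session_duration * 1000 < MIN_SESSION_DURATION_FOR_SUMMARY_MS:
    msg = (
        f"Session {inputs.session_id} video is too short for summarization "
        f"({session_duration * 1000:.0f}ms < {MIN_SESSION_DURATION_FOR_SUMMARY_MS}ms)"
    )
    logger.error(msg, session_id=inputs.session_id, signals_type="session-summaries")
```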
```python
for track in media_info.tracks:
    if track.track_type == "General":
        # Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
        return int(math.ceil(track.duration / 1000.0))
```
P2: track.duration can be None in pymediainfo when duration metadata is unavailable, causing a TypeError on division. Consider checking for None before using the value.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At ee/hogai/videos/utils.py, line 16:
<comment>`track.duration` can be `None` in pymediainfo when duration metadata is unavailable, causing a `TypeError` on division. Consider checking for `None` before using the value.</comment>
<file context>
@@ -0,0 +1,17 @@
+ for track in media_info.tracks:
+ if track.track_type == "General":
+ # Convert ms to seconds, ceil to avoid grey "not-rendered" frames at the start
+ return int(math.ceil(track.duration / 1000.0))
+ raise ValueError("No General track found in video to extract duration from")
</file context>
```python
        confusion_detected=item.get("confusion_detected", False),
        abandonment_detected=item.get("abandonment_detected", False),
    )
    for item in parsed.get("segments", [])
```
P2: If the LLM returns {"segments": null}, parsed.get("segments", []) returns None (not the default), causing a TypeError when iterating. Use or [] to handle both missing keys and null values.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a4_consolidate_video_segments.py, line 95:
<comment>If the LLM returns `{"segments": null}`, `parsed.get("segments", [])` returns `None` (not the default), causing a `TypeError` when iterating. Use `or []` to handle both missing keys and null values.</comment>
<file context>
@@ -0,0 +1,189 @@
+ confusion_detected=item.get("confusion_detected", False),
+ abandonment_detected=item.get("abandonment_detected", False),
+ )
+ for item in parsed.get("segments", [])
+ ]
+
</file context>
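A sketch of the suggested guard; `or []` covers both a missing key and an explicit `null` in the LLM response (other `VideoSegmentOutput` fields elided):

```python
segments = [
    VideoSegmentOutput(
        confusion_detected=item.get("confusion_detected", False),
        abandonment_detected=item.get("abandonment_detected", False),
        # ...other fields elided...
    )
    # `or []` falls back for both a missing "segments" key and a null value
    for item in (parsed.get("segments") or [])
]
```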
```python
    file=tmp_file.name, config=types.UploadFileConfig(mime_type=asset.export_format)
)
# Wait for file to be ready
while uploaded_file.state and uploaded_file.state.name == "PROCESSING":
```
P2: Polling loop has no timeout mechanism. If Gemini's file processing hangs indefinitely, this activity will block forever. Consider adding a maximum wait time or iteration count with a timeout error.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py, line 61:
<comment>Polling loop has no timeout mechanism. If Gemini's file processing hangs indefinitely, this activity will block forever. Consider adding a maximum wait time or iteration count with a timeout error.</comment>
<file context>
@@ -0,0 +1,95 @@
+ file=tmp_file.name, config=types.UploadFileConfig(mime_type=asset.export_format)
+ )
+ # Wait for file to be ready
+ while uploaded_file.state and uploaded_file.state.name == "PROCESSING":
+ await asyncio.sleep(0.5)
+ logger.info(
</file context>
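A sketch of a bounded polling loop; the 5-minute budget is arbitrary, and the `client.files.get` refresh call is an assumption on top of the diff (without some refresh, the loop can never observe a state change):

```python
import asyncio
import time

deadline = time.monotonic() + 300  # arbitrary 5-minute processing budget
while uploaded_file.state and uploaded_file.state.name == "PROCESSING":
    if time.monotonic() > deadline:
        raise TimeoutError(f"Gemini file {uploaded_file.name} stuck in PROCESSING")
    await asyncio.sleep(0.5)
    uploaded_file = client.files.get(name=uploaded_file.name)  # refresh state
```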
```python
# Write video to temporary file for upload
with tempfile.NamedTemporaryFile() as tmp_file:
    tmp_file.write(video_bytes)
```
P1: Missing tmp_file.flush() after writing video bytes. When passing a temp file's name to another library for reading, you must flush the buffer first to ensure all data is written to disk. Without this, the Gemini upload may receive an empty or partial file.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a2_upload_video_to_gemini.py, line 48:
<comment>Missing `tmp_file.flush()` after writing video bytes. When passing a temp file's name to another library for reading, you must flush the buffer first to ensure all data is written to disk. Without this, the Gemini upload may receive an empty or partial file.</comment>
<file context>
@@ -0,0 +1,95 @@
+
+ # Write video to temporary file for upload
+ with tempfile.NamedTemporaryFile() as tmp_file:
+ tmp_file.write(video_bytes)
+ # Upload to Gemini
+ logger.info(
</file context>
```python
# Get indices for key columns
column_indices: dict[str, int | None] = {}
for col in key_columns:
    column_indices[col] = get_column_index(simplified_events_columns, col)
```
P1: get_column_index raises ValueError when a column is not found, but this code expects it to return None for missing columns. Optional columns like $exception_types, $exception_values may not exist in all event schemas, causing this to crash.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At posthog/temporal/ai/session_summary/activities/a3_analyze_video_segment.py, line 279:
<comment>`get_column_index` raises `ValueError` when a column is not found, but this code expects it to return `None` for missing columns. Optional columns like `$exception_types`, `$exception_values` may not exist in all event schemas, causing this to crash.</comment>
<file context>
@@ -0,0 +1,329 @@
+ # Get indices for key columns
+ column_indices: dict[str, int | None] = {}
+ for col in key_columns:
+ column_indices[col] = get_column_index(simplified_events_columns, col)
+
+ # Format events
</file context>
```diff
-    def _if_video_validation_enabled(self, user: User) -> bool | None:
+    # Check if the summaries should be validated with videos
+    def _determine_video_validation_enabled(self, user: User) -> bool | Literal["full"] | None:
```
Don't like the 3 return types even one bit, but I understand the reasoning, OK to keep it.
```python
logger = structlog.get_logger(__name__)


def get_video_duration_s(video_bytes: bytes) -> int:
```
I assume it's just the rename, with no logic changed?
```diff
@@ -0,0 +1,16 @@
+from .a1_export_session_video import export_session_video_activity
```
I heavily dislike the a1-a6 naming, could you elaborate a bit? :) OK to keep it, but I need to understand your reasoning better.
```python
async def ensure_llm_single_session_summary(inputs: SingleSessionSummaryInputs):
    if inputs.video_validation_enabled == "full":
```
Nitpick: no need to keep the code nested. If not "full", start and return the regular activity; no need for the else afterward.
```python
    user_distinct_id_to_log=inputs.user_distinct_id_to_log,
    team_id=inputs.team_id,
    redis_key_base=inputs.redis_key_base,
    model_to_use="gemini-2.5-flash",  # Default model for video analysis
```
Nitpick: Constants.
| f"({existing_duration_s * 1000:.0f}ms < {MIN_SESSION_DURATION_FOR_SUMMARY_MS}ms)", | ||
| non_retryable=True, | ||
| ) | ||
| logger.info( |
I would suggest removing all the info logging, as I don't expect us to read it. Warnings/exceptions, sure, but info means "everything's ok", so it seems like excessive logging.
```python
    )
    for segment_spec in segment_specs
]
segment_results = await asyncio.gather(*segment_tasks)
```
I would strongly suggest using TaskGroup instead of gather (like in posthog/temporal/ai/session_summary/activities/patterns.py#L304), as any failed activity will fail the whole workflow.
This could be the reason why we schedule the same workflow twice (a new one even if the old one didn't fail completely).
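A sketch of the TaskGroup shape being suggested (cf. patterns.py): each task catches its own exception and returns it instead of raising, so one failed segment analysis doesn't fail or cancel its siblings. `run_segment` is a hypothetical stand-in for the per-segment activity call:

```python
import asyncio


async def _run_segment(spec) -> "VideoSegmentOutput | Exception":
    try:
        return await run_segment(spec)  # hypothetical per-segment activity call
    except Exception as err:
        return err  # returned, not raised, so sibling tasks keep running


async with asyncio.TaskGroup() as tg:
    tasks = [tg.create_task(_run_segment(spec)) for spec in segment_specs]
segment_results = [task.result() for task in tasks]
```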
```python
    segment_index=segment.segment_index,
)

team_name = (await Team.objects.only("name").aget(id=inputs.team_id)).name
```
If we need only the team name, can we get it outside of the activity once, instead of querying on every loop iteration?
```python
]

# Activity 3: Analyze all segments in parallel
segment_tasks = [
```
I would suggest adding a Semaphore with a limit of 10 or something, as we can easily hit RPM/TPM limits.
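A sketch of the suggested cap, using the reviewer's limit of 10; `analyze_segment` is a hypothetical stand-in for the per-segment call:

```python
import asyncio

semaphore = asyncio.Semaphore(10)  # cap concurrent Gemini calls to stay under RPM/TPM


async def _analyze_with_limit(spec):
    async with semaphore:
        return await analyze_segment(spec)  # hypothetical per-segment call


segment_tasks = [_analyze_with_limit(spec) for spec in segment_specs]
segment_results = await asyncio.gather(*segment_tasks)
```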
```python
import traceback

self.stdout.write(traceback.format_exc())
raise
```
Do we need to do any cleanup?
sortafreel left a comment:
Lots of blocking comments, approved as a matter of habit 😆
Problem
We've started with session summaries based on just the events, but we've seen the reliability of such descriptions to be lower than when we use an actual session video.
Accurate descriptions of what the user genuinely experienced are essential, as they will allow us to identify real clusters of user experiences, letting us propose meaningful engineering tasks with improvements.
Changes
This adds a new pathway of creating a single session summary, implemented as a sequence of Temporal activities (behind the flag `max-session-summarization-video-as-base`):

- `export_session_video_activity` – exports a full video of the session via ExportedAsset creation (if the session already has a video ExportedAsset, this step is skipped)
- `upload_video_to_gemini_activity` – queries ExportedAsset for the relevant video, and uploads it to the Gemini Files API
- `analyze_video_segment_activity` – each run is a different 15 s chunk of the full session video analyzed by Gemini (with the events present in the 15 s span included for context)
- `consolidate_video_segments_activity` – concats the LLM-generated descriptions of all the 15 s chunks, and runs them through one last LLM call to segment the complete session transcript into user journey segments
- `embed_and_store_segments_activity` – queues every user journey segment to `embedding-worker`, storing every segment in an embedded format for later clustering
- `store_video_session_summary_activity` – takes the summary created so far, and serializes it into our good old friend SingleSessionSummary, in the same format as classic summaries

(These files are prefixed with `a` for "activity", like `a1_export_session_video`, because reflecting the order is so helpful, but also because Python requires module names to start with a letter.)

Example result (Hedgebox dummy app)
Which existing building blocks are used here
- `SummarizeSingleSessionWorkflow`, which also results in the same `SessionSummarySerializer` output as existing session summarization. This branching is done on `inputs.video_validation_enabled == "full"`.
- `embedding-worker`, which handles the embedding processing async (this is valuable for later clustering, but not for immediate consumption).

What is missing
- `SummarizeSingleSessionStreamWorkflow` – this gets kicked off directly in the session replay UI, and I've not yet rejigged it in this PR as this one's large already; this means the "Summarize this session" button errors out
- With a `playback_speed` of 1.0 in the full session export, long sessions take proportionally long to export, and the summarization workflow timeout right now is 600 s, so sessions longer than 10 minutes fail 😿
- The `abandonment` or `confusion` fields of summaries are there, but feel meh; I'm not sure they're useful as is

How did you test this code?

Mostly manual testing via the new `python manage.py team_signals__summarize_single_session <session_id>` command.

Currently no tests covering the new branches. Do bash my lack of tests.