Skip to content

Commit 97bbf3b

Browse files
Allow more empty loops before stopping log streaming (#52614)
In #50715 we starting short-circuiting if we hit 5 iterations of no new log messages. This works well, except in the scenario where there are no log messages at all. ES log handler has it's own short-circuit for that scenario, but it triggers based on time and that works out to ~7 iterations. Let's let ES have the first crack at it so the user gets a better message. Co-authored-by: Rahul Vats <[email protected]>
1 parent c489678 commit 97bbf3b

File tree

2 files changed

+4
-7
lines changed

2 files changed

+4
-7
lines changed

airflow-core/src/airflow/utils/log/log_reader.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ class TaskLogReader:
4545

4646
STREAM_LOOP_SLEEP_SECONDS = 1
4747
"""Time to sleep between loops while waiting for more logs"""
48-
STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 5
48+
49+
STREAM_LOOP_STOP_AFTER_EMPTY_ITERATIONS = 10
4950
"""Number of empty loop iterations before stopping the stream"""
5051

5152
def read_log_chunks(

airflow-core/tests/unit/utils/log/test_log_reader.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,7 @@ def test_read_log_stream_should_read_each_try_in_turn(self, mock_read):
222222
def test_read_log_stream_no_end_of_log_marker(self, mock_read):
223223
mock_read.side_effect = [
224224
(["hello"], {"end_of_log": False}),
225-
([], {"end_of_log": False}),
226-
([], {"end_of_log": False}),
227-
([], {"end_of_log": False}),
228-
([], {"end_of_log": False}),
229-
([], {"end_of_log": False}),
225+
*[([], {"end_of_log": False}) for _ in range(10)],
230226
]
231227

232228
self.ti.state = TaskInstanceState.SUCCESS
@@ -237,7 +233,7 @@ def test_read_log_stream_no_end_of_log_marker(self, mock_read):
237233
"hello\n",
238234
"(Log stream stopped - End of log marker not found; logs may be incomplete.)\n",
239235
]
240-
assert mock_read.call_count == 6
236+
assert mock_read.call_count == 11
241237

242238
def test_supports_external_link(self):
243239
task_log_reader = TaskLogReader()

0 commit comments

Comments
 (0)