Skip to content

Commit 1f48d3a

Browse files
fix: show progress even in job optional queries (#2119)
* fix: show progress even in job optional queries * first attempt at publisher * report execution started/stopped in read_gbq_query * render bigquery sent events * Feat render more events (#2121) * feat: Render more BigQuery events in progress bar This change updates bigframes/formatting_helpers.py to render more event types from bigframes/core/events.py. Specifically, it adds rendering support for: - BigQueryRetryEvent - BigQueryReceivedEvent - BigQueryFinishedEvent - BigQueryUnknownEvent This provides users with more detailed feedback during query execution in both notebook (HTML) and terminal (plaintext) environments. * feat: Render more BigQuery events in progress bar This change updates bigframes/formatting_helpers.py to render more event types from bigframes/core/events.py. Specifically, it adds rendering support for: - BigQueryRetryEvent - BigQueryReceivedEvent - BigQueryFinishedEvent - BigQueryUnknownEvent This provides users with more detailed feedback during query execution in both notebook (HTML) and terminal (plaintext) environments. Unit tests have been added to verify the rendering of each new event type. --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com> * fix job links * fix system tests * fix mypy * fix unit tests * support more event types * move publisher to session * fix remaining mypy errors * update text * add explicit unsubscribe * fix presubmits * add lock for publisher and publish temp table creations --------- Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
1 parent 095c0b8 commit 1f48d3a

28 files changed

+1155
-270
lines changed

bigframes/blob/_functions.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ def _create_udf(self):
9999
project=None,
100100
timeout=None,
101101
query_with_job=True,
102+
publisher=self._session._publisher,
102103
)
103104

104105
return udf_name

bigframes/core/events.py

Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import annotations
16+
17+
import dataclasses
18+
import datetime
19+
import threading
20+
from typing import Any, Callable, Optional, Set
21+
import uuid
22+
23+
import google.cloud.bigquery._job_helpers
24+
import google.cloud.bigquery.job.query
25+
import google.cloud.bigquery.table
26+
27+
import bigframes.session.executor
28+
29+
30+
class Subscriber:
    """Handle for a callback registered with a :class:`Publisher`.

    Calling the handle forwards all arguments to the wrapped callback.
    Handles compare and hash by a random per-instance UUID, so two handles
    wrapping the same callback are still distinct set members.

    The handle is also a context manager: on exit it reports any in-flight
    exception to its own callback as an ``UnknownErrorEvent`` and then
    unsubscribes itself. The exception is not suppressed.
    """

    def __init__(self, callback: Callable[[Event], None], *, publisher: Publisher):
        self._publisher = publisher
        self._callback = callback
        self._subscriber_id = uuid.uuid4()
        # Makes close() safe to call more than once, e.g. an explicit
        # close() inside a ``with`` block followed by __exit__.
        self._closed = False

    def __call__(self, *args, **kwargs):
        return self._callback(*args, **kwargs)

    def __hash__(self) -> int:
        return hash(self._subscriber_id)

    def __eq__(self, value: object):
        if not isinstance(value, Subscriber):
            return NotImplemented
        return value._subscriber_id == self._subscriber_id

    def close(self):
        """Unsubscribe from the publisher and drop the publisher/callback references.

        Idempotent: calls after the first one are no-ops. Calling the handle
        after it is closed raises ``AttributeError`` because the callback
        reference has been released.
        """
        if self._closed:
            return
        self._closed = True
        try:
            self._publisher.unsubscribe(self)
        finally:
            # Release the references even if unsubscribe raises, so a closed
            # handle never keeps the publisher or callback alive.
            del self._publisher
            del self._callback

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            if exc_value is not None:
                self(
                    UnknownErrorEvent(
                        exc_type=exc_type,
                        exc_value=exc_value,
                        traceback=traceback,
                    )
                )
        finally:
            # Always unsubscribe, even if the callback raises while handling
            # the error report; otherwise the publisher would keep it forever.
            self.close()
65+
66+
67+
class Publisher:
    """Delivers each published :class:`Event` to every registered subscriber."""

    def __init__(self):
        # Re-entrant so that a callback running inside ``publish`` can
        # subscribe or unsubscribe on the same thread without deadlocking.
        self._subscribers_lock = threading.RLock()
        self._subscribers: Set[Subscriber] = set()

    def subscribe(self, callback: Callable[[Event], None]) -> Subscriber:
        """Register ``callback`` and return the :class:`Subscriber` handle wrapping it.

        Close the returned handle (or use it as a context manager) to stop
        receiving events.
        """
        # TODO(b/448176657): figure out how to handle subscribers/publishers in
        # a background thread. Maybe subscribers should be thread-local?
        subscriber = Subscriber(callback, publisher=self)
        with self._subscribers_lock:
            self._subscribers.add(subscriber)
        return subscriber

    def unsubscribe(self, subscriber: Subscriber):
        """Remove ``subscriber`` from this publisher.

        Raises:
            KeyError: if ``subscriber`` is not currently registered.
        """
        with self._subscribers_lock:
            self._subscribers.remove(subscriber)

    def publish(self, event: Event):
        """Synchronously invoke every current subscriber with ``event``."""
        with self._subscribers_lock:
            # Iterate over a snapshot: callbacks may add or remove subscribers,
            # which would otherwise raise "Set changed size during iteration".
            for subscriber in tuple(self._subscribers):
                # Skip subscribers removed by an earlier callback in this loop.
                if subscriber in self._subscribers:
                    subscriber(event)
88+
89+
90+
class Event:
    """Root of the event hierarchy delivered through a ``Publisher``."""


@dataclasses.dataclass(frozen=True)
class SessionClosed(Event):
    """Session-closed notification carrying the session's ID."""

    session_id: str


class ExecutionStarted(Event):
    """Marker event for the start of an execution."""


class ExecutionRunning(Event):
    """Base class for in-progress execution events (the BigQuery*Event types)."""


@dataclasses.dataclass(frozen=True)
class ExecutionFinished(Event):
    """Marker for the end of an execution, optionally carrying its result."""

    result: Optional[bigframes.session.executor.ExecuteResult] = None


@dataclasses.dataclass(frozen=True)
class UnknownErrorEvent(Event):
    """Holds the ``(exc_type, exc_value, traceback)`` triple of an exception."""

    exc_type: Any
    exc_value: Any
    traceback: Any
117+
118+
119+
@dataclasses.dataclass(frozen=True)
class BigQuerySentEvent(ExecutionRunning):
    """Query sent to BigQuery.

    Mirrors the client library's ``QuerySentEvent``; each field is copied
    from the attribute of the same name.
    """

    query: str
    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    request_id: Optional[str] = None

    @classmethod
    def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QuerySentEvent):
        """Build an instance from a client-library ``QuerySentEvent``."""
        copied = ("query", "billing_project", "location", "job_id", "request_id")
        return cls(**{name: getattr(event, name) for name in copied})
138+
139+
140+
@dataclasses.dataclass(frozen=True)
class BigQueryRetryEvent(ExecutionRunning):
    """Query sent another time because the previous attempt failed.

    Mirrors the client library's ``QueryRetryEvent``; each field is copied
    from the attribute of the same name.
    """

    query: str
    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    request_id: Optional[str] = None

    @classmethod
    def from_bqclient(cls, event: google.cloud.bigquery._job_helpers.QueryRetryEvent):
        """Build an instance from a client-library ``QueryRetryEvent``."""
        copied = ("query", "billing_project", "location", "job_id", "request_id")
        return cls(**{name: getattr(event, name) for name in copied})
159+
160+
161+
@dataclasses.dataclass(frozen=True)
class BigQueryReceivedEvent(ExecutionRunning):
    """Query received and acknowledged by the BigQuery API.

    Mirrors the client library's ``QueryReceivedEvent``; each field is copied
    from the attribute of the same name.
    """

    billing_project: Optional[str] = None
    location: Optional[str] = None
    job_id: Optional[str] = None
    statement_type: Optional[str] = None
    state: Optional[str] = None
    query_plan: Optional[list[google.cloud.bigquery.job.query.QueryPlanEntry]] = None
    created: Optional[datetime.datetime] = None
    started: Optional[datetime.datetime] = None
    ended: Optional[datetime.datetime] = None

    @classmethod
    def from_bqclient(
        cls, event: google.cloud.bigquery._job_helpers.QueryReceivedEvent
    ):
        """Build an instance from a client-library ``QueryReceivedEvent``."""
        copied = (
            "billing_project",
            "location",
            "job_id",
            "statement_type",
            "state",
            "query_plan",
            "created",
            "started",
            "ended",
        )
        return cls(**{name: getattr(event, name) for name in copied})
190+
191+
192+
@dataclasses.dataclass(frozen=True)
class BigQueryFinishedEvent(ExecutionRunning):
    """Query finished successfully.

    Mirrors the client library's ``QueryFinishedEvent``; each field is copied
    from the attribute of the same name.
    """

    billing_project: Optional[str] = None
    location: Optional[str] = None
    query_id: Optional[str] = None
    job_id: Optional[str] = None
    destination: Optional[google.cloud.bigquery.table.TableReference] = None
    total_rows: Optional[int] = None
    total_bytes_processed: Optional[int] = None
    slot_millis: Optional[int] = None
    created: Optional[datetime.datetime] = None
    started: Optional[datetime.datetime] = None
    ended: Optional[datetime.datetime] = None

    @classmethod
    def from_bqclient(
        cls, event: google.cloud.bigquery._job_helpers.QueryFinishedEvent
    ):
        """Build an instance from a client-library ``QueryFinishedEvent``."""
        copied = (
            "billing_project",
            "location",
            "query_id",
            "job_id",
            "destination",
            "total_rows",
            "total_bytes_processed",
            "slot_millis",
            "created",
            "started",
            "ended",
        )
        return cls(**{name: getattr(event, name) for name in copied})
225+
226+
227+
@dataclasses.dataclass(frozen=True)
class BigQueryUnknownEvent(ExecutionRunning):
    """Got unknown event from the BigQuery client library.

    The original client-library object is stored unmodified in ``event``.
    """

    # TODO: should we just skip sending unknown events?

    event: object

    @classmethod
    def from_bqclient(cls, event):
        """Wrap ``event`` as-is."""
        return cls(event=event)

bigframes/dataframe.py

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4670,24 +4670,24 @@ def to_string(
46704670
) -> str | None:
46714671
return self.to_pandas(allow_large_results=allow_large_results).to_string(
46724672
buf,
4673-
columns, # type: ignore
4674-
col_space,
4675-
header, # type: ignore
4676-
index,
4677-
na_rep,
4678-
formatters,
4679-
float_format,
4680-
sparsify,
4681-
index_names,
4682-
justify,
4683-
max_rows,
4684-
max_cols,
4685-
show_dimensions,
4686-
decimal,
4687-
line_width,
4688-
min_rows,
4689-
max_colwidth,
4690-
encoding,
4673+
columns=columns, # type: ignore
4674+
col_space=col_space,
4675+
header=header, # type: ignore
4676+
index=index,
4677+
na_rep=na_rep,
4678+
formatters=formatters,
4679+
float_format=float_format,
4680+
sparsify=sparsify,
4681+
index_names=index_names,
4682+
justify=justify,
4683+
max_rows=max_rows,
4684+
max_cols=max_cols,
4685+
show_dimensions=show_dimensions,
4686+
decimal=decimal,
4687+
line_width=line_width,
4688+
min_rows=min_rows,
4689+
max_colwidth=max_colwidth,
4690+
encoding=encoding,
46914691
)
46924692

46934693
def to_html(
@@ -4720,28 +4720,28 @@ def to_html(
47204720
) -> str:
47214721
return self.to_pandas(allow_large_results=allow_large_results).to_html(
47224722
buf,
4723-
columns, # type: ignore
4724-
col_space,
4725-
header,
4726-
index,
4727-
na_rep,
4728-
formatters,
4729-
float_format,
4730-
sparsify,
4731-
index_names,
4732-
justify, # type: ignore
4733-
max_rows,
4734-
max_cols,
4735-
show_dimensions,
4736-
decimal,
4737-
bold_rows,
4738-
classes,
4739-
escape,
4740-
notebook,
4741-
border,
4742-
table_id,
4743-
render_links,
4744-
encoding,
4723+
columns=columns, # type: ignore
4724+
col_space=col_space,
4725+
header=header,
4726+
index=index,
4727+
na_rep=na_rep,
4728+
formatters=formatters,
4729+
float_format=float_format,
4730+
sparsify=sparsify,
4731+
index_names=index_names,
4732+
justify=justify, # type: ignore
4733+
max_rows=max_rows,
4734+
max_cols=max_cols,
4735+
show_dimensions=show_dimensions,
4736+
decimal=decimal,
4737+
bold_rows=bold_rows,
4738+
classes=classes,
4739+
escape=escape,
4740+
notebook=notebook,
4741+
border=border,
4742+
table_id=table_id,
4743+
render_links=render_links,
4744+
encoding=encoding,
47454745
)
47464746

47474747
def to_markdown(
@@ -4753,7 +4753,7 @@ def to_markdown(
47534753
allow_large_results: Optional[bool] = None,
47544754
**kwargs,
47554755
) -> str | None:
4756-
return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode, index, **kwargs) # type: ignore
4756+
return self.to_pandas(allow_large_results=allow_large_results).to_markdown(buf, mode=mode, index=index, **kwargs) # type: ignore
47574757

47584758
def to_pickle(self, path, *, allow_large_results=None, **kwargs) -> None:
47594759
return self.to_pandas(allow_large_results=allow_large_results).to_pickle(

0 commit comments

Comments
 (0)