
mukund-ananthu (Contributor) commented Jan 6, 2025

Current behavior:

If a TransportError is raised by the google.auth library:

https://github.com/googleapis/google-auth-library-python/blob/c3ea09fd8b9ee8f094263fdc809c59d9dfaa3124/google/auth/compute_engine/_metadata.py#L187-L189

when calling _gapic_publish:

response = self._client._gapic_publish(
    topic=self._topic,
    messages=[wrapper.message for wrapper in self._message_wrappers],
    retry=self._commit_retry,
    timeout=self._commit_timeout,
)

Then:

  1. The exception is not caught by the existing handler:

except google.api_core.exceptions.GoogleAPIError as exc:

  2. The batch status is not set to ERROR:

self._status = base.BatchStatus.ERROR

  3. The exception is not set on the publish futures:

if self._batch_done_callback is not None:
    # Failed to publish batch.
    self._batch_done_callback(batch_transport_succeeded)
for future in self._futures:
    future.set_exception(exc)

  4. The thread in which the batch's _gapic_publish call was running dies without setting the correct batch state or future exceptions:

commit_thread = threading.Thread(
    name="Thread-CommitBatchPublisher", target=self._commit, daemon=True
)

  5. The batch state remains IN_PROGRESS indefinitely:

self._status = base.BatchStatus.IN_PROGRESS

  6. Because the stuck batch is no longer accepting messages, subsequent publishes result in the creation of a new batch:

if self.status != base.BatchStatus.ACCEPTING_MESSAGES:
    return None

while future is None:
    batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
    self._ordered_batches.append(batch)
    future = batch.publish(wrapper)

  7. Messages that were in the batch whose commit thread died:
  • Will not have the TransportError set as an exception on their publish futures
  • Will have publish futures that either hang or time out
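To make the failure mode concrete, here is a minimal, self-contained sketch of the pre-change control flow. It does not use the real library: GoogleAPIError, GoogleAuthError, TransportError, BatchStatus, SketchBatch, failing_gapic_publish and run_demo are hypothetical stand-ins that only mirror the relevant shape. The key point they reproduce is that google.auth's TransportError derives from google.auth's own exception base class rather than from GoogleAPIError, so the except clause never sees it. Running run_demo() should leave the batch IN_PROGRESS and the publish future unresolved; Python's default threading.excepthook also prints the uncaught TransportError traceback to stderr, which is the only trace the dying commit thread leaves.

```python
import concurrent.futures
import enum
import threading


# Hypothetical stand-ins mirroring the real hierarchy: TransportError comes
# from google.auth's exception tree, not from google.api_core's GoogleAPIError.
class GoogleAPIError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPIError."""


class GoogleAuthError(Exception):
    """Stand-in for google.auth.exceptions.GoogleAuthError."""


class TransportError(GoogleAuthError):
    """Stand-in for google.auth.exceptions.TransportError."""


class BatchStatus(enum.Enum):
    ACCEPTING_MESSAGES = "accepting messages"
    IN_PROGRESS = "in progress"
    SUCCESS = "success"
    ERROR = "error"


class SketchBatch:
    """Hypothetical, simplified model of the pre-change commit path."""

    def __init__(self, gapic_publish):
        self._gapic_publish = gapic_publish
        self.futures = [concurrent.futures.Future()]  # one pending message
        self.status = BatchStatus.ACCEPTING_MESSAGES

    def _commit(self):
        self.status = BatchStatus.IN_PROGRESS
        try:
            self._gapic_publish()
        except GoogleAPIError as exc:  # a TransportError slips past this clause
            self.status = BatchStatus.ERROR
            for future in self.futures:
                future.set_exception(exc)
            return
        self.status = BatchStatus.SUCCESS
        for future in self.futures:
            future.set_result("message-id")  # placeholder message ID

    def commit(self):
        # As in the quoted client code, the commit runs on a daemon thread.
        thread = threading.Thread(target=self._commit, daemon=True)
        thread.start()
        return thread


def failing_gapic_publish():
    # Simulates a GCE metadata-server blip surfacing as a TransportError.
    raise TransportError("metadata service unavailable")


def run_demo(timeout=0.5):
    batch = SketchBatch(failing_gapic_publish)
    batch.commit().join()  # the thread dies with the uncaught TransportError
    try:
        batch.futures[0].result(timeout=timeout)
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True  # nothing ever resolves the future
    return batch.status, timed_out


if __name__ == "__main__":
    status, timed_out = run_demo()
    print(status.name, timed_out)  # IN_PROGRESS True
```

The daemon flag matters for the symptom: nothing joins the commit thread, so its death is invisible to the publisher apart from the stderr traceback.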

Expected behavior with this change:

  1. A TransportError raised during _gapic_publish will be caught in addition to the GoogleAPIError already handled here:

except google.api_core.exceptions.GoogleAPIError as exc:

  2. The existing handling of a GoogleAPIError caught during _gapic_publish will now also apply to a TransportError, i.e.:
  • The batch status will be set to ERROR and the exception will be set on the publish futures:

except google.api_core.exceptions.GoogleAPIError as exc:
    # We failed to publish, even after retries, so set the exception on
    # all futures and exit.
    self._status = base.BatchStatus.ERROR
    if self._client.open_telemetry_enabled:
        if self._rpc_span:
            self._rpc_span.record_exception(
                exception=exc,
            )
            self._rpc_span.set_status(
                trace.Status(status_code=trace.StatusCode.ERROR)
            )
            self._rpc_span.end()
        for wrapper in self._message_wrappers:
            wrapper.end_create_span(exc=exc)
    batch_transport_succeeded = False
    if self._batch_done_callback is not None:
        # Failed to publish batch.
        self._batch_done_callback(batch_transport_succeeded)
    for future in self._futures:
        future.set_exception(exc)
    return

  • A subsequent publish on the same batch object will raise an AssertionError:

with self._state_lock:
    assert (
        self._status != base.BatchStatus.ERROR
    ), "Publish after stop() or publish error."

which is propagated up through the ordered/unordered sequencer's publish method:

batch = self._ordered_batches[-1]
future = batch.publish(wrapper)
while future is None:
    batch = self._create_batch(commit_retry=retry, commit_timeout=timeout)
    self._ordered_batches.append(batch)
    future = batch.publish(wrapper)
return future

to the publisher client's publish method:

    future = sequencer.publish(
        wrapper=wrapper, retry=retry, timeout=timeout
    )
    future.add_done_callback(on_publish_done)
except BaseException as be:
    # Exceptions can be thrown when attempting to add messages to
    # the batch. If they're thrown, record them in publisher
    # batching and create span, end the spans and bubble the
    # exception up.
    if self._open_telemetry_enabled:
        if wrapper:
            wrapper.end_publisher_batching_span(be)
            wrapper.end_create_span(be)
        else:  # pragma: NO COVER
            warnings.warn(
                message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
                category=RuntimeWarning,
            )
    raise be

which re-raises the exception to the user.
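The post-change flow can be sketched the same way. This sketch assumes the fix widens the except clause to a tuple that also names TransportError; the description does not reproduce the exact diff, so treat that clause as illustrative. As before, GoogleAPIError, TransportError, BatchStatus, SketchBatch, failing_gapic_publish and run_demo are hypothetical stand-ins, not the library's API. SketchBatch.publish copies the assertion and the ACCEPTING_MESSAGES check quoted in this description, so after a failed commit the TransportError lands on the existing publish future, and a second publish to the same batch raises the AssertionError that the sequencer and client would pass back to the caller.

```python
import concurrent.futures
import enum
import threading


# Hypothetical stand-ins; not the real google.api_core / google.auth classes.
class GoogleAPIError(Exception):
    pass


class TransportError(Exception):
    pass


class BatchStatus(enum.Enum):
    ACCEPTING_MESSAGES = "accepting messages"
    IN_PROGRESS = "in progress"
    SUCCESS = "success"
    ERROR = "error"


class SketchBatch:
    """Hypothetical, simplified model of the post-change batch."""

    def __init__(self, gapic_publish):
        self._gapic_publish = gapic_publish
        self._state_lock = threading.Lock()
        self.futures = []
        self.status = BatchStatus.ACCEPTING_MESSAGES

    def publish(self, message):
        with self._state_lock:
            # Same guard as the batch code quoted above.
            assert (
                self.status != BatchStatus.ERROR
            ), "Publish after stop() or publish error."
            if self.status != BatchStatus.ACCEPTING_MESSAGES:
                return None  # the sequencer would then create a new batch
            future = concurrent.futures.Future()
            self.futures.append(future)
            return future

    def _commit(self):
        with self._state_lock:
            self.status = BatchStatus.IN_PROGRESS
        try:
            self._gapic_publish()
        except (GoogleAPIError, TransportError) as exc:  # assumed widened clause
            with self._state_lock:
                self.status = BatchStatus.ERROR
            for future in self.futures:
                future.set_exception(exc)
            return
        with self._state_lock:
            self.status = BatchStatus.SUCCESS
        for index, future in enumerate(self.futures):
            future.set_result(str(index))  # placeholder message IDs

    def commit(self):
        thread = threading.Thread(target=self._commit, daemon=True)
        thread.start()
        return thread


def failing_gapic_publish():
    raise TransportError("metadata service unavailable")


def run_demo():
    batch = SketchBatch(failing_gapic_publish)
    future = batch.publish(b"hello")
    batch.commit().join()

    future_error = None
    try:
        future.result(timeout=1)
    except TransportError as exc:
        future_error = exc  # the error reaches the future instead of hanging

    publish_error = None
    try:
        batch.publish(b"again")
    except AssertionError as exc:
        publish_error = exc  # what the sequencer and client would re-raise

    return batch.status, future_error, publish_error


if __name__ == "__main__":
    status, future_error, publish_error = run_demo()
    print(status.name)  # ERROR
    print(repr(future_error))  # TransportError('metadata service unavailable')
    print(publish_error)  # Publish after stop() or publish error.
```

Because the guard is a plain assert, it is skipped when Python runs with -O; the sketch inherits that behavior from the quoted batch code.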

Additional details can be found here: #1173 (comment)

Fixes #1173 🦕

mukund-ananthu requested review from a team as code owners January 6, 2025 01:18
product-auto-label (bot) added the labels "size: s" and "api: pubsub" Jan 6, 2025
mukund-ananthu merged commit 0e058c7 into main Jan 6, 2025 (27 checks passed)
mukund-ananthu deleted the gceMeta branch January 6, 2025 19:35
Linked issue: Publisher thread terminates, forever breaking publication when GCE metadata service blips