Description
What would you like to happen?
I would like ReadFromPubSub to understand that an incoming field in my message payload might be null, and that's OK.
I want this to work:
pipeline:
  source:
    type: ReadFromPubSub
    config:
      subscription: "sub"
      format: JSON
      schema:
        type: object
        properties:
          id: { type: string }
          event_subtype: { type: string } # <-- this causes a failure if message does not have this field
But it fails with KeyError: 'event_subtype'. The full stack trace follows.
Error message from worker: generic::unknown: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
wrapper = lambda x: [fn(x)]
^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
values = parser(msg.data).as_dict()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
return to_row(o)
^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
{name: convert(value[name])
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
{name: convert(value[name])
~~~~~^^^^^^
KeyError: 'event_subtype'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
response = task()
^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
return getattr(self, request_type)(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 696, in process_bundle
bundle_processor.process_bundle(instruction_id))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 566, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 259, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1606, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
wrapper = lambda x: [fn(x)]
^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
values = parser(msg.data).as_dict()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
return to_row(o)
^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
{name: convert(value[name])
File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
{name: convert(value[name])
~~~~~^^^^^^
KeyError: "event_subtype [while running 'read_from_pubsub/ParseMessage-ptransform-44']"
passed through:
==>
dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1334
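The failing frame in the traceback is a dict comprehension that indexes `value[name]` for every property declared in the schema, so a payload that omits a declared key raises KeyError. Below is a minimal sketch of that behavior in plain Python, next to a lenient variant that treats an absent key as None. It is not Beam's code: `PROPERTIES`, `to_row_strict`, and `to_row_lenient` are hypothetical names used only for illustration.

```python
import json

# Hypothetical stand-in for the property names declared in the schema.
PROPERTIES = ["id", "event_subtype"]


def to_row_strict(value: dict) -> dict:
    # Same pattern as the frame in the traceback: every declared key is
    # indexed directly, so a missing key raises KeyError.
    return {name: value[name] for name in PROPERTIES}


def to_row_lenient(value: dict) -> dict:
    # dict.get returns None for an absent key instead of raising.
    return {name: value.get(name) for name in PROPERTIES}


# The publisher omitted event_subtype from the JSON bytes.
payload = json.loads(b'{"id": "abc"}')

try:
    to_row_strict(payload)
    strict_error = None
except KeyError as exc:
    strict_error = exc.args[0]  # name of the missing key

lenient_row = to_row_lenient(payload)
```

With the lenient variant, an omitted field and an explicit JSON null both end up as None in the resulting row, which is the behavior requested here.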
I've tried two things to make this work:
event_subtype: { type: ["string", "null"] }
Which fails with ValueError: Error applying transform "ReadFromPubSub" at line 5: unhashable type: 'list'.
And:
event_subtype:
  oneOf:
    - type: "string"
    - type: "null"
Which fails with ValueError: Error applying transform "ReadFromPubSub" at line 5: Malformed type {'oneOf': [{'type': 'string'}, {'type': None}]}.
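Both attempts express "string or null" in standard JSON Schema forms. The sketch below shows how such declarations could be reduced to a base type plus a nullable flag before any type lookup happens. It is an illustration under assumptions, not how Beam's json_utils is implemented; `split_nullable`, `_is_null_option`, and `NULL_MARKERS` are hypothetical names. It also accepts `None` as a null marker, since the second error message shows `{'type': None}`.

```python
from typing import Any, Tuple

# "null" is the JSON Schema spelling; None covers a YAML-parsed null.
NULL_MARKERS = ("null", None)


def _is_null_option(option: dict) -> bool:
    # Only an explicit "type" entry counts; a missing key is not null.
    return "type" in option and option["type"] in NULL_MARKERS


def split_nullable(spec: dict) -> Tuple[Any, bool]:
    """Return (base_spec, nullable) for a JSON Schema property spec."""
    json_type = spec.get("type")

    # Form 1: {"type": ["string", "null"]}
    if isinstance(json_type, list):
        non_null = [t for t in json_type if t not in NULL_MARKERS]
        if len(non_null) != 1:
            raise ValueError(f"Unsupported type union: {json_type!r}")
        nullable = len(non_null) < len(json_type)
        return {**spec, "type": non_null[0]}, nullable

    # Form 2: {"oneOf": [{"type": "string"}, {"type": "null"}]}
    if "oneOf" in spec:
        options = spec["oneOf"]
        non_null = [o for o in options if not _is_null_option(o)]
        if len(non_null) != 1:
            raise ValueError(f"Unsupported oneOf: {options!r}")
        return non_null[0], len(non_null) < len(options)

    # Plain declaration such as {"type": "string"}: not nullable.
    return spec, False
```

A caller could then resolve the base spec as usual and mark the resulting field as optional when the flag is set.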
In my case, when publishing to Pub/Sub, event_subtype is actually omitted from the JSON bytes if it's empty (json:"event_subtype,omitempty" in Go). So in this case it's not actually null, it's just not there, but I thought I'd try the above anyway.
Is there an existing solution here or do I have to solve this by changing the shape of the data at publish time?
Issue Priority
Priority: 2 (default / most feature requests should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner