
[Feature Request]: Allow schema fields that might be null or missing #35179

@jonathaningram

Description


What would you like to happen?

I would like ReadFromPubSub to understand that a field in my incoming message payload might be null or missing, and that's OK.

I want this to work:

pipeline:
  source:
    type: ReadFromPubSub
    config:
      subscription: "sub"
      format: JSON
      schema:
        type: object
        properties:
          id: { type: string }
          event_subtype: { type: string } # <-- this causes a failure if the message does not have this field

But it fails with KeyError: 'event_subtype'. Here is the full stack trace:

Error message from worker: generic::unknown: Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
    values = parser(msg.data).as_dict()
             ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
    return to_row(o)
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
    {name: convert(value[name])
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
    {name: convert(value[name])
                   ~~~~~^^^^^^
KeyError: 'event_subtype'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 313, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 388, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 658, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 696, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1274, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 566, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 259, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1585, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 683, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1680, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1793, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 262, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 949, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 950, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1497, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1606, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1495, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 684, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/usr/local/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 2086, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/yaml_io.py", line 380, in mapper
    values = parser(msg.data).as_dict()
             ^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 224, in parse
    return to_row(o)
           ^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <lambda>
    {name: convert(value[name])
  File "/usr/local/lib/python3.11/site-packages/apache_beam/yaml/json_utils.py", line 191, in <dictcomp>
    {name: convert(value[name])
                   ~~~~~^^^^^^
KeyError: "event_subtype [while running 'read_from_pubsub/ParseMessage-ptransform-44']"

passed through:
==>
    dist_proc/dax/workflow/worker/fnapi_service_impl.cc:1334
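Both chained tracebacks end in the same dict comprehension at json_utils.py line 191, which appears to index `value[name]` for every property declared in the schema. The sketch below is a simplified, hypothetical stand-in for that conversion (the function names and the `PROPERTIES` list are illustrative, not Beam's actual code). It reproduces the KeyError for a payload with the field omitted and shows how a `dict.get` lookup would instead treat the missing key as null.

```python
import json

# Illustrative schema properties, mirroring the config above (hypothetical).
PROPERTIES = ["id", "event_subtype"]


def to_row_strict(value: dict) -> dict:
    # Simplified stand-in for json_utils.py line 191: indexing every
    # declared property raises KeyError when a key is absent.
    return {name: value[name] for name in PROPERTIES}


def to_row_lenient(value: dict) -> dict:
    # Hypothetical alternative: dict.get returns None for absent keys,
    # so "missing" is treated the same as an explicit JSON null.
    return {name: value.get(name) for name in PROPERTIES}


# event_subtype omitted, as produced by Go's omitempty.
payload = json.loads(b'{"id": "abc"}')

try:
    to_row_strict(payload)
except KeyError as e:
    print("KeyError:", e)  # KeyError: 'event_subtype'

print(to_row_lenient(payload))  # {'id': 'abc', 'event_subtype': None}
```

A real fix would likely also need the generated Beam schema to mark such fields as nullable, since a `None` in a non-nullable string field would probably fail later, during row encoding.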

I've tried two things to make this work:

event_subtype: { type: ["string", "null"] }

Which fails with ValueError: Error applying transform "ReadFromPubSub" at line 5: unhashable type: 'list'.

And:

event_subtype:
  oneOf:
    - type: "string"
    - type: "null"

Which fails with ValueError: Error applying transform "ReadFromPubSub" at line 5: Malformed type {'oneOf': [{'type': 'string'}, {'type': None}]}.
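Both errors are consistent with the schema converter accepting only a single string for `type`. `unhashable type: 'list'` is the `TypeError` Python raises when a list is used as a dictionary key, which suggests (an assumption, not confirmed against the Beam source) that the type name is looked up in a mapping. The hypothetical sketch below reproduces that error and shows one way a converter could normalize both the `["string", "null"]` form and the `oneOf` form to a base type plus a nullable flag. `JSON_TYPES`, `naive_lookup`, and `resolve_type` are illustrative names, not Beam APIs. Because the second error message shows the null branch arriving as `None`, the sketch treats both `"null"` and `None` as the null type.

```python
# Hypothetical mapping from JSON Schema type names to Python types.
JSON_TYPES = {"string": str, "integer": int, "number": float, "boolean": bool}


def naive_lookup(type_value):
    # A list is unhashable, so using it as a dict key raises TypeError.
    return JSON_TYPES[type_value]


def resolve_type(schema: dict):
    """Return (python_type, nullable) for a single-field JSON schema.

    Accepts {"type": "string"}, {"type": ["string", "null"]},
    or {"oneOf": [{"type": "string"}, {"type": "null"}]}.
    """
    if "oneOf" in schema:
        names = [branch.get("type") for branch in schema["oneOf"]]
    elif isinstance(schema.get("type"), list):
        names = schema["type"]
    else:
        return JSON_TYPES[schema["type"]], False
    # Treat both the string "null" and a YAML-parsed None as the null type.
    non_null = [n for n in names if n not in ("null", None)]
    if len(non_null) != 1:
        raise ValueError(f"unsupported type union: {names}")
    return JSON_TYPES[non_null[0]], len(non_null) < len(names)


try:
    naive_lookup(["string", "null"])
except TypeError as e:
    print(e)  # unhashable type: 'list'

print(resolve_type({"type": ["string", "null"]}))  # (<class 'str'>, True)
print(resolve_type({"oneOf": [{"type": "string"}, {"type": None}]}))  # (<class 'str'>, True)
```

Restricting unions to exactly one non-null branch keeps the mapping onto a single nullable Beam field type unambiguous; wider unions are rejected rather than guessed.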

In my case, when publishing to Pub/Sub, event_subtype is omitted from the JSON bytes entirely if it's empty (the Go struct tag is json:"event_subtype,omitempty"). So in this case the field isn't actually null, it's just not there, but I thought I'd try the above anyway.

Is there an existing solution here or do I have to solve this by changing the shape of the data at publish time?
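One possible workaround, pending schema support for optional fields, is to normalize the payload before it reaches the schema parser, either at publish time or in a step that handles the raw message bytes. The sketch below uses a hypothetical helper, `fill_missing` (not part of Beam), that fills omitted optional keys with a default. It defaults to an empty string rather than `null` because, as the two attempts above show, the schema currently rejects nullable types. Whether an empty string is an acceptable stand-in for "absent" depends on downstream consumers.

```python
import json

# Hypothetical defaults for optional fields that the publisher may omit
# (e.g. via Go's omitempty); an empty string keeps the value a plain "string".
OPTIONAL_DEFAULTS = {"event_subtype": ""}


def fill_missing(data: bytes, defaults: dict = OPTIONAL_DEFAULTS) -> bytes:
    """Return the JSON message bytes with any absent optional keys filled in."""
    obj = json.loads(data)
    for key, default in defaults.items():
        # setdefault only inserts the key when it is not already present.
        obj.setdefault(key, default)
    return json.dumps(obj).encode("utf-8")


print(fill_missing(b'{"id": "abc"}'))
# b'{"id": "abc", "event_subtype": ""}'
print(fill_missing(b'{"id": "abc", "event_subtype": "x"}'))
# b'{"id": "abc", "event_subtype": "x"}'
```

On the Go publisher side, the equivalent change is dropping `omitempty` from the struct tag, so that an empty string is always serialized as `""`.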

Issue Priority

Priority: 2 (default / most feature requests should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Assignees

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests
