
[Bug]: TextIO.read() with non-default delimiter doesn't split at the right position #32251

@baeminbo

Description

What happened?

The pipeline [1] splits a text file using the delimiter "ABC". For the input text "ABABCD", the expected result is ["AB", "D"], but the actual pipeline result is ["ABABCD", "D"]. See [2] for the result with DirectRunner.

I suspect the delimiter matching in TextSource is the root cause. It processes the input text "ABABCD" as follows, and therefore fails to match the delimiter "ABC" in the input text:
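For comparison, a plain literal split outside Beam (a sketch using `Pattern.quote`, unrelated to TextSource's byte-level matcher) produces the expected result:

```java
import java.util.Arrays;
import java.util.regex.Pattern;

public class ExpectedSplit {
  public static void main(String[] args) {
    // Splitting "ABABCD" on the literal delimiter "ABC" should yield ["AB", "D"].
    String[] parts = "ABABCD".split(Pattern.quote("ABC"));
    System.out.println(Arrays.toString(parts)); // prints [AB, D]
  }
}
```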

"A": "A" == delimiter[0] (= "A"), set delPosn to 1
"B": "B" == delimiter[1] (= "B"), set delPosn to 2
"A": "A" != delimiter[2] (= "C"), set delPosn to 0 <-- This is wrong: delPosn should be 1, since this "A" matches delimiter[0]
"B": "B" != delimiter[0] (= "A"), set delPosn to 0
"C": "C" != delimiter[0] (= "A"), set delPosn to 0
"D": "D" != delimiter[0] (= "A"), set delPosn to 0
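The trace above corresponds to a matcher that tracks a single partial-match position and resets it to 0 on any mismatch, consuming the mismatched byte. A minimal sketch of that buggy logic (the `findDelimiter` helper here is a hypothetical simplification, not Beam's actual code) never finds the delimiter:

```java
public class BuggyMatcher {
  // Simplified single-position scan: on a mismatch, delPosn resets to 0
  // and the current byte is consumed without re-checking whether it could
  // start a new partial match.
  static int findDelimiter(byte[] data, byte[] delimiter) {
    int delPosn = 0;
    for (int i = 0; i < data.length; i++) {
      if (data[i] == delimiter[delPosn]) {
        delPosn++;
        if (delPosn == delimiter.length) {
          return i - delimiter.length + 1; // start index of the match
        }
      } else {
        delPosn = 0; // bug: loses the overlapping partial match
      }
    }
    return -1; // no match found
  }

  public static void main(String[] args) {
    // "ABC" actually starts at index 2 of "ABABCD", but this scan misses it.
    System.out.println(findDelimiter("ABABCD".getBytes(), "ABC".getBytes())); // prints -1
  }
}
```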

I think this is essentially a substring-matching fallback problem (e.g. delimiter "ABCABCABD" and input text "...ABCABCABC..."): on a mismatch, the matcher may need to fall back to a shorter overlapping partial match rather than to zero, so it effectively needs to track multiple delPosns for partial matches.
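One standard way to handle this is Knuth-Morris-Pratt matching: precompute a failure table so that on a mismatch, delPosn falls back to the longest prefix of the delimiter that is also a suffix of the bytes matched so far, instead of resetting to 0. A sketch of that approach (an illustration, not Beam's actual fix):

```java
public class KmpMatcher {
  // fail[i] = length of the longest proper prefix of delimiter[0..i]
  // that is also a suffix of delimiter[0..i].
  static int[] buildFailureTable(byte[] delimiter) {
    int[] fail = new int[delimiter.length];
    int k = 0;
    for (int i = 1; i < delimiter.length; i++) {
      while (k > 0 && delimiter[i] != delimiter[k]) {
        k = fail[k - 1];
      }
      if (delimiter[i] == delimiter[k]) {
        k++;
      }
      fail[i] = k;
    }
    return fail;
  }

  static int findDelimiter(byte[] data, byte[] delimiter) {
    int[] fail = buildFailureTable(delimiter);
    int delPosn = 0;
    for (int i = 0; i < data.length; i++) {
      while (delPosn > 0 && data[i] != delimiter[delPosn]) {
        delPosn = fail[delPosn - 1]; // fall back instead of resetting to 0
      }
      if (data[i] == delimiter[delPosn]) {
        delPosn++;
      }
      if (delPosn == delimiter.length) {
        return i - delimiter.length + 1; // start index of the match
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    // With the failure-table fallback, "ABC" is found at index 2 of "ABABCD".
    System.out.println(findDelimiter("ABABCD".getBytes(), "ABC".getBytes())); // prints 2
  }
}
```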

[1]

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TextReadJob {
  private static final Logger LOG = LoggerFactory.getLogger(TextReadJob.class);

  private static final String INPUT_PATH = "short.csv"; // content: "ABABCD"

  private static final byte[] DELIMITER = "ABC".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    Pipeline pipeline = Pipeline.create(options);
    
    pipeline.apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER)).apply(ParDo.of(new DoFn<String, Void>() {
      @ProcessElement
      public void processElement(@Element String input) {
        LOG.info("input: <{}>", input);
      }
    }));

    pipeline.run();
  }
}

[2]

Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern short.csv matched 1 files with total size 6
Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern short.csv into bundles of size 0 took 1 ms and produced 1 files and 6 bundles
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <ABABCD>
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <D>

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
