Description
What happened?
The pipeline [1] splits a text file on the delimiter "ABC". For the input text "ABABCD", the expected result is ["AB", "D"], but the actual pipeline result is ["ABABCD", "D"]. See [2] for the output with DirectRunner.
I suspect the root cause is the delimiter matching in TextSource. It processes the input text "ABABCD" as follows, and therefore fails to find the delimiter "ABC" in the input text.
"A": "A" == delimiter[0] (= "A"), set delPosn to 1
"B": "B" == delimiter[1] (= "B"), set delPosn to 2
"A": "A" != delimiter[2] (= "C"), set delPosn to 0 <-- This is wrong. delPosn should be 1 as "A" matches delimiter[0]
"B": "B" != delimiter[0] (= "A"), set delPosn to 0
"C": "C" != delimiter[0] (= "A"), set delPosn to 0
"D": "D" != delimiter[0] (= "A"), set delPosn to 0
I think this is similar to a regex matching problem (e.g. delimiter "ABCABCABD" and input text "...ABCABCABC..."). It may need to track multiple delPosn values for partial matches.
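The trace above can be reproduced with a small standalone sketch. This is not Beam's TextSource code: the class and method names (DelimiterMatchSketch, splitNaive, splitKmp, failureTable) are hypothetical, it works on a String rather than on byte buffers, and it assumes a non-empty delimiter. splitNaive models the single delPosn that resets to 0 on a mismatch. splitKmp shows one possible alternative, a Knuth-Morris-Pratt failure table, which falls back to the longest delimiter prefix that still matches instead of discarding the partial match.

```java
import java.util.ArrayList;
import java.util.List;

public class DelimiterMatchSketch {

  // Models the behavior in the trace: one match position, reset to 0 on any mismatch.
  static List<String> splitNaive(String text, String delimiter) {
    List<String> records = new ArrayList<>();
    int recordStart = 0;
    int delPosn = 0;
    for (int i = 0; i < text.length(); i++) {
      if (text.charAt(i) == delimiter.charAt(delPosn)) {
        delPosn++;
      } else {
        delPosn = 0; // partial match is dropped; the current character is not re-checked
      }
      if (delPosn == delimiter.length()) {
        records.add(text.substring(recordStart, i + 1 - delimiter.length()));
        recordStart = i + 1;
        delPosn = 0;
      }
    }
    if (recordStart < text.length()) {
      records.add(text.substring(recordStart));
    }
    return records;
  }

  // KMP failure table: fail[i] is the length of the longest proper prefix of
  // delimiter[0..i] that is also a suffix of it.
  static int[] failureTable(String delimiter) {
    int[] fail = new int[delimiter.length()];
    int k = 0;
    for (int i = 1; i < delimiter.length(); i++) {
      while (k > 0 && delimiter.charAt(i) != delimiter.charAt(k)) {
        k = fail[k - 1];
      }
      if (delimiter.charAt(i) == delimiter.charAt(k)) {
        k++;
      }
      fail[i] = k;
    }
    return fail;
  }

  // Same loop as splitNaive, but on a mismatch delPosn falls back via the failure table.
  static List<String> splitKmp(String text, String delimiter) {
    int[] fail = failureTable(delimiter);
    List<String> records = new ArrayList<>();
    int recordStart = 0;
    int delPosn = 0;
    for (int i = 0; i < text.length(); i++) {
      char c = text.charAt(i);
      while (delPosn > 0 && c != delimiter.charAt(delPosn)) {
        delPosn = fail[delPosn - 1];
      }
      if (c == delimiter.charAt(delPosn)) {
        delPosn++;
      }
      if (delPosn == delimiter.length()) {
        records.add(text.substring(recordStart, i + 1 - delimiter.length()));
        recordStart = i + 1;
        delPosn = 0;
      }
    }
    if (recordStart < text.length()) {
      records.add(text.substring(recordStart));
    }
    return records;
  }

  public static void main(String[] args) {
    System.out.println(splitNaive("ABABCD", "ABC")); // prints [ABABCD]
    System.out.println(splitKmp("ABABCD", "ABC"));   // prints [AB, D]
    System.out.println(splitKmp("xABCABCABCABDy", "ABCABCABD")); // prints [xABC, y]
  }
}
```

In this single-pass model the naive matcher never finds the delimiter, which corresponds to the first record "ABABCD" in [2]. The fallback step adds no extra state beyond the precomputed table, so a single delPosn is still enough.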
[1]
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TextReadJob {
  private static final Logger LOG = LoggerFactory.getLogger(TextReadJob.class);
  private static final String INPUT_PATH = "short.csv"; // content: "ABABCD"
  private static final byte[] DELIMITER = "ABC".getBytes(StandardCharsets.UTF_8);

  public static void main(String[] args) throws IOException {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(TextIO.read().from(INPUT_PATH).withDelimiter(DELIMITER))
        .apply(ParDo.of(new DoFn<String, Void>() {
          @ProcessElement
          public void processElement(@Element String input) {
            LOG.info("input: <{}>", input);
          }
        }));
    pipeline.run();
  }
}
[2]
Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource getEstimatedSizeBytes
INFO: Filepattern short.csv matched 1 files with total size 6
Aug 19, 2024 11:30:43 PM org.apache.beam.sdk.io.FileBasedSource split
INFO: Splitting filepattern short.csv into bundles of size 0 took 1 ms and produced 1 files and 6 bundles
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <ABABCD>
Aug 19, 2024 11:30:43 PM baeminbo.TextReadJob$1 processElement
INFO: input: <D>
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner