
Conversation

@liurenjie1024
Contributor

I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.

What's changed and what's your intention?

Support limiting the Kafka query range through conditions like _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'.

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • All checks passed in ./risedev check (or alias, ./risedev c)

Documentation

If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.

Types of user-facing changes

Please keep the types that apply to your changes, and remove those that do not apply.

  • SQL commands, functions, and operators

Release note

We can now limit the Kafka timestamp range of a query through conditions like _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'.

Refer to a related PR or issue link (optional)

Closes #6361

@liurenjie1024 liurenjie1024 added the user-facing-changes (Contains changes that are visible to users) label Dec 30, 2022
@liurenjie1024 liurenjie1024 force-pushed the renjie/kafka_timestamp_filter branch from 3aae8ca to d3a903d Compare December 30, 2022 08:33
@codecov

codecov bot commented Dec 30, 2022

Codecov Report

Merging #7150 (63ba900) into main (d2ef784) will decrease coverage by 0.04%.
The diff coverage is 37.50%.

@@            Coverage Diff             @@
##             main    #7150      +/-   ##
==========================================
- Coverage   73.16%   73.12%   -0.05%     
==========================================
  Files        1053     1053              
  Lines      167680   167858     +178     
==========================================
+ Hits       122690   122752      +62     
- Misses      44990    45106     +116     
Flag Coverage Δ
rust 73.12% <37.50%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
...rc/connector/src/source/kafka/enumerator/client.rs 0.00% <0.00%> (ø)
src/connector/src/source/kafka/source/reader.rs 0.00% <0.00%> (ø)
src/frontend/src/optimizer/plan_node/mod.rs 91.59% <ø> (ø)
src/frontend/src/scheduler/plan_fragmenter.rs 68.86% <0.00%> (-0.49%) ⬇️
...frontend/src/optimizer/plan_node/logical_source.rs 63.63% <44.75%> (-35.13%) ⬇️
src/frontend/src/handler/create_source.rs 59.30% <60.00%> (-1.27%) ⬇️
src/frontend/src/handler/create_table.rs 89.46% <100.00%> (+0.02%) ⬆️
...c/frontend/src/optimizer/plan_node/batch_source.rs 63.79% <100.00%> (+1.29%) ⬆️
src/storage/src/hummock/compactor/sstable_store.rs 74.54% <0.00%> (-0.91%) ⬇️
src/object_store/src/object/mem.rs 86.74% <0.00%> (-0.76%) ⬇️
... and 6 more


Contributor

@ZENOTME ZENOTME left a comment

Cool! LGTM!

}
ret
};
// println!("Watermark: {:?}", watermarks);
Contributor

Why not use tracing here?

Contributor Author

It was just for debugging; I will remove it.

Contributor

@tabVersion tabVersion left a comment

We will have further discussion about the logic and the scalability of the code.

Holding the PR for now.

Comment on lines +191 to +192
// All columns except `_row_id` or those starting with `_rw` should be visible.
let is_hidden = c.name.starts_with("_rw");
Contributor

All columns except `_row_id` or those starting with `_rw` should be visible.

Is it a convention?

Contributor Author

No, but currently this is the only hidden column, so I just want to leave it here.

///
/// If `expr` can be recognized and consumed by this function, then we return `None`.
/// Otherwise `expr` is returned.
fn expr_to_kafka_timestamp_range(
Contributor

I don't get why we still have a time range in the optimizer. Aren't all splits that don't match the time requirement already dismissed?

Contributor Author

The timestamp range will be applied to every split.
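
A minimal, standalone sketch of the kind of rewrite being discussed here: recognize a comparison between `_rw_kafka_timestamp` and a timestamptz literal, record it as a range bound, and hand anything else back so it stays in the residual filter. The expression types below are invented for illustration; the real function operates on RisingWave's `ExprImpl` tree.

```rust
// Hypothetical, simplified expression types for illustration only; the real
// implementation pattern-matches RisingWave's `ExprImpl` tree instead.
#[derive(Debug)]
enum Expr {
    Column(String),
    TimestamptzLiteral(i64), // milliseconds since the Unix epoch
    GreaterThan(Box<Expr>, Box<Expr>),
    // ... other variants elided
}

const KAFKA_TIMESTAMP_COLUMN: &str = "_rw_kafka_timestamp";

/// Tries to consume `expr` as a lower bound on `_rw_kafka_timestamp`.
/// Returns `None` when the bound is consumed (recorded in `lower_bound_ms`);
/// otherwise hands `expr` back so it stays in the residual filter.
fn try_extract_lower_bound(expr: Expr, lower_bound_ms: &mut Option<i64>) -> Option<Expr> {
    match expr {
        Expr::GreaterThan(lhs, rhs) => match (*lhs, *rhs) {
            (Expr::Column(name), Expr::TimestamptzLiteral(ts))
                if name == KAFKA_TIMESTAMP_COLUMN =>
            {
                *lower_bound_ms = Some(ts);
                None
            }
            (l, r) => Some(Expr::GreaterThan(Box::new(l), Box::new(r))),
        },
        other => Some(other),
    }
}

fn main() {
    // WHERE _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'
    let cond = Expr::GreaterThan(
        Box::new(Expr::Column(KAFKA_TIMESTAMP_COLUMN.to_string())),
        Box::new(Expr::TimestamptzLiteral(1_640_995_200_000)),
    );
    let mut lower = None;
    assert!(try_extract_lower_bound(cond, &mut lower).is_none());
    assert_eq!(lower, Some(1_640_995_200_000));
}
```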

Contributor

We extract the timestamp literal in the planner/optimizer, and use kafka_enumerator.list_splits_batch in the plan fragmenter to convert the timestamp to the corresponding offset.

BTW, this PR doesn't filter invalid offset ranges. As we discussed in #7112 (comment), directly returning an empty vec when the reader finds the offset range is empty is good enough.
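
As background, librdkafka can translate timestamps into per-partition offsets directly. Below is a standalone sketch of that mechanism using rust-rdkafka's `offsets_for_times`; this is not the code added by this PR (which goes through the enumerator's `list_splits_batch` as described above), and the broker address, topic, and partition list are placeholders.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::{Offset, TopicPartitionList};

/// For each partition, look up the earliest offset whose message timestamp is
/// greater than or equal to `timestamp_ms`.
fn offsets_at_or_after(
    consumer: &BaseConsumer,
    topic: &str,
    partitions: &[i32],
    timestamp_ms: i64,
) -> KafkaResult<TopicPartitionList> {
    // `offsets_for_times` takes the timestamp (in milliseconds) in the offset
    // slot of each entry and returns the matching offsets per partition.
    let mut tpl = TopicPartitionList::new();
    for &partition in partitions {
        tpl.add_partition_offset(topic, partition, Offset::Offset(timestamp_ms))?;
    }
    consumer.offsets_for_times(tpl, Duration::from_secs(5))
}

fn main() -> KafkaResult<()> {
    // Placeholder broker, topic, and partitions, for illustration only.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "timestamp-filter-demo")
        .create()?;

    // 1640995200000 ms is 2022-01-01 00:00:00+00:00.
    let offsets = offsets_at_or_after(&consumer, "my_topic", &[0, 1, 2], 1_640_995_200_000)?;
    for entry in offsets.elements() {
        println!("partition {} starts at {:?}", entry.partition(), entry.offset());
    }
    Ok(())
}
```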

Comment on lines +103 to +109
let kafka_timestamp_column = ColumnDesc {
    data_type: DataType::Timestamptz,
    column_id: ColumnId::new(column_descs.len() as i32),
    name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
    field_descs: vec![],
    type_name: "".to_string(),
};
Contributor

I don't see code that fills this field.

Contributor Author

Which field?

Contributor

I think the function of this column (field) is that the binder can use it to identify '_rw_kafka_timestamp' in the WHERE clause. And we don't need to fill this field in the row?

Contributor Author

field_descs is used for struct types, so it stays empty for a scalar column like this one.
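
To illustrate, a simplified sketch with hypothetical stand-in types (modeled only on the fields visible in the snippet above, not RisingWave's real definitions): a scalar column leaves `field_descs` empty, while a struct-typed column lists one descriptor per nested field.

```rust
// Simplified stand-ins for illustration; RisingWave's real `ColumnDesc` and
// `DataType` carry more information than this.
#[derive(Debug)]
enum DataType {
    Timestamptz,
    Varchar,
    Struct,
}

#[derive(Debug)]
struct ColumnDesc {
    data_type: DataType,
    name: String,
    /// Only meaningful for struct-typed columns: one entry per nested field.
    field_descs: Vec<ColumnDesc>,
}

fn main() {
    // Scalar column: `field_descs` stays empty.
    let kafka_timestamp = ColumnDesc {
        data_type: DataType::Timestamptz,
        name: "_rw_kafka_timestamp".to_string(),
        field_descs: vec![],
    };

    // Struct-typed column: nested fields are described in `field_descs`.
    let address = ColumnDesc {
        data_type: DataType::Struct,
        name: "address".to_string(),
        field_descs: vec![
            ColumnDesc {
                data_type: DataType::Varchar,
                name: "city".to_string(),
                field_descs: vec![],
            },
            ColumnDesc {
                data_type: DataType::Varchar,
                name: "zipcode".to_string(),
                field_descs: vec![],
            },
        ],
    };

    println!("{:#?}\n{:#?}", kafka_timestamp, address);
}
```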

@mergify mergify bot merged commit 1406718 into main Jan 3, 2023
@mergify mergify bot deleted the renjie/kafka_timestamp_filter branch January 3, 2023 08:12

Labels

A-batch (Area: Batch engine), user-facing-changes (Contains changes that are visible to users)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

batch: Implement kafka timestamp push down.

4 participants