feat: Initial support for kafka timestamp pushdown. #7150
3aae8ca to d3a903d
Codecov Report
@@            Coverage Diff             @@
##             main    #7150      +/-   ##
==========================================
- Coverage   73.16%   73.12%   -0.05%
==========================================
  Files        1053     1053
  Lines      167680   167858     +178
==========================================
+ Hits       122690   122752      +62
- Misses      44990    45106     +116
Flags with carried forward coverage won't be shown.
ZENOTME left a comment
Cool! LGTM!
}
ret
};
// println!("Watermark: {:?}", watermarks);
Why not use tracing here?
This is just for debugging; I will remove it.
tabVersion left a comment
We will have further discussion about the logic and scalability of the code. Holding the PR for now.
// All columns except `_row_id` or starts with `_rw` should be visible.
let is_hidden = c.name.starts_with("_rw");
"All columns except _row_id or starts with _rw should be visible."
Is it a convention?
No, but this is currently the only hidden column we have, so I'd like to leave it as is.
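The visibility convention discussed above can be sketched as a small predicate. This is a minimal sketch, not RisingWave's code: is_hidden_column and visible_columns are hypothetical helpers that follow the quoted comment (hide _row_id and anything prefixed with _rw). Note that "_row_id" does not itself start with "_rw", so a prefix check alone would leave it visible.

```rust
/// Hypothetical helper following the quoted comment: `_row_id` and any
/// column whose name starts with `_rw` are hidden.
/// `"_row_id".starts_with("_rw")` is false, so `_row_id` needs its own check.
fn is_hidden_column(name: &str) -> bool {
    name == "_row_id" || name.starts_with("_rw")
}

/// Returns the user-visible column names, preserving their order.
fn visible_columns<'a>(names: &[&'a str]) -> Vec<&'a str> {
    names
        .iter()
        .copied()
        .filter(|n| !is_hidden_column(n))
        .collect()
}

fn main() {
    let cols = ["_row_id", "v1", "_rw_kafka_timestamp", "v2"];
    println!("{:?}", visible_columns(&cols)); // ["v1", "v2"]
}
```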
///
/// If `expr` can be recognized and consumed by this function, then we return `None`.
/// Otherwise `expr` is returned.
fn expr_to_kafka_timestamp_range(
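The consume-or-return contract in the doc comment of expr_to_kafka_timestamp_range can be illustrated with a simplified model. Everything here is hypothetical: Expr, CmpOp and TimestampRange are stand-ins for the real planner types, timestamps are assumed to be integer milliseconds, and the range is kept half-open as [lower, upper). A recognized comparison on _rw_kafka_timestamp only narrows the range and is consumed (None); any other predicate is handed back so it stays in the filter.

```rust
/// Simplified stand-in for a planner expression (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    /// `_rw_kafka_timestamp <op> <literal>`, literal in milliseconds.
    TimestampCmp(CmpOp, i64),
    /// Any predicate this function does not understand.
    Other(String),
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum CmpOp {
    Gt,
    Ge,
    Lt,
    Le,
    Eq,
}

/// Half-open range [lower, upper) in milliseconds; `None` means unbounded.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
struct TimestampRange {
    lower: Option<i64>,
    upper: Option<i64>,
}

impl TimestampRange {
    /// Raise the inclusive lower bound to at least `v`.
    fn tighten_lower(&mut self, v: i64) {
        self.lower = Some(self.lower.map_or(v, |cur| cur.max(v)));
    }
    /// Lower the exclusive upper bound to at most `v`.
    fn tighten_upper(&mut self, v: i64) {
        self.upper = Some(self.upper.map_or(v, |cur| cur.min(v)));
    }
}

/// If `expr` is recognized, narrow `range` and consume it (return `None`);
/// otherwise return `expr` unchanged.
fn expr_to_kafka_timestamp_range(expr: Expr, range: &mut TimestampRange) -> Option<Expr> {
    match expr {
        Expr::TimestampCmp(op, v) => {
            // Integer milliseconds: `> v` becomes `>= v + 1` and `<= v` becomes
            // `< v + 1` (the addition saturates at i64::MAX).
            let next = v.saturating_add(1);
            match op {
                CmpOp::Gt => range.tighten_lower(next),
                CmpOp::Ge => range.tighten_lower(v),
                CmpOp::Lt => range.tighten_upper(v),
                CmpOp::Le => range.tighten_upper(next),
                CmpOp::Eq => {
                    range.tighten_lower(v);
                    range.tighten_upper(next);
                }
            }
            None
        }
        other => Some(other),
    }
}

fn main() {
    let predicates = vec![
        Expr::TimestampCmp(CmpOp::Gt, 1_000),
        Expr::Other("v1 = 2".to_string()),
        Expr::TimestampCmp(CmpOp::Le, 5_000),
    ];
    let mut range = TimestampRange::default();
    // Fold the conjuncts: consumed ones shape the range, the rest remain as filters.
    let remaining: Vec<Expr> = predicates
        .into_iter()
        .filter_map(|e| expr_to_kafka_timestamp_range(e, &mut range))
        .collect();
    println!("{:?}", range); // TimestampRange { lower: Some(1001), upper: Some(5001) }
    println!("{:?}", remaining); // [Other("v1 = 2")]
}
```

Because each recognized predicate only tightens the bounds, the conjuncts can be folded in any order and yield the same range.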
I don't understand why we still need a time range in the optimizer. Aren't all splits that don't match the time requirement dismissed?
The timestamp range applies to every split.
We will extract the timestamp literal in the planner or optimizer, and use kafka_enumerator.list_splits_batch in plan_fragmet to convert the timestamp to the corresponding offset.
BTW, this PR doesn't filter out invalid offset ranges. As we discussed in #7112 (comment), having the reader directly return an empty vec when it finds the offset range is empty is good enough.
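The flow described in this thread (one timestamp range applied to every split, converted to per-split offsets, with empty ranges handled by the reader) can be sketched in memory. This is an illustration under stated assumptions, not the PR's code: Partition, Split, offset_for_time, splits_for_range and read_split are hypothetical; the lookup mimics the semantics of Kafka's offsetsForTimes (earliest offset whose timestamp is at or after the target) by scanning a slice, and timestamps are assumed non-decreasing within a partition. It does not call kafka_enumerator.list_splits_batch.

```rust
/// Hypothetical in-memory partition: `timestamps[i]` is the timestamp (ms)
/// of the message at offset `i`, assumed non-decreasing.
struct Partition {
    id: i32,
    timestamps: Vec<i64>,
}

impl Partition {
    /// Earliest offset whose timestamp is >= `ts`, or the end offset if none
    /// (similar in spirit to Kafka's offsetsForTimes lookup).
    fn offset_for_time(&self, ts: i64) -> i64 {
        self.timestamps
            .iter()
            .position(|&t| t >= ts)
            .unwrap_or(self.timestamps.len()) as i64
    }
}

/// Hypothetical split: read offsets in [start, end) of one partition.
#[derive(Debug, PartialEq)]
struct Split {
    partition: i32,
    start: i64,
    end: i64,
}

/// Applies the same timestamp range [lower, upper) to every partition.
fn splits_for_range(parts: &[Partition], lower: Option<i64>, upper: Option<i64>) -> Vec<Split> {
    parts
        .iter()
        .map(|p| Split {
            partition: p.id,
            start: lower.map_or(0, |ts| p.offset_for_time(ts)),
            end: upper.map_or(p.timestamps.len() as i64, |ts| p.offset_for_time(ts)),
        })
        .collect()
}

/// Reader side: an empty offset range just yields no messages, so the
/// planner does not need to filter such splits out.
fn read_split(p: &Partition, s: &Split) -> Vec<i64> {
    if s.start >= s.end {
        return vec![];
    }
    p.timestamps[s.start as usize..s.end as usize].to_vec()
}

fn main() {
    let parts = vec![
        Partition { id: 0, timestamps: vec![100, 200, 300, 400] },
        Partition { id: 1, timestamps: vec![50, 60] },
    ];
    // Range for `_rw_kafka_timestamp >= 150 AND _rw_kafka_timestamp < 400`.
    let splits = splits_for_range(&parts, Some(150), Some(400));
    for (p, s) in parts.iter().zip(&splits) {
        println!("{:?} -> {:?}", s, read_split(p, s));
    }
    // Split { partition: 0, start: 1, end: 3 } -> [200, 300]
    // Split { partition: 1, start: 2, end: 2 } -> []
}
```

Partition 1 has no messages in the range, so its split gets start == end and the reader returns an empty vec; the planner does not need to drop it.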
let kafka_timestamp_column = ColumnDesc {
    data_type: DataType::Timestamptz,
    column_id: ColumnId::new(column_descs.len() as i32),
    name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
    field_descs: vec![],
    type_name: "".to_string(),
};
I don't see code that fills this field.
Which field?
I think the purpose of this column (field) is to let the binder identify _rw_kafka_timestamp in the WHERE clause. So we don't need to fill this field in rows?
field_desc is used for struct types.
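The point made in this thread can be sketched as follows: the hidden column exists in the catalog so that the binder can resolve _rw_kafka_timestamp by name, and field_descs stays empty because it is only populated for struct-typed columns. The types below are simplified, hypothetical stand-ins for the quoted ColumnDesc and DataType (type_name is omitted, and ColumnId is reduced to a plain i32), and the constant's value is assumed from the column name used in this PR.

```rust
/// Simplified stand-ins for the quoted types (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Timestamptz,
}

#[derive(Debug, Clone)]
struct ColumnDesc {
    data_type: DataType,
    column_id: i32,
    name: String,
    /// Populated only for struct-typed columns (one entry per struct field).
    field_descs: Vec<ColumnDesc>,
}

/// Assumed value, taken from the column name used in this PR.
const KAFKA_TIMESTAMP_COLUMN_NAME: &str = "_rw_kafka_timestamp";

/// Appends the hidden timestamp column; its id is the next free index,
/// mirroring `ColumnId::new(column_descs.len() as i32)` in the snippet.
fn with_kafka_timestamp(mut cols: Vec<ColumnDesc>) -> Vec<ColumnDesc> {
    let column_id = cols.len() as i32;
    cols.push(ColumnDesc {
        data_type: DataType::Timestamptz,
        column_id,
        name: KAFKA_TIMESTAMP_COLUMN_NAME.to_string(),
        field_descs: vec![], // scalar column: no nested field descriptors
    });
    cols
}

/// Binder-style lookup: resolve a column name from a WHERE clause to its id.
fn resolve(cols: &[ColumnDesc], name: &str) -> Option<i32> {
    cols.iter().find(|c| c.name == name).map(|c| c.column_id)
}

fn main() {
    let user_cols = vec![ColumnDesc {
        data_type: DataType::Int32,
        column_id: 0,
        name: "v1".to_string(),
        field_descs: vec![],
    }];
    let cols = with_kafka_timestamp(user_cols);
    println!("{:?}", resolve(&cols, KAFKA_TIMESTAMP_COLUMN_NAME)); // Some(1)
}
```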
I hereby agree to the terms of the Singularity Data, Inc. Contributor License Agreement.
What's changed and what's your intention?
Support limiting the Kafka query range through conditions like _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'.
Checklist
./risedev check (or alias, ./risedev c)
Documentation
If your pull request contains user-facing changes, please specify the types of the changes, and create a release note. Otherwise, please feel free to remove this section.
Types of user-facing changes
Please keep the types that apply to your changes, and remove those that do not apply.
Release note
Now the Kafka timestamp range of a query can be limited through conditions like _rw_kafka_timestamp > '2022-01-01 00:00:00+00:00'.
Refer to a related PR or issue link (optional)
Closes #6361