feat: support file-level parquet row selections #22940
base: main
Changes from all commits
137217a
512bd56
e6a80ff
49f6382
c1bd290
f57b059
0d39116
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| use crate::sort::reverse_row_selection; | ||
| use arrow::datatypes::Schema; | ||
| use datafusion_common::{Result, assert_eq_or_internal_err}; | ||
| use datafusion_common::{Result, assert_eq_or_internal_err, exec_err}; | ||
| use datafusion_physical_expr::expressions::Column; | ||
| use datafusion_physical_expr_common::sort_expr::LexOrdering; | ||
| use log::debug; | ||
|
|
@@ -104,6 +104,41 @@ pub struct ParquetAccessPlan { | |
| fully_matched: Vec<bool>, | ||
| } | ||
|
|
||
| /// A file-level row selection for a parquet scan. | ||
| /// | ||
| /// Attach this type to a [`PartitionedFile`](datafusion_datasource::PartitionedFile) | ||
| /// with [`PartitionedFile::with_extension`](datafusion_datasource::PartitionedFile::with_extension) | ||
| /// when an external index produces a [`RowSelection`] across the entire parquet | ||
| /// file. DataFusion will use parquet metadata to split it into row-group-level | ||
| /// access when the file is opened. | ||
| #[derive(Debug, Clone, PartialEq)] | ||
| pub struct ParquetRowSelection { | ||
| selection: RowSelection, | ||
| } | ||
|
|
||
| impl ParquetRowSelection { | ||
| /// Create a new file-level parquet row selection. | ||
| pub fn new(selection: RowSelection) -> Self { | ||
| Self { selection } | ||
| } | ||
|
|
||
| /// Return a reference to the underlying [`RowSelection`]. | ||
| pub fn selection(&self) -> &RowSelection { | ||
| &self.selection | ||
| } | ||
|
|
||
| /// Convert into the underlying [`RowSelection`]. | ||
| pub fn into_inner(self) -> RowSelection { | ||
| self.selection | ||
| } | ||
| } | ||
|
|
||
| impl From<RowSelection> for ParquetRowSelection { | ||
| fn from(selection: RowSelection) -> Self { | ||
| Self::new(selection) | ||
| } | ||
| } | ||
|
|
||
| /// Describes how the parquet reader will access a row group | ||
| #[derive(Debug, Clone, PartialEq)] | ||
| pub enum RowGroupAccess { | ||
|
|
@@ -169,6 +204,110 @@ impl ParquetAccessPlan { | |
| } | ||
| } | ||
|
|
||
| /// Create a new `ParquetAccessPlan` from a file-level [`RowSelection`]. | ||
| /// | ||
| /// The selection is interpreted across all rows in the file, in row group | ||
| /// order, and is split into row-group level access using `row_group_meta_data`. | ||
| /// Fully skipped row groups become [`RowGroupAccess::Skip`], fully selected | ||
| /// row groups become [`RowGroupAccess::Scan`], and partially selected row | ||
| /// groups become [`RowGroupAccess::Selection`]. | ||
| /// | ||
| /// # Errors | ||
| /// | ||
| /// Returns an error if the selection does not specify exactly the same | ||
| /// number of rows as the file metadata. | ||
| pub fn try_new_from_overall_row_selection( | ||
| selection: RowSelection, | ||
| row_group_meta_data: &[RowGroupMetaData], | ||
| ) -> Result<Self> { | ||
| let selectors: Vec<RowSelector> = selection.into(); | ||
|
Contributor
The performance concern around […] If we keep this version, it would be helpful to add a short comment or benchmark note explaining the measured reason for the manual approach. Otherwise, the earlier […]
||
| let mut selector_iter = selectors.into_iter(); | ||
| let mut current = selector_iter.next(); | ||
|
|
||
| let mut selection_rows = 0usize; | ||
| let mut file_rows = 0usize; | ||
|
|
||
| let mut row_groups = Vec::with_capacity(row_group_meta_data.len()); | ||
| for rg_meta in row_group_meta_data { | ||
| let rg_rows = rg_meta.num_rows() as usize; | ||
| file_rows += rg_rows; | ||
|
|
||
| // `leading` holds the first selector seen in this group when we have | ||
| // not yet seen a direction change; once both skip and select rows appear | ||
| // we promote to `mixed` so we can build the full RowSelection Vec. | ||
| // This avoids a heap allocation for the common all-scan / all-skip case. | ||
| let mut leading: Option<RowSelector> = None; | ||
| let mut mixed: Option<Vec<RowSelector>> = None; | ||
| let mut selected = 0usize; | ||
| let mut skipped = 0usize; | ||
| let mut remaining = rg_rows; | ||
|
|
||
| while remaining > 0 { | ||
| let Some(sel) = current else { break }; | ||
| let take = sel.row_count.min(remaining); | ||
| selection_rows += take; | ||
| remaining -= take; | ||
|
|
||
| let rg_sel = RowSelector { | ||
| row_count: take, | ||
| skip: sel.skip, | ||
| }; | ||
| if sel.skip { | ||
| skipped += take; | ||
| } else { | ||
| selected += take; | ||
| } | ||
|
|
||
| if let Some(v) = &mut mixed { | ||
| v.push(rg_sel); | ||
| } else if let Some(lead) = leading.as_mut() { | ||
| if lead.skip == rg_sel.skip { | ||
| // Merge adjacent same-direction pieces (e.g. a selector | ||
| // split at a row-group boundary has the same direction as | ||
| // its continuation). | ||
| lead.row_count += rg_sel.row_count; | ||
| } else { | ||
| mixed = Some(vec![*lead, rg_sel]); | ||
| } | ||
| } else { | ||
| leading = Some(rg_sel); | ||
| } | ||
|
|
||
| current = if take < sel.row_count { | ||
| Some(RowSelector { | ||
| row_count: sel.row_count - take, | ||
| skip: sel.skip, | ||
| }) | ||
| } else { | ||
| selector_iter.next() | ||
| }; | ||
| } | ||
|
|
||
| let access = if selected == 0 { | ||
| RowGroupAccess::Skip | ||
| } else if skipped == 0 { | ||
| RowGroupAccess::Scan | ||
| } else { | ||
| // Both `selected > 0` and `skipped > 0` means a direction change | ||
| // occurred, which always initialises `mixed`. | ||
| RowGroupAccess::Selection(mixed.unwrap_or_default().into()) | ||
| }; | ||
| row_groups.push(access); | ||
| } | ||
|
|
||
| selection_rows += current.map_or(0, |s| s.row_count) | ||
| + selector_iter.map(|s| s.row_count).sum::<usize>(); | ||
|
|
||
| if selection_rows != file_rows { | ||
| return exec_err!( | ||
| "Invalid Parquet RowSelection. File has {file_rows} rows, \ | ||
| but selection specifies {selection_rows} rows." | ||
| ); | ||
| } | ||
|
|
||
| Ok(Self::new(row_groups)) | ||
| } | ||
|
|
||
| /// Set the i-th row group to the specified [`RowGroupAccess`] | ||
| pub fn set(&mut self, idx: usize, access: RowGroupAccess) { | ||
| let should_scan = access.should_scan(); | ||
|
|
@@ -758,6 +897,57 @@ mod test { | |
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_new_from_overall_row_selection() { | ||
| let row_selection = RowSelection::from(vec![ | ||
| RowSelector::select(10), | ||
| RowSelector::skip(25), | ||
| RowSelector::select(10), | ||
| RowSelector::skip(15), | ||
| RowSelector::select(40), | ||
| ]); | ||
|
|
||
| let access_plan = ParquetAccessPlan::try_new_from_overall_row_selection( | ||
| row_selection, | ||
| &ROW_GROUP_METADATA, | ||
| ) | ||
| .unwrap(); | ||
|
|
||
| assert_eq!( | ||
| access_plan, | ||
| ParquetAccessPlan::new(vec![ | ||
| RowGroupAccess::Scan, | ||
| RowGroupAccess::Skip, | ||
| RowGroupAccess::Selection( | ||
| vec![ | ||
| RowSelector::skip(5), | ||
| RowSelector::select(10), | ||
| RowSelector::skip(15), | ||
| ] | ||
| .into() | ||
| ), | ||
| RowGroupAccess::Scan, | ||
| ]) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_new_from_overall_row_selection_invalid_row_count() { | ||
| let row_selection = RowSelection::from(vec![RowSelector::select(99)]); | ||
|
|
||
| let err = ParquetAccessPlan::try_new_from_overall_row_selection( | ||
| row_selection, | ||
| &ROW_GROUP_METADATA, | ||
| ) | ||
| .unwrap_err() | ||
| .to_string(); | ||
|
|
||
| assert_contains!( | ||
| err, | ||
| "Invalid Parquet RowSelection. File has 100 rows, but selection specifies 99 rows" | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_invalid_too_few() { | ||
| let access_plan = ParquetAccessPlan::new(vec![ | ||
|
|
||
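The expected plan in `test_new_from_overall_row_selection` implies row groups of 10, 20, 30, and 40 rows. That is an inference from the expected output and the 100-row total in the error test, since `ROW_GROUP_METADATA` is not shown in this diff. The sketch below reproduces the split using only the standard library. `Selector`, `Access`, `split_by_row_group`, and `worked_example` are hypothetical stand-ins for illustration, not the parquet or DataFusion API, and the sketch omits the `leading`/`mixed` allocation-avoiding optimization from the PR.

```rust
// Hypothetical stand-ins for parquet's `RowSelector` and DataFusion's
// `RowGroupAccess`; nothing here comes from either crate.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

#[derive(Debug, PartialEq)]
enum Access {
    Skip,
    Scan,
    Selection(Vec<Selector>),
}

/// Sum the rows in `pieces` whose direction matches `skip`.
fn rows(pieces: &[Selector], skip: bool) -> usize {
    pieces.iter().filter(|p| p.skip == skip).map(|p| p.row_count).sum()
}

/// Split a file-level selection into one `Access` per row group.
/// Returns `Err` if the selection and the file disagree on total rows.
fn split_by_row_group(
    selectors: &[Selector],
    rg_sizes: &[usize],
) -> Result<Vec<Access>, String> {
    let mut iter = selectors.iter().copied();
    let mut current = iter.next();
    let mut selection_rows = 0usize;
    let mut plan = Vec::with_capacity(rg_sizes.len());

    for &rg_rows in rg_sizes {
        let mut pieces: Vec<Selector> = Vec::new();
        let mut remaining = rg_rows;
        while remaining > 0 {
            let Some(sel) = current else { break };
            let take = sel.row_count.min(remaining);
            selection_rows += take;
            remaining -= take;

            // Merge with the previous piece when the direction is unchanged.
            let merged = match pieces.last_mut() {
                Some(last) if last.skip == sel.skip => {
                    last.row_count += take;
                    true
                }
                _ => false,
            };
            if !merged {
                pieces.push(Selector { row_count: take, skip: sel.skip });
            }

            // Carry the unconsumed tail of a selector into the next row group.
            current = if take < sel.row_count {
                Some(Selector { row_count: sel.row_count - take, skip: sel.skip })
            } else {
                iter.next()
            };
        }

        let (selected, skipped) = (rows(&pieces, false), rows(&pieces, true));
        plan.push(if selected == 0 {
            Access::Skip
        } else if skipped == 0 {
            Access::Scan
        } else {
            Access::Selection(pieces)
        });
    }

    // Selectors left over describe rows the file does not have.
    selection_rows += current.map_or(0, |s| s.row_count)
        + iter.map(|s| s.row_count).sum::<usize>();
    let file_rows: usize = rg_sizes.iter().sum();
    if selection_rows != file_rows {
        return Err(format!(
            "File has {file_rows} rows, but selection specifies {selection_rows} rows."
        ));
    }
    Ok(plan)
}

/// The selection from the test, applied to the inferred row-group sizes.
fn worked_example() -> Result<Vec<Access>, String> {
    let select = |row_count| Selector { row_count, skip: false };
    let skip = |row_count| Selector { row_count, skip: true };
    split_by_row_group(
        &[select(10), skip(25), select(10), skip(15), select(40)],
        &[10, 20, 30, 40],
    )
}
```

With those sizes, `skip(25)` straddles a boundary: 20 rows cover the second row group entirely, and the remaining 5 become the leading `skip(5)` of the third.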
Nice work here. I think this could be simplified a bit by leaning on `RowSelection::split_off`, since it already handles the boundary splitting invariant.

One possible shape would be to keep `let mut remaining_selection = selection;`, then for each row group call `let group_selection = remaining_selection.split_off(rg_rows);`. From there, derive `selected = group_selection.row_count()` and `skipped = group_selection.skipped_row_count()`, then map `(selected, skipped)` to `Skip`, `Scan`, or `Selection(group_selection)`.

After the loop, the total row count check could also be more direct by validating `remaining_selection.row_count() + remaining_selection.skipped_row_count() == 0` along with the accumulated file count. That would remove the manual `current`/`leading`/`mixed` cursor state machine.
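The shape described above could look roughly like the following. To stay self-contained, the sketch does not depend on the `parquet` crate. `Selector`, `Access`, `rows`, `split_off`, and `plan_with_split_off` are hypothetical stand-ins that mimic `RowSelector`, `RowGroupAccess`, and `RowSelection::split_off`, which returns the first `row_count` rows and leaves the remainder in place. Treat it as an illustration of the control flow, not as the code proposed for the PR.

```rust
// Hypothetical stand-ins; nothing here comes from parquet or DataFusion.
#[derive(Debug, Clone, Copy, PartialEq)]
struct Selector {
    row_count: usize,
    skip: bool,
}

#[derive(Debug, PartialEq)]
enum Access {
    Skip,
    Scan,
    Selection(Vec<Selector>),
}

/// Sum the rows in `sel` whose direction matches `skip`.
fn rows(sel: &[Selector], skip: bool) -> usize {
    sel.iter().filter(|s| s.skip == skip).map(|s| s.row_count).sum()
}

/// Stand-in for `RowSelection::split_off`: removes the first `row_count`
/// rows from `rest` and returns them, splitting a selector that straddles
/// the cut.
fn split_off(rest: &mut Vec<Selector>, row_count: usize) -> Vec<Selector> {
    let mut head = Vec::new();
    let mut taken = 0usize;
    let mut idx = 0usize;
    while idx < rest.len() && taken < row_count {
        let sel = rest[idx];
        let take = sel.row_count.min(row_count - taken);
        head.push(Selector { row_count: take, skip: sel.skip });
        taken += take;
        if take < sel.row_count {
            // Straddles the boundary: keep the tail at the front of `rest`.
            rest[idx].row_count -= take;
            break;
        }
        idx += 1;
    }
    rest.drain(..idx);
    head
}

/// Build one `Access` per row group by repeatedly splitting off the front
/// of the remaining selection.
fn plan_with_split_off(
    selection: Vec<Selector>,
    rg_sizes: &[usize],
) -> Result<Vec<Access>, String> {
    let mut remaining = selection;
    let mut covered = 0usize; // file rows accounted for by the selection
    let mut plan = Vec::with_capacity(rg_sizes.len());

    for &rg_rows in rg_sizes {
        let group = split_off(&mut remaining, rg_rows);
        let selected = rows(&group, false);
        let skipped = rows(&group, true);
        covered += selected + skipped;
        plan.push(match (selected, skipped) {
            (0, _) => Access::Skip,
            (_, 0) => Access::Scan,
            _ => Access::Selection(group),
        });
    }

    // The selection must cover the file exactly: nothing missing, nothing left.
    let file_rows: usize = rg_sizes.iter().sum();
    let leftover = rows(&remaining, false) + rows(&remaining, true);
    if covered != file_rows || leftover != 0 {
        let selection_rows = covered + leftover;
        return Err(format!(
            "File has {file_rows} rows, but selection specifies {selection_rows} rows."
        ));
    }
    Ok(plan)
}
```

The sketch allocates a new `Vec` per row group and walks each group's selectors more than once. Whether that cost matters in practice is a question for measurement, and the sketch makes no performance claim either way.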
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for you detail suggestion. The main reason I wrote it this way is that using
split_offwill allocate new memory and also traverse the row groups two more time(split_offone time,row_countone time,skipped_row_countone time). I can write a benchmark later to see how much impact this has.