Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 85 additions & 1 deletion datafusion/core/tests/parquet/external_access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::ParquetSource;
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchema, assert_contains};
use datafusion_datasource_parquet::{ParquetAccessPlan, RowGroupAccess};
use datafusion_datasource_parquet::{
ParquetAccessPlan, ParquetRowSelection, RowGroupAccess,
};
use datafusion_execution::object_store::ObjectStoreUrl;
use datafusion_expr::{Expr, col, lit};
use datafusion_physical_plan::ExecutionPlan;
Expand Down Expand Up @@ -152,6 +154,77 @@ async fn skip_scan() {
}
}

#[tokio::test]
async fn row_selection_extension() {
    // The test file holds 10 rows split into 2 row groups of 5 rows each.
    // A file-level `ParquetRowSelection` is attached to the `PartitionedFile`
    // and must still be in effect once the parquet opener/reader runs.

    // Skip rows 0-1, keep row 2 (inside the first row group), skip the rest.
    let selectors = vec![
        RowSelector::skip(2),
        RowSelector::select(1),
        RowSelector::skip(7),
    ];
    let file_selection = ParquetRowSelection::new(RowSelection::from(selectors));

    let test = TestFull {
        access_plan: None,
        row_selection: Some(file_selection),
        expected_rows: 1,
        predicate: None,
    };
    let parquet_metrics = test.run().await.unwrap();

    // The selected row lives in the first row group, so that group has to be
    // read and the scan must report a non-zero byte count.
    let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
    assert_ne!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
}

#[tokio::test]
async fn row_selection_extension_spanning_row_groups() {
    // Rows 4, 5 and 6 are selected. Row 4 closes row group 0 while rows 5-6
    // open row group 1, so the single `select(3)` selector has to be split at
    // the row group boundary.
    let selectors = vec![
        RowSelector::skip(4),
        RowSelector::select(3),
        RowSelector::skip(3),
    ];
    let file_selection = ParquetRowSelection::new(RowSelection::from(selectors));

    let test = TestFull {
        access_plan: None,
        row_selection: Some(file_selection),
        expected_rows: 3,
        predicate: None,
    };
    let parquet_metrics = test.run().await.unwrap();

    // Something was selected, so the scan must have read some bytes.
    let bytes_scanned = metric_value(&parquet_metrics, "bytes_scanned").unwrap();
    assert_ne!(bytes_scanned, 0, "metrics : {parquet_metrics:#?}",);
}

#[tokio::test]
async fn bad_row_selection_extension() {
    // skip(2) + select(1) describes only 3 rows, which is fewer than the
    // 10 rows the file holds, so the scan must be rejected.
    let too_short = RowSelection::from(vec![
        RowSelector::skip(2),
        RowSelector::select(1),
    ]);

    let outcome = TestFull {
        access_plan: None,
        row_selection: Some(ParquetRowSelection::new(too_short)),
        // an error is expected before any row count could be compared
        expected_rows: 10000,
        predicate: None,
    }
    .run()
    .await;

    let err_string = outcome.unwrap_err().to_string();
    assert_contains!(&err_string, "Invalid Parquet RowSelection");
    assert_contains!(
        &err_string,
        "File has 10 rows, but selection specifies 3 rows."
    );
}

#[tokio::test]
async fn plan_and_filter() {
// show that row group pruning is applied even when an initial plan is supplied
Expand All @@ -170,6 +243,7 @@ async fn plan_and_filter() {
// initial
let parquet_metrics = TestFull {
access_plan,
row_selection: None,
expected_rows: 0,
predicate: Some(predicate),
}
Expand Down Expand Up @@ -227,6 +301,7 @@ async fn bad_row_groups() {
RowGroupAccess::Skip,
RowGroupAccess::Scan,
])),
row_selection: None,
expected_rows: 0,
predicate: None,
}
Expand All @@ -249,6 +324,7 @@ async fn bad_selection() {
])),
RowGroupAccess::Skip,
])),
row_selection: None,
// expects that we hit an error, this should not be run
expected_rows: 10000,
predicate: None,
Expand Down Expand Up @@ -300,6 +376,7 @@ impl Test {
} = self;
TestFull {
access_plan,
row_selection: None,
expected_rows,
predicate: None,
}
Expand All @@ -317,6 +394,7 @@ impl Test {
/// 4. Returns the statistics from running the plan
struct TestFull {
    /// Optional row-group level access plan. The `TestFull` impl attaches it
    /// to the `PartitionedFile` as an extension.
    access_plan: Option<ParquetAccessPlan>,
    /// Optional file-level row selection. When present, the `TestFull` impl
    /// attaches it to the `PartitionedFile` as an extension.
    row_selection: Option<ParquetRowSelection>,
    /// Number of rows the scan is expected to produce.
    /// NOTE(review): presumably only checked when the scan succeeds (error
    /// tests pass a dummy value) — confirm against the `TestFull` impl.
    expected_rows: usize,
    /// Optional filter expression, added to the scan when requested.
    predicate: Option<Expr>,
}
Expand All @@ -327,6 +405,7 @@ impl TestFull {

let Self {
access_plan,
row_selection,
expected_rows,
predicate,
} = self;
Expand All @@ -352,6 +431,11 @@ impl TestFull {
partitioned_file = partitioned_file.with_extension(access_plan);
}

// add the file-level row selection, if any, as an extension
if let Some(row_selection) = row_selection {
partitioned_file = partitioned_file.with_extension(row_selection);
}

// Create a DataSourceExec to read the file
let object_store_url = ObjectStoreUrl::local_filesystem();
// add the predicate, if requested
Expand Down
192 changes: 191 additions & 1 deletion datafusion/datasource-parquet/src/access_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::sort::reverse_row_selection;
use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
use datafusion_common::{Result, assert_eq_or_internal_err, exec_err};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use log::debug;
Expand Down Expand Up @@ -104,6 +104,41 @@ pub struct ParquetAccessPlan {
fully_matched: Vec<bool>,
}

/// A file-level row selection for a parquet scan.
///
/// Attach this type to a [`PartitionedFile`](datafusion_datasource::PartitionedFile)
/// with [`PartitionedFile::with_extension`](datafusion_datasource::PartitionedFile::with_extension)
/// when an external index produces a [`RowSelection`] across the entire parquet
/// file. DataFusion will use parquet metadata to split it into row-group-level
/// access when the file is opened.
#[derive(Debug, Clone, PartialEq)]
pub struct ParquetRowSelection {
    /// The wrapped selection, expressed over the rows of the whole file
    /// rather than over a single row group.
    selection: RowSelection,
}

impl ParquetRowSelection {
/// Create a new file-level parquet row selection.
pub fn new(selection: RowSelection) -> Self {
Self { selection }
}

/// Return a reference to the underlying [`RowSelection`].
pub fn selection(&self) -> &RowSelection {
&self.selection
}

/// Convert into the underlying [`RowSelection`].
pub fn into_inner(self) -> RowSelection {
self.selection
}
}

/// Allows `row_selection.into()` wherever a [`ParquetRowSelection`] is expected.
impl From<RowSelection> for ParquetRowSelection {
    fn from(selection: RowSelection) -> Self {
        Self { selection }
    }
}

/// Describes how the parquet reader will access a row group
#[derive(Debug, Clone, PartialEq)]
pub enum RowGroupAccess {
Expand Down Expand Up @@ -169,6 +204,110 @@ impl ParquetAccessPlan {
}
}

/// Create a new `ParquetAccessPlan` from a file-level [`RowSelection`].
///
/// The selection is interpreted across all rows in the file, in row group
/// order, and is split into row-group level access using `row_group_meta_data`.
/// Fully skipped row groups become [`RowGroupAccess::Skip`], fully selected
/// row groups become [`RowGroupAccess::Scan`], and partially selected row
/// groups become [`RowGroupAccess::Selection`].
///
/// # Errors
///
/// Returns an error if the selection does not specify exactly the same
/// number of rows as the file metadata.
pub fn try_new_from_overall_row_selection(
selection: RowSelection,
row_group_meta_data: &[RowGroupMetaData],
) -> Result<Self> {
let selectors: Vec<RowSelector> = selection.into();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice work here. I think this could be simplified a bit by leaning on RowSelection::split_off, since it already handles the boundary splitting invariant.

One possible shape would be to keep `let mut remaining_selection = selection;`, then for each row group call `let group_selection = remaining_selection.split_off(rg_rows);`. From there, derive `selected = group_selection.row_count()` and `skipped = group_selection.skipped_row_count()`, then map `(selected, skipped)` to `Skip`, `Scan`, or `Selection(group_selection)`.

After the loop, the total row count check could also be more direct: validate that `remaining_selection.row_count() + remaining_selection.skipped_row_count() == 0`, along with the accumulated file row count. That would remove the manual `current` / `leading` / `mixed` cursor state machine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your detailed suggestion. The main reason I wrote it this way is that using `split_off` will allocate new memory and also traverse the row groups two more times (`split_off` once, `row_count` once, `skipped_row_count` once). I can write a benchmark later to see how much impact this has.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The performance concern around RowSelection::split_off makes sense. That said, this manual current / leading / mixed cursor logic is a bit harder to audit, especially since the full selector vector is still materialized up front.

If we keep this version, it would be helpful to add a short comment or benchmark note explaining the measured reason for the manual approach. Otherwise, the earlier split_off shape feels easier to reason about because it directly captures the boundary-splitting invariant.

// Single pass over the file-level selectors: they are consumed in order and
// cut wherever they cross a row group boundary. `current` is the selector
// (or the not-yet-consumed tail of one) waiting to be assigned to a row group.
let mut selector_iter = selectors.into_iter();
let mut current = selector_iter.next();

// Running totals, compared after the loop to validate the selection length.
let mut selection_rows = 0usize;
let mut file_rows = 0usize;

let mut row_groups = Vec::with_capacity(row_group_meta_data.len());
for rg_meta in row_group_meta_data {
let rg_rows = rg_meta.num_rows() as usize;
file_rows += rg_rows;

// `leading` holds the first selector seen in this group when we have
// not yet seen a direction change; once both skip and select rows appear
// we promote to `mixed` so we can build the full RowSelection Vec.
// This avoids a heap allocation for the common all-scan / all-skip case.
let mut leading: Option<RowSelector> = None;
let mut mixed: Option<Vec<RowSelector>> = None;
// Rows of this row group that are selected / skipped so far.
let mut selected = 0usize;
let mut skipped = 0usize;
// Rows of this row group not yet covered by any selector.
let mut remaining = rg_rows;

while remaining > 0 {
// The selection ran out before the row group was covered; the row count
// check after the outer loop turns this into an error.
let Some(sel) = current else { break };
// Never consume more than what is left of this row group.
let take = sel.row_count.min(remaining);
selection_rows += take;
remaining -= take;

// The part of `sel` that falls inside this row group.
let rg_sel = RowSelector {
row_count: take,
skip: sel.skip,
};
if sel.skip {
skipped += take;
} else {
selected += take;
}

if let Some(v) = &mut mixed {
v.push(rg_sel);
} else if let Some(lead) = leading.as_mut() {
if lead.skip == rg_sel.skip {
// Same direction as the piece already recorded for this row
// group: fold it into `leading` instead of starting a new
// selector, so the group still counts as single-direction.
lead.row_count += rg_sel.row_count;
} else {
mixed = Some(vec![*lead, rg_sel]);
}
} else {
leading = Some(rg_sel);
}

// If `sel` extends past this row group, carry its unconsumed tail over
// to the next row group; otherwise move on to the next selector.
current = if take < sel.row_count {
Some(RowSelector {
row_count: sel.row_count - take,
skip: sel.skip,
})
} else {
selector_iter.next()
};
}

// Classify the row group: nothing selected -> Skip, nothing skipped -> Scan,
// otherwise keep the per-group selectors collected in `mixed`.
let access = if selected == 0 {
RowGroupAccess::Skip
} else if skipped == 0 {
RowGroupAccess::Scan
} else {
// Both `selected > 0` and `skipped > 0` means a direction change
// occurred, which always initialises `mixed`.
RowGroupAccess::Selection(mixed.unwrap_or_default().into())
};
row_groups.push(access);
}

// Count whatever the selection still describes beyond the last row group, so
// that an over-long selection is reported with its full row count.
selection_rows += current.map_or(0, |s| s.row_count)
+ selector_iter.map(|s| s.row_count).sum::<usize>();

// The selection must describe exactly as many rows as the file metadata.
if selection_rows != file_rows {
return exec_err!(
"Invalid Parquet RowSelection. File has {file_rows} rows, \
but selection specifies {selection_rows} rows."
);
}

Ok(Self::new(row_groups))
}

/// Set the i-th row group to the specified [`RowGroupAccess`]
pub fn set(&mut self, idx: usize, access: RowGroupAccess) {
let should_scan = access.should_scan();
Expand Down Expand Up @@ -758,6 +897,57 @@ mod test {
);
}

#[test]
fn test_new_from_overall_row_selection() {
    // File-level selection covering 100 rows. The expected plan below
    // corresponds to row groups of 10, 20, 30 and 40 rows.
    let file_selectors = vec![
        RowSelector::select(10),
        RowSelector::skip(25),
        RowSelector::select(10),
        RowSelector::skip(15),
        RowSelector::select(40),
    ];

    // Only the third row group is partially selected; the 25-row skip spills
    // its last 5 rows into it.
    let third_group = RowSelection::from(vec![
        RowSelector::skip(5),
        RowSelector::select(10),
        RowSelector::skip(15),
    ]);
    let expected = ParquetAccessPlan::new(vec![
        RowGroupAccess::Scan,
        RowGroupAccess::Skip,
        RowGroupAccess::Selection(third_group),
        RowGroupAccess::Scan,
    ]);

    let access_plan = ParquetAccessPlan::try_new_from_overall_row_selection(
        RowSelection::from(file_selectors),
        &ROW_GROUP_METADATA,
    )
    .unwrap();

    assert_eq!(access_plan, expected);
}

#[test]
fn test_new_from_overall_row_selection_invalid_row_count() {
    // One row short of the 100 rows described by the metadata.
    let short_selection = RowSelection::from(vec![RowSelector::select(99)]);

    let outcome = ParquetAccessPlan::try_new_from_overall_row_selection(
        short_selection,
        &ROW_GROUP_METADATA,
    );
    let err = outcome.unwrap_err().to_string();

    assert_contains!(
        err,
        "Invalid Parquet RowSelection. File has 100 rows, but selection specifies 99 rows"
    );
}

#[test]
fn test_invalid_too_few() {
let access_plan = ParquetAccessPlan::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion datafusion/datasource-parquet/src/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ mod test_util;
mod virtual_column;
mod writer;

pub use access_plan::{ParquetAccessPlan, RowGroupAccess};
pub use access_plan::{ParquetAccessPlan, ParquetRowSelection, RowGroupAccess};
pub use file_format::*;
pub use metrics::ParquetFileMetrics;
pub use page_filter::PagePruningAccessPlanFilter;
Expand Down
Loading
Loading