Describe the bug
`RowGroupAccessPlanFilter::prune_by_limit` counts a fully matched row group by its full `RowGroupMetaData::num_rows()`.
This is wrong when the row group uses `RowGroupAccess::Selection(...)`, because only the selected rows should count toward the `LIMIT`.
`prune_by_limit` also rebuilds the plan with:
new_access_plan.scan(idx);
This drops the original `RowSelection` and turns the row group back into a full scan.
To Reproduce
use std::fs::File;
use std::sync::Arc;
use arrow::array::{Array, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{DFSchema, Result};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionContext, col, lit};
use tempfile::NamedTempFile;
#[tokio::main]
async fn main() -> Result<()> {
let test_file = write_test_file()?;
let schema = test_file.schema;
let mut access_plan = ParquetAccessPlan::new_all(2);
access_plan.scan_selection(
0,
RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(990)]),
);
let partitioned_file =
PartitionedFile::new(test_file.path, test_file.size).with_extension(access_plan);
let ctx = SessionContext::new();
let predicate_expr = col("id").gt_eq(lit(0_i32));
let df_schema = DFSchema::try_from(Arc::clone(&schema))?;
let predicate = ctx.create_physical_expr(predicate_expr, &df_schema)?;
let source =
Arc::new(ParquetSource::new(Arc::clone(&schema)).with_predicate(predicate));
let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
.with_file(partitioned_file)
.with_limit(Some(50))
.build();
let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
let batches = datafusion::physical_plan::collect(plan, ctx.task_ctx()).await?;
let actual_ids = collect_ids(&batches);
let expected_ids = (0..10).chain(1000..1040).collect::<Vec<_>>();
println!("actual ids: {actual_ids:?}");
println!("expected ids: {expected_ids:?}");
assert_eq!(
actual_ids, expected_ids,
"LIMIT pruning should preserve RowSelection and count only selected rows"
);
Ok(())
}
/// A Parquet file written to a temporary location by `write_test_file`.
struct TestFile {
    /// Filesystem path of the file, as a (lossily converted) UTF-8 string.
    path: String,
    /// Size of the file in bytes, read from its metadata after the writer was closed.
    size: u64,
    /// Arrow schema the file was written with.
    schema: Arc<Schema>,
    /// Owns the temporary file so it outlives the scan; `NamedTempFile` removes
    /// the file when it is dropped.
    _temp_file: NamedTempFile,
}
/// Writes the reproducer's Parquet file to a fresh temporary location.
///
/// The file has a single non-nullable `id: Int32` column holding `0..2000`,
/// written as two 1000-row batches. Row groups are capped at 1000 rows, so the
/// file is expected to contain two row groups; `main`'s `ParquetAccessPlan::new_all(2)`
/// relies on this.
///
/// # Errors
/// Propagates I/O errors (temp file creation, file creation, metadata lookup)
/// and Parquet/Arrow errors from building or writing the batches.
fn write_test_file() -> Result<TestFile> {
    let temp_file = NamedTempFile::new()?;
    let path = temp_file.path().to_string_lossy().into_owned();
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .build();
    let mut writer = ArrowWriter::try_new(File::create(&path)?, Arc::clone(&schema), Some(props))?;
    for start in [0, 1000] {
        writer.write(&batch(start, Arc::clone(&schema))?)?;
    }
    // Closing finalizes the footer, so the size read below is the complete file.
    writer.close()?;

    let size = std::fs::metadata(temp_file.path())?.len();
    Ok(TestFile {
        path,
        size,
        schema,
        _temp_file: temp_file,
    })
}
fn batch(start: i32, schema: Arc<Schema>) -> Result<RecordBatch> {
let values = (start..start + 1000).collect::<Vec<_>>();
Ok(RecordBatch::try_new(
schema,
vec![Arc::new(Int32Array::from(values))],
)?)
}
/// Concatenates column 0 of every batch into one `Vec<i32>`, preserving batch
/// and row order.
///
/// # Panics
/// Panics if column 0 of any batch is not an `Int32Array`, or if it contains a null.
fn collect_ids(batches: &[RecordBatch]) -> Vec<i32> {
    batches
        .iter()
        .flat_map(|batch| {
            // The iterator borrows from `batches`, so it can be returned directly
            // instead of being collected into a temporary Vec per batch.
            batch
                .column(0)
                .as_any()
                .downcast_ref::<Int32Array>()
                .expect("id column should be Int32")
                .iter()
                .map(|value| value.expect("id should be non-null"))
        })
        .collect()
}
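Expected behavior
Only the selected rows should count toward the `LIMIT`, and the original `RowSelection` should be preserved. In the reproducer, the query should return ids `0..10` (the 10 selected rows of row group 0) followed by `1000..1040` (the first 40 rows of row group 1).
Additional context
Found this while working on #22939.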
Describe the bug
`RowGroupAccessPlanFilter::prune_by_limit` counts a fully matched row group by its full `RowGroupMetaData::num_rows()`. This is wrong when the row group uses
`RowGroupAccess::Selection(...)`, because only the selected rows should count toward the `LIMIT`. `prune_by_limit` also rebuilds the plan with `new_access_plan.scan(idx);`. This drops the original
`RowSelection` and turns the row group back into a full scan.
To Reproduce
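Expected behavior
Only the selected rows should count toward the `LIMIT`, and the original `RowSelection` should be preserved.
Additional context
Found this while working on #22939.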