Skip to content

limit pruning ignores RowSelection #22941

@haohuaijin

Description

@haohuaijin

Describe the bug

RowGroupAccessPlanFilter::prune_by_limit counts a fully matched row group by its full RowGroupMetaData::num_rows().

This is wrong when the row group uses RowGroupAccess::Selection(...), because only the selected rows should count toward the LIMIT.

prune_by_limit also rebuilds the plan with:

new_access_plan.scan(idx);

This drops the original RowSelection and changes the row group back to a full scan.

To Reproduce

use std::fs::File;
use std::sync::Arc;

use arrow::array::{Array, Int32Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::common::{DFSchema, Result};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::memory::DataSourceExec;
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
use datafusion::datasource::physical_plan::{FileScanConfigBuilder, ParquetSource};
use datafusion::execution::object_store::ObjectStoreUrl;
use datafusion::parquet::arrow::ArrowWriter;
use datafusion::parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use datafusion::parquet::file::properties::WriterProperties;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{SessionContext, col, lit};
use tempfile::NamedTempFile;

#[tokio::main]
async fn main() -> Result<()> {
    let test_file = write_test_file()?;
    let schema = test_file.schema;

    let mut access_plan = ParquetAccessPlan::new_all(2);
    access_plan.scan_selection(
        0,
        RowSelection::from(vec![RowSelector::select(10), RowSelector::skip(990)]),
    );

    let partitioned_file =
        PartitionedFile::new(test_file.path, test_file.size).with_extension(access_plan);

    let ctx = SessionContext::new();
    let predicate_expr = col("id").gt_eq(lit(0_i32));
    let df_schema = DFSchema::try_from(Arc::clone(&schema))?;
    let predicate = ctx.create_physical_expr(predicate_expr, &df_schema)?;

    let source =
        Arc::new(ParquetSource::new(Arc::clone(&schema)).with_predicate(predicate));
    let config = FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), source)
        .with_file(partitioned_file)
        .with_limit(Some(50))
        .build();

    let plan: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(config);
    let batches = datafusion::physical_plan::collect(plan, ctx.task_ctx()).await?;
    let actual_ids = collect_ids(&batches);

    let expected_ids = (0..10).chain(1000..1040).collect::<Vec<_>>();

    println!("actual ids:   {actual_ids:?}");
    println!("expected ids: {expected_ids:?}");

    assert_eq!(
        actual_ids, expected_ids,
        "LIMIT pruning should preserve RowSelection and count only selected rows"
    );

    Ok(())
}

/// Handle to the temporary Parquet file produced by `write_test_file`.
struct TestFile {
    /// Path of the temporary file, as a (lossily converted) string.
    path: String,
    /// Size of the file in bytes, read from its metadata after writing.
    size: u64,
    /// Schema the file was written with (a single non-nullable `id: Int32` column).
    schema: Arc<Schema>,
    /// Owning handle for the temporary file; never read directly.
    /// NOTE(review): presumably held so the file is not removed while the
    /// reproduction is still scanning it — confirm against `tempfile` docs.
    _temp_file: NamedTempFile,
}

/// Writes the Parquet fixture used by the reproduction and returns a handle to it.
///
/// The file has a single non-nullable `id: Int32` column and receives two
/// batches of 1000 rows each (ids `0..1000` and `1000..2000`). The writer is
/// configured with a maximum of 1000 rows per row group, so the two batches
/// are expected to end up in separate row groups.
///
/// # Errors
///
/// Returns an error if the temporary file cannot be created, written, or
/// inspected, or if building a record batch fails.
fn write_test_file() -> Result<TestFile> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));

    let temp_file = NamedTempFile::new()?;
    let path = temp_file.path().to_string_lossy().into_owned();

    let props = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1000))
        .build();

    let mut writer =
        ArrowWriter::try_new(File::create(&path)?, Arc::clone(&schema), Some(props))?;
    for start in [0, 1000] {
        writer.write(&batch(start, Arc::clone(&schema))?)?;
    }
    writer.close()?;

    // Read the size only after the writer is closed so it reflects the final file.
    let size = temp_file.path().metadata()?.len();

    Ok(TestFile {
        path,
        size,
        schema,
        _temp_file: temp_file,
    })
}

fn batch(start: i32, schema: Arc<Schema>) -> Result<RecordBatch> {
    let values = (start..start + 1000).collect::<Vec<_>>();
    Ok(RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(values))],
    )?)
}

/// Flattens the first column of every batch into one vector of ids, in batch order.
///
/// # Panics
///
/// Panics if a batch's first column is not an `Int32Array` or if it contains
/// a null value.
fn collect_ids(batches: &[RecordBatch]) -> Vec<i32> {
    let mut ids = Vec::new();
    for record_batch in batches {
        let id_column = record_batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("id column should be Int32");
        for value in id_column.iter() {
            ids.push(value.expect("id should be non-null"));
        }
    }
    ids
}

Expected behavior

No response

Additional context

Found this while working on #22939.

Metadata

Metadata

Assignees

Labels

bug: Something isn't working

Type

No fields configured for Bug.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions