
Add declared file scan output partitioning#22657

Open
gene-bordegaray wants to merge 5 commits into
apache:main from
gene-bordegaray:gene.bordegaray/2026/05/file-scan-output-partitioning



@gene-bordegaray gene-bordegaray commented May 30, 2026

Contributor

Which issue does this PR close?

Rationale for this change

This follows up on #22607 by replacing range-partitioning sqllogictest boilerplate with a general file/listing scan API for declared output partitioning.

Related: #21992, #22607, #22607 (comment)

What changes are included in this PR?

  • Add declared output_partitioning to file scan and listing table configuration.
  • Preserve declared partition counts during listing-table file grouping.
  • Serialize scan output_partitioning through physical plan proto.
  • Refactor range_partitioning.slt to use a CSV ListingTable instead of a custom test-only TableProvider / DataSource.

Contract:

  • Declared partitioning expressions are written against the full table schema before scan projection. For example, Range([range_key@0], [(10), (20)], 3) remains valid if the scan projects range_key and falls back to UnknownPartitioning(3) if range_key is not projected.
  • Listing tables create one file group per declared output partition (which can exceed target_partitions). It is up to the user to plan their partitioning. For example, a 4-partition range declaration creates four scan file groups, adding empty trailing groups when fewer files are present.
  • File group index is part of the contract: file group i must contain rows for declared output partition i. DataFusion does not validate row placement, matching other user-declared properties such as sortedness.
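To make the first contract point concrete, here is a minimal sketch of remapping a declared range partitioning through a scan projection. `DeclaredPartitioning` and `project_partitioning` are hypothetical, simplified stand-ins (not DataFusion's `Partitioning` API) and model only a single-column integer range key.

```rust
/// Hypothetical, simplified model of a declared partitioning. `key_col` is
/// an index into the full table schema (before scan projection).
#[derive(Debug, Clone, PartialEq)]
enum DeclaredPartitioning {
    Range {
        key_col: usize,
        split_points: Vec<i64>,
        partition_count: usize,
    },
    Unknown(usize),
}

/// Re-express a declared partitioning against the projected scan schema.
/// `projection[i]` is the table-schema index of the i-th output column.
fn project_partitioning(
    declared: &DeclaredPartitioning,
    projection: &[usize],
) -> DeclaredPartitioning {
    match declared {
        DeclaredPartitioning::Range {
            key_col,
            split_points,
            partition_count,
        } => match projection.iter().position(|c| c == key_col) {
            // Key column survives the projection: point at its new index.
            Some(new_idx) => DeclaredPartitioning::Range {
                key_col: new_idx,
                split_points: split_points.clone(),
                partition_count: *partition_count,
            },
            // Key column projected away: only the partition count remains.
            None => DeclaredPartitioning::Unknown(*partition_count),
        },
        DeclaredPartitioning::Unknown(n) => DeclaredPartitioning::Unknown(*n),
    }
}
```

For `Range([range_key@0], [(10), (20)], 3)`, projecting `[other, range_key]` yields a range on output column 1, while projecting only `other` yields `Unknown(3)`.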

Are these changes tested?

Yes.

Are there any user-facing changes?

Yes. This adds public API for declaring file/listing scan output partitioning. No breaking API changes.

@github-actions github-actions Bot added core Core DataFusion crate sqllogictest SQL Logic Tests (.slt) catalog Related to the catalog crate proto Related to proto crate datasource Changes to the datasource crate labels May 30, 2026
@github-actions

github-actions Bot commented May 30, 2026


Thank you for opening this pull request!

Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch).

error: `cargo metadata` exited with an error:     Updating crates.io index
error: failed to get `windows` as a dependency of package `sysinfo v0.39.3`
    ... which satisfies dependency `sysinfo = "^0.39.3"` (locked to 0.39.3) of package `datafusion v54.0.0 (/home/runner/work/datafusion/datafusion/datafusion/core)`

Caused by:
  failed to load source for dependency `windows`

Caused by:
  unable to update registry `crates-io`

Caused by:
  download of wi/nd/windows failed

Caused by:
  curl failed

Caused by:
  [55] Failed sending data to the peer (OpenSSL SSL_read: SSL_ERROR_SYSCALL, errno 0)

@github-actions github-actions Bot added the auto detected api change Auto detected API change label May 30, 2026
@gene-bordegaray gene-bordegaray marked this pull request as ready for review June 2, 2026 12:33
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/05/file-scan-output-partitioning branch from 4be99d8 to e3e6a51 Compare June 2, 2026 13:34
@gene-bordegaray gene-bordegaray changed the title [WIP] Add declared file scan output partitioning Add declared file scan output partitioning Jun 2, 2026
@gene-bordegaray

Contributor Author

cc: @alamb @stuhood @gabotechs @NGA-TRAN: follow-up for adding output_partitioning APIs.

@gabotechs

Contributor

Nice! will take a look soon

@stuhood stuhood left a comment

Contributor


Thanks!

Comment thread datafusion/catalog-listing/src/table.rs Outdated
Comment on lines +512 to +515
// Partition pruning can remove files before grouping. Without a
// stable file-to-declared-partition mapping, regrouping the
// remaining files could shift them into the wrong partition index.
None

Contributor


Should/can the partition filters be calculated after having assigned files to partitions? You'd essentially bucket the files into partitions during with_output_partitioning, I think?

Contributor Author


Addressed. For declared output partitioning, listing now keeps the full file set through file-group assignment and applies partition filters within each declared group afterward. That preserves the declared partition indexes while still pruning files.
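A minimal sketch of why the order matters, assuming files have already been bucketed into declared groups (how files are assigned to groups is not modeled here). Files are plain path strings, `keep` stands in for partition-filter evaluation, and both functions are hypothetical.

```rust
/// Prune inside each declared group. Group slots are kept even when they
/// become empty, so group `i` still corresponds to declared partition `i`.
fn prune_within_groups<F>(groups: Vec<Vec<String>>, keep: F) -> Vec<Vec<String>>
where
    F: Fn(&str) -> bool,
{
    groups
        .into_iter()
        .map(|group| {
            group
                .into_iter()
                .filter(|f| keep(f.as_str()))
                .collect::<Vec<String>>()
        })
        .collect()
}

/// The problematic order: prune first, then re-deal the surviving files
/// round-robin across the same number of groups. A file can land at a
/// different index than the partition it was declared for.
fn prune_then_regroup<F>(groups: Vec<Vec<String>>, keep: F) -> Vec<Vec<String>>
where
    F: Fn(&str) -> bool,
{
    let n = groups.len();
    let survivors: Vec<String> = groups
        .into_iter()
        .flatten()
        .filter(|f| keep(f.as_str()))
        .collect();
    let mut out = vec![Vec::new(); n];
    for (i, f) in survivors.into_iter().enumerate() {
        out[i % n].push(f);
    }
    out
}
```

With a filter keeping only `region=a`, assigning first keeps `p2` at index 2 and leaves index 1 empty; pruning first moves `p2` to index 1, where it would claim rows for the wrong declared partition.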

/// If [`ListingOptions::output_partitioning`] is set, the returned file
/// groups preserve that declared partition count, including empty trailing
/// groups when needed, rather than using
/// [`ListingOptions::target_partitions`].

Contributor


This feels like it should be documented on the target_partitions and output_partitioning setters/options rather than here? e.g. that only one may be set.

Contributor Author


Addressed with tighter docs on ListingOptions::target_partitions / output_partitioning. The current behavior is that output_partitioning is authoritative when set, and target_partitions is only used when no output partitioning is declared.

Comment on lines +803 to +810
if output_partitioning.partition_count() != self.file_groups.len() {
    debug!(
        "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.",
        output_partitioning.partition_count(),
        self.file_groups.len()
    );
    return Partitioning::UnknownPartitioning(self.file_groups.len());
}

@stuhood stuhood Jun 4, 2026

Contributor


In which case would this happen? The case where partition filters have eliminated some files...? It feels like it should be a louder error.

@gene-bordegaray gene-bordegaray Jun 12, 2026

Contributor Author


Addressed for the ListingTable path: mismatched declared partition counts now produce a planning error before FileScanConfig is built. I kept FileScanConfig defensive for direct construction and documented that it falls back to UnknownPartitioning.
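The split between the two paths could look roughly like this sketch. `Partitioning` here is a hypothetical two-variant stand-in, and treating an undeclared partitioning as `Unknown(file_groups)` is an assumption about the default, not something this thread states.

```rust
/// Hypothetical, simplified partitioning that only tracks a count.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Range { partition_count: usize },
    Unknown(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::Range { partition_count } => *partition_count,
            Partitioning::Unknown(n) => *n,
        }
    }
}

/// ListingTable-style check: a mismatch is a planning error, reported
/// before any scan config is built.
fn validate_declared(declared: &Partitioning, file_groups: usize) -> Result<(), String> {
    if declared.partition_count() != file_groups {
        return Err(format!(
            "declared output partitioning has {} partitions, but the scan has {} file groups",
            declared.partition_count(),
            file_groups
        ));
    }
    Ok(())
}

/// FileScanConfig-style defensive fallback for directly constructed scans:
/// a missing or mismatched declaration degrades to Unknown.
fn advertised_partitioning(declared: Option<&Partitioning>, file_groups: usize) -> Partitioning {
    match declared {
        Some(p) if p.partition_count() == file_groups => p.clone(),
        _ => Partitioning::Unknown(file_groups),
    }
}
```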

@gabotechs gabotechs left a comment

Contributor


Nice job here! This seems to be going in the right direction. I left some comments; let me know what you think.

Comment thread datafusion/catalog-listing/src/options.rs Outdated
Comment thread datafusion/catalog-listing/src/table.rs Outdated
Comment thread datafusion/catalog-listing/src/options.rs
Comment thread datafusion/catalog-listing/src/table.rs
Comment thread datafusion/catalog-listing/src/table.rs Outdated
Comment thread datafusion/catalog-listing/src/table.rs Outdated
Comment thread datafusion/catalog-listing/src/table.rs Outdated
Comment on lines 825 to 828
if preserve_partition_count && !file_groups.is_empty() {
    file_groups.resize_with(target_partitions, || FileGroup::new(vec![]));
}

Contributor


It's a bit weird that if preserve_partition_count == true then we are willing to increase partitioning by resizing file_groups anyway.

Maybe it's just a matter of a better name for the preserve_partition_count variable?

@gene-bordegaray gene-bordegaray Jun 12, 2026

Contributor Author


I removed the preserve_partition_count flag/path and left grouping behavior based on whether output_partitioning is declared.

Comment on lines +1300 to +1302
.with_target_partitions(1)
.with_output_partitioning(Some(Partitioning::RoundRobinBatch(4)));

Contributor


This does not look very clean from a public API standpoint, the fact that users are allowed to pass contradicting information.

One idea that comes to mind is to deprecate .with_target_partitions(), and have an implementation that does something like:

pub fn with_target_partitions(mut self, n: usize) -> Self {
    self.output_partitioning = Some(Partitioning::RoundRobinBatch(n));
    self
}

Contributor Author


I agree this is a bit contradictory, which is why I tried to make it as explicit as possible in docs and tests. I don't think mapping with_target_partitions to RoundRobinBatch is the best approach, though, since it muddles parallelism with declaring round-robin partitioning semantics.

I realize I am suggesting a good amount of follow-up in my comments, but I think that's the best way to handle these decisions so they get more dedicated thought. For this PR, what would you think about keeping output_partitioning authoritative when set, with target_partitions aligned to it in that case?

Then I think deprecating or redesigning with_target_partitions is a discussion we can have in an issue, as it's a pretty big knob people use.

Contributor


🤔 Is it? I'm not convinced this is used much; I'm actually surprised this even exists here.

The "proper" way of telling DF how many partitions to use is with the standard config parameter datafusion.execution.target_partitions. Given this, I'm surprised there's this alternative path for choosing the partitions.

I'll dig a bit more into the git history to get some context on when this was created and how it's expected to be used, given that datafusion.execution.target_partitions already exists. Maybe we'll find that this is something that should have been deprecated a long time ago.

/// projection or filtering. If the partition count does not match the
/// number of file groups, [`DataSource::output_partitioning`] falls back to
/// [`Partitioning::UnknownPartitioning`].
pub output_partitioning: Option<Partitioning>,

Contributor


Do you see an opportunity for removing partitioned_by_file_group now?

Contributor


For example, removing the partitioned_by_file_group field, and doing something like this?

    pub fn with_partitioned_by_file_group(
        mut self,
        partitioned_by_file_group: bool,
    ) -> Self {
        self.output_partitioning = partitioned_by_file_group
            .then(|| {
                hash_partitioning_from_partition_fields(
                    self.file_source.table_schema().table_schema(),
                    self.file_source.table_schema().table_partition_cols(),
                    self.file_groups.len(),
                )
            })
            .flatten();
        self
    }

@gene-bordegaray gene-bordegaray Jun 12, 2026

Contributor Author


Leaving this as a follow-up. I think output_partitioning opens up the possibility of removing partitioned_by_file_group, but doing it here would change the Hive partition grouping path, and I'm not sure we should do that in the same PR since this one is already quite large.

@gene-bordegaray

Contributor Author

Talked with @gabotechs offline. TL;DR: this should most likely be expressed with the logical representation of partitioning rather than the physical one, just as output_ordering is. So I will create a PR to support a Range variant in the logical enum before moving forward with this.

cc: @stuhood

@alamb alamb marked this pull request as draft June 4, 2026 14:20
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/05/file-scan-output-partitioning branch from e3e6a51 to bc1e2fc Compare June 5, 2026 20:37
@github-actions github-actions Bot added logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates substrait Changes to the substrait crate common Related to common crate labels Jun 5, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/05/file-scan-output-partitioning branch from bc1e2fc to b22530a Compare June 6, 2026 13:17
pull Bot pushed a commit to buraksenn/datafusion that referenced this pull request Jun 10, 2026
## Which issue does this PR close?

- Closes apache#22778.
- Related: apache#21992, apache#22395.
- Needed by apache#22657.

## Rationale for this change

Declared scan output partitioning should use logical partitioning
metadata, not physical partitioning types. This adds logical range
partitioning so range-partitioned sources can declare their layout at
the logical layer.

## What changes are included in this PR?

- Add logical `Partitioning::Range` and `RangePartitioning`.
- Move `SplitPoint` and shared split-point validation to
`datafusion-common`.
- Wire logical range partitioning through expression traversal,
rewrites, and display.
- Keep planning, logical proto, and Substrait support explicitly
unsupported for now.
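For reference, split-point validation and key lookup for a single integer range key might look like the sketch below. Both functions are hypothetical (this is not the `SplitPoint` API), and the boundary rule (a key equal to a split point belongs to the higher partition) and the "N split points give N + 1 partitions" check are assumptions, chosen to match the `Range([range_key@0], [(10), (20)], 3)` example.

```rust
/// Hypothetical validation: split points must be strictly increasing, and
/// N split points are assumed to define exactly N + 1 partitions.
fn validate_split_points(split_points: &[i64], partition_count: usize) -> Result<(), String> {
    if split_points.windows(2).any(|w| w[0] >= w[1]) {
        return Err("split points must be strictly increasing".to_string());
    }
    if split_points.len() + 1 != partition_count {
        return Err(format!(
            "{} split points imply {} partitions, but {} were declared",
            split_points.len(),
            split_points.len() + 1,
            partition_count
        ));
    }
    Ok(())
}

/// Assumed boundary semantics: partition i covers [split[i - 1], split[i]),
/// so the index is the number of split points <= key.
fn partition_for_key(split_points: &[i64], key: i64) -> usize {
    split_points.partition_point(|&s| s <= key)
}
```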

## Are these changes tested?

<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code

If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
-->

Yes. Unit tests added.

## Are there any user-facing changes?

Yes. This adds public logical range partitioning API. No breaking API
changes.

AdamGS pushed a commit to AdamGS/arrow-datafusion that referenced this pull request Jun 11, 2026
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/05/file-scan-output-partitioning branch from b22530a to 05d7512 Compare June 12, 2026 08:08
@github-actions github-actions Bot removed logical-expr Logical plan and expressions physical-expr Changes to the physical-expr crates substrait Changes to the substrait crate common Related to common crate labels Jun 12, 2026
@gene-bordegaray gene-bordegaray marked this pull request as ready for review June 12, 2026 08:10
@gene-bordegaray gene-bordegaray force-pushed the gene.bordegaray/2026/05/file-scan-output-partitioning branch from 05d7512 to 1820234 Compare June 12, 2026 14:52
@gene-bordegaray

Contributor Author

This is ready for another look. I took the following approach to declaring the partitioning:

  • target_partitions is used as a scan parallelism hint. It is used when no declared output_partitioning is set.
  • output_partitioning overrides target_partitions when set. The listing table creates one file group per output partition, preserves empty groups when needed, validates the final group count, and then advertises the physical partitioning.
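A minimal sketch of this precedence and padding, under simplifying assumptions: files are path strings already split into groups, the helpers are hypothetical, and the zero-file case is not modeled.

```rust
/// Hypothetical: choose the number of file groups for a listing scan.
/// A declared output partitioning is authoritative; target_partitions is
/// only a parallelism hint used when nothing is declared.
fn file_group_count(target_partitions: usize, declared_count: Option<usize>) -> usize {
    declared_count.unwrap_or(target_partitions)
}

/// Reject more groups than declared partitions; otherwise pad with empty
/// trailing groups so the final count matches the declaration exactly.
fn finalize_groups(
    mut groups: Vec<Vec<String>>,
    declared_count: usize,
) -> Result<Vec<Vec<String>>, String> {
    if groups.len() > declared_count {
        return Err(format!(
            "{} file groups exceed the {} declared partitions",
            groups.len(),
            declared_count
        ));
    }
    // Only grows here: appends empty groups at the end.
    groups.resize_with(declared_count, Vec::new);
    Ok(groups)
}
```

For example, two files with a 4-partition declaration produce four groups, the last two empty, regardless of target_partitions.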

From the comments, these seem to be the topics for follow-up work/discussion:

  1. Replace the split between output_partitioning: Option<Partitioning> and partitioned_by_file_group: bool with a better internal structure. I was thinking of unifying them with something like an internal enum:
enum FilePartitioning {
    None,
    Declared(Partitioning),
    DeriveFromPartitionedFileGroups,
}
  2. with_target_partitions. For now, keep it as a parallelism API, and consider a rename or deprecation if we want a clearer distinction between scan parallelism and output partitioning layout.
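One way the proposed `FilePartitioning` enum could resolve to what a scan advertises is sketched below. `Partitioning` is a simplified hypothetical stand-in, and deriving hash partitioning over the Hive partition columns only loosely mirrors the `hash_partitioning_from_partition_fields` suggestion earlier in the thread; its exact behavior is assumed.

```rust
/// Hypothetical, simplified physical partitioning.
#[derive(Debug, Clone, PartialEq)]
enum Partitioning {
    Hash { cols: Vec<String>, count: usize },
    Unknown(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::Hash { count, .. } => *count,
            Partitioning::Unknown(n) => *n,
        }
    }
}

/// The proposed internal enum replacing `output_partitioning: Option<_>`
/// plus `partitioned_by_file_group: bool`.
#[derive(Debug, Clone, PartialEq)]
enum FilePartitioning {
    None,
    Declared(Partitioning),
    DeriveFromPartitionedFileGroups,
}

/// Resolve what the scan advertises for `file_groups` groups, given the
/// table's Hive partition columns.
fn resolve(fp: &FilePartitioning, partition_cols: &[String], file_groups: usize) -> Partitioning {
    match fp {
        FilePartitioning::Declared(p) if p.partition_count() == file_groups => p.clone(),
        FilePartitioning::DeriveFromPartitionedFileGroups if !partition_cols.is_empty() => {
            Partitioning::Hash { cols: partition_cols.to_vec(), count: file_groups }
        }
        // Nothing declared, a count mismatch, or no partition columns.
        _ => Partitioning::Unknown(file_groups),
    }
}
```

A single match like this keeps the "declared wins, otherwise derive, otherwise unknown" decision in one place instead of spreading it across an `Option` and a `bool`.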

cc: @gabotechs @NGA-TRAN @alamb @stuhood


Labels

auto detected api change Auto detected API change catalog Related to the catalog crate core Core DataFusion crate datasource Changes to the datasource crate proto Related to proto crate sqllogictest SQL Logic Tests (.slt)

Development

Successfully merging this pull request may close these issues.

Support declared output partitioning for file/listing scans

3 participants