Add declared file scan output partitioning #22657
Thank you for opening this pull request! Reviewer note: cargo-semver-checks reported the current version number is not SemVer-compatible with the changes in this pull request (compared against the base branch). Details |
4be99d8 to
e3e6a51
Compare
|
cc: @alamb @stuhood @gabotechs @NGA-TRAN follow up for adding |
|
Nice! will take a look soon |
| // Partition pruning can remove files before grouping. Without a | ||
| // stable file-to-declared-partition mapping, regrouping the | ||
| // remaining files could shift them into the wrong partition index. | ||
| None |
Should/can the partition filters be calculated after having assigned files to partitions? You'd essentially bucket the files into partitions during with_output_partitioning, I think?
Addressed. For declared output partitioning, listing now keeps the full file set through file-group assignment and applies partition filters within each declared group afterward. That preserves the declared partition indexes while still pruning files.
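The ordering described in this reply (assign files to declared groups first, prune afterward) can be illustrated with a minimal, standard-library-only sketch. `ListedFile`, its `declared_partition` field, and `group_then_prune` are hypothetical names invented for illustration; they are not DataFusion types, and the real code may derive the file-to-partition mapping differently.

```rust
/// Hypothetical stand-in for a listed file (not DataFusion's `PartitionedFile`).
#[derive(Debug, Clone, PartialEq)]
struct ListedFile {
    path: String,
    /// Declared output partition this file belongs to (assumed known up front).
    declared_partition: usize,
    /// Value of a partition column that a filter may test.
    year: i32,
}

/// Assign every file to its declared group first, then prune inside each
/// group, so surviving files never move to a different partition index.
fn group_then_prune(
    files: Vec<ListedFile>,
    declared_partitions: usize,
    keep: impl Fn(&ListedFile) -> bool,
) -> Result<Vec<Vec<ListedFile>>, String> {
    // One (possibly empty) group per declared partition.
    let mut groups: Vec<Vec<ListedFile>> = vec![Vec::new(); declared_partitions];
    for file in files {
        match groups.get_mut(file.declared_partition) {
            Some(group) => group.push(file),
            None => {
                return Err(format!(
                    "{} targets partition {} but only {} are declared",
                    file.path, file.declared_partition, declared_partitions
                ))
            }
        }
    }
    // Pruning happens only after the indexes are fixed.
    for group in &mut groups {
        group.retain(|f| keep(f));
    }
    Ok(groups)
}
```

With three files in partitions 0, 1, and 2 and a filter that rejects the first file, the result still has three groups; group 0 is simply empty, and the other files keep their indexes.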
| /// If [`ListingOptions::output_partitioning`] is set, the returned file | ||
| /// groups preserve that declared partition count, including empty trailing | ||
| /// groups when needed, rather than using | ||
| /// [`ListingOptions::target_partitions`]. |
This feels like it should be documented on the target_partitions and output_partitioning setters/options rather than here? e.g. that only one may be set.
Addressed with tighter docs on ListingOptions::target_partitions / output_partitioning. The current behavior is that output_partitioning is authoritative when set, and target_partitions is only used when no output partitioning is declared.
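As a rough illustration of the precedence rule stated here, the sketch below models the two options with hypothetical types (`ScanOptions`, `DeclaredPartitioning`, `file_group_count`); none of these are the actual `ListingOptions` API, they only mirror the described behavior.

```rust
/// Hypothetical, simplified partitioning declaration.
#[derive(Debug, Clone, PartialEq)]
enum DeclaredPartitioning {
    RoundRobinBatch(usize),
    Hash { columns: Vec<String>, partitions: usize },
}

impl DeclaredPartitioning {
    fn partition_count(&self) -> usize {
        match self {
            DeclaredPartitioning::RoundRobinBatch(n) => *n,
            DeclaredPartitioning::Hash { partitions, .. } => *partitions,
        }
    }
}

/// Hypothetical options struct holding both knobs.
#[derive(Debug, Clone)]
struct ScanOptions {
    target_partitions: usize,
    output_partitioning: Option<DeclaredPartitioning>,
}

impl ScanOptions {
    /// A declared output partitioning is authoritative; `target_partitions`
    /// is consulted only when nothing is declared.
    fn file_group_count(&self) -> usize {
        self.output_partitioning
            .as_ref()
            .map(DeclaredPartitioning::partition_count)
            .unwrap_or(self.target_partitions)
    }
}
```

Under this model, `target_partitions: 1` combined with `RoundRobinBatch(4)` yields four file groups, which is the combination exercised in the test snippet discussed later in the review.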
| if output_partitioning.partition_count() != self.file_groups.len() { | ||
| debug!( | ||
| "Declared output partitioning has {} partitions, but file scan has {} file groups. Falling back to UnknownPartitioning.", | ||
| output_partitioning.partition_count(), | ||
| self.file_groups.len() | ||
| ); | ||
| return Partitioning::UnknownPartitioning(self.file_groups.len()); | ||
| } |
In which case would this happen? The case where partition filters have eliminated some files...? It feels like it should be a louder error.
Addressed for the ListingTable path: mismatched declared partition counts now produce a planning error before FileScanConfig is built. I kept FileScanConfig defensive for direct construction and documented that it falls back to UnknownPartitioning.
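The two behaviors described in this reply can be contrasted in a small sketch. The function names and the `Advertised` enum are hypothetical and use plain counts rather than DataFusion's `Partitioning` type; the error is a `String` here purely to keep the example self-contained.

```rust
/// Hypothetical summary of what a scan advertises to the planner.
#[derive(Debug, PartialEq)]
enum Advertised {
    Declared { partitions: usize },
    Unknown(usize),
}

/// Listing-table style check: refuse to plan when the counts disagree.
fn check_declared_count(declared: usize, file_groups: usize) -> Result<(), String> {
    if declared == file_groups {
        Ok(())
    } else {
        Err(format!(
            "declared output partitioning has {declared} partitions, \
             but the scan has {file_groups} file groups"
        ))
    }
}

/// Direct-construction style fallback: stay defensive and advertise an
/// unknown partitioning instead of failing.
fn advertised_partitioning(declared: Option<usize>, file_groups: usize) -> Advertised {
    match declared {
        Some(n) if n == file_groups => Advertised::Declared { partitions: n },
        _ => Advertised::Unknown(file_groups),
    }
}
```

The loud path surfaces the mismatch at planning time, while the defensive path keeps a directly constructed scan usable but drops the declared guarantee.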
gabotechs
left a comment
Nice job here! this seems to be going in the right direction, left some comments, let me know what you think
| if preserve_partition_count && !file_groups.is_empty() { | ||
| file_groups.resize_with(target_partitions, || FileGroup::new(vec![])); | ||
| } | ||
|
|
It's a bit weird that if preserve_partition_count == true then we are willing to increase partitioning by resizing file_groups anyway.
Maybe it's just a matter of a better name for the preserve_partition_count variable?
I removed the preserve_partition_count flag/path and left grouping behavior based on whether output_partitioning is declared.
| .with_target_partitions(1) | ||
| .with_output_partitioning(Some(Partitioning::RoundRobinBatch(4))); | ||
|
|
This does not look very clean from a public API standpoint, the fact that users are allowed to pass contradicting information.
One idea that comes to mind is to deprecate .with_target_partitions(), and have an implementation that does something like:
pub fn with_target_partitions(mut self, n: usize) -> Self {
    // Treat the requested parallelism as a declared round-robin partitioning.
    self.output_partitioning = Some(Partitioning::RoundRobinBatch(n));
    self
}
I agree this is a bit contradictory, which is why I tried to make it as explicit as possible in docs and tests. I don't think mapping with_target_partitions to RoundRobinBatch is the best approach though, since it muddles parallelism with declaring round-robin partitioning semantics.
I realize I am suggesting a good amount of follow-up in my comments, but I think it's the best way to handle these decisions so they get more dedicated thought. For this PR, what would you think about keeping output_partitioning authoritative when set, with target_partitions aligned to it in that case?
Then I think deprecating or redesigning with_target_partitions is a discussion we can have in an issue, as it's a pretty big knob people use.
🤔 Is it? I'm not convinced this is widely used; I'm actually surprised it even exists here.
The "proper" way of telling DF how many partitions to use is with the standard config parameter datafusion.execution.target_partitions. Given this, I'm surprised there's this alternative path for choosing the partitions.
I'll dig a bit more into the git history to get some context on when this was created and how it's expected to be used, taking into account that datafusion.execution.target_partitions already exists. Maybe we'll find that this is actually something that should have been deprecated a long time ago.
| /// projection or filtering. If the partition count does not match the | ||
| /// number of file groups, [`DataSource::output_partitioning`] falls back to | ||
| /// [`Partitioning::UnknownPartitioning`]. | ||
| pub output_partitioning: Option<Partitioning>, |
Do you see an opportunity for removing partitioned_by_file_group now?
For example, removing the partitioned_by_file_group field, and doing something like this?
pub fn with_partitioned_by_file_group(
mut self,
partitioned_by_file_group: bool,
) -> Self {
self.output_partitioning = partitioned_by_file_group
.then(|| {
hash_partitioning_from_partition_fields(
self.file_source.table_schema().table_schema(),
self.file_source.table_schema().table_partition_cols(),
self.file_groups.len(),
)
})
.flatten();
self
}
Leaving this as a follow-up. I think output_partitioning opens up the option to remove partitioned_by_file_group, but doing it here would change the Hive partition grouping path, and I don't know if we should do that in the same PR since this one is already quite large.
|
Talked with @gabotechs offline. TLDR: the representation of this should most likely be behind the logical representation of partitioning, not the physical one just as cc: @stuhood |
e3e6a51 to
bc1e2fc
Compare
bc1e2fc to
b22530a
Compare
## Which issue does this PR close?

- Closes apache#22778.
- Related: apache#21992, apache#22395.
- Needed by apache#22657.

## Rationale for this change

Declared scan output partitioning should use logical partitioning metadata, not physical partitioning types. This adds logical range partitioning so range-partitioned sources can declare their layout at the logical layer.

## What changes are included in this PR?

- Add logical `Partitioning::Range` and `RangePartitioning`.
- Move `SplitPoint` and shared split-point validation to `datafusion-common`.
- Wire logical range partitioning through expression traversal, rewrites, and display.
- Keep planning, logical proto, and Substrait support explicitly unsupported for now.

## Are these changes tested?

Yes. Unit tests added.

## Are there any user-facing changes?

Yes. This adds public logical range partitioning API.
No breaking API changes.
b22530a to
05d7512
Compare
05d7512 to
1820234
Compare
|
this is ready for another glance. I took the following approach to declaring the partitioning:
From the comments, these seem to be the topics for follow-up work / discussion:
enum FilePartitioning {
None,
Declared(Partitioning),
DeriveFromPartitionedFileGroups,
}
|
Which issue does this PR close?
Rationale for this change
This follows up on #22607 by replacing range-partitioning sqllogictest boilerplate with a general file/listing scan API for declared output partitioning.
Related: #21992, #22607, #22607 (comment)
What changes are included in this PR?
- Add output_partitioning to file scan and listing table configuration.
- Carry output_partitioning through physical plan proto.
- Update range_partitioning.slt to use a CSV ListingTable instead of a custom test-only TableProvider / DataSource.

Contract:
- Range([range_key@0], [(10), (20)], 3) remains valid if the scan projects range_key and falls back to UnknownPartitioning(3) if range_key is not projected.
- … target_partitions). It is up to the user to plan their partitioning. For example, a 4-partition range declaration creates four scan file groups, adding empty trailing groups when fewer files are present.
- File group i must contain rows for declared output partition i. DataFusion does not validate row placement, matching other user-declared properties such as sortedness.

Are these changes tested?
Yes.
Are there any user-facing changes?
Yes. This adds public API for declaring file/listing scan output partitioning. No breaking API changes.
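To make two points of the contract above concrete (padding to the declared partition count, and the projection fallback), here is a minimal standard-library-only sketch. `ScanPartitioning`, `pad_file_groups`, and `after_projection` are hypothetical names, split points are simplified to `i64`, and rejecting a scan with more file groups than declared partitions is an assumption based on the planning error mentioned in the review thread.

```rust
/// Hypothetical, simplified view of a scan's advertised partitioning.
#[derive(Debug, Clone, PartialEq)]
enum ScanPartitioning {
    Range { key: String, split_points: Vec<i64>, partitions: usize },
    Unknown(usize),
}

/// Pad with empty trailing groups so the scan exposes exactly `declared`
/// file groups. Having more groups than declared is treated as an error
/// (an assumption made for this sketch).
fn pad_file_groups(
    mut groups: Vec<Vec<String>>,
    declared: usize,
) -> Result<Vec<Vec<String>>, String> {
    if groups.len() > declared {
        return Err(format!(
            "{} file groups exceed the {} declared partitions",
            groups.len(),
            declared
        ));
    }
    groups.resize_with(declared, Vec::new);
    Ok(groups)
}

/// Keep the range declaration only while its key column is still projected;
/// otherwise keep just the partition count.
fn after_projection(declared: &ScanPartitioning, projected: &[&str]) -> ScanPartitioning {
    match declared {
        ScanPartitioning::Range { key, partitions, .. }
            if !projected.iter().any(|col| *col == key.as_str()) =>
        {
            ScanPartitioning::Unknown(*partitions)
        }
        other => other.clone(),
    }
}
```

In this model, two listed files under a 4-partition declaration produce four groups with the last two empty, and a three-partition range on `range_key` degrades to `Unknown(3)` once `range_key` is projected away. Row placement itself is not checked, in line with the contract.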