Support spilling for WindowAggExec #22947

Draft
wirybeaver wants to merge 1 commit into apache:main from wirybeaver:spill-window-agg-exec

wirybeaver (Contributor) commented Jun 14, 2026

Which issue does this PR close?

Closes #22946.

Rationale for this change

WindowAggExec currently buffers the entire child input in memory, concatenates all buffered batches after the child stream is exhausted, then evaluates every window partition and emits a single output batch. Memory usage therefore scales with the full input size rather than with the largest window partition, so large inputs can exhaust the memory pool even when individual partitions are small and DataFusion is configured with spill storage.

This PR changes WindowAggExec to process completed window partitions incrementally and adds spill support for the active partition when it cannot fit in the memory reservation.

Before:
  buffer all input -> concat all input -> compute all partitions -> emit once

After:
  buffer one active partition -> spill it if needed -> compute completed partition -> emit partition output
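
To make the "After" flow concrete, here is a minimal std-only sketch of partition-at-a-time processing. It is not the code in this PR: `Row`, `OutRow`, `evaluate_partition`, and `process_sorted_input` are hypothetical names, plain tuples stand in for RecordBatch rows, and the window expression is assumed to be SUM(value) over the whole partition.

```rust
/// One input row: (partition key, value). Stands in for a RecordBatch row.
type Row = (u32, i64);
/// One output row: the input columns plus one appended window column.
type OutRow = (u32, i64, i64);

/// Evaluate the toy window expression (SUM over the whole partition).
fn evaluate_partition(rows: &[Row]) -> Vec<OutRow> {
    let sum: i64 = rows.iter().map(|r| r.1).sum();
    rows.iter().map(|&(k, v)| (k, v, sum)).collect()
}

/// Process input that is already sorted by partition key.
/// Returns one output batch per partition plus the peak number of
/// rows that were buffered at any one time.
fn process_sorted_input(batches: &[Vec<Row>]) -> (Vec<Vec<OutRow>>, usize) {
    let mut active: Vec<Row> = Vec::new();
    let mut outputs = Vec::new();
    let mut peak = 0usize;
    for batch in batches {
        for &row in batch {
            // A key change is a partition boundary: the active
            // partition is complete and can be evaluated and emitted.
            if let Some(&(active_key, _)) = active.first() {
                if active_key != row.0 {
                    outputs.push(evaluate_partition(&active));
                    active.clear();
                }
            }
            active.push(row);
            peak = peak.max(active.len());
        }
    }
    // End of input completes the last partition.
    if !active.is_empty() {
        outputs.push(evaluate_partition(&active));
    }
    (outputs, peak)
}
```

In this sketch the peak buffer size tracks the largest partition rather than the total input, which is the property the PR is after. A partition that spans two input batches is still evaluated as a single unit.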

Design overview

The implementation keeps the operator's sorted-input contract: rows are grouped into logical window partitions using the existing partition-boundary evaluation. The execution model changes from whole-input buffering to partition-at-a-time buffering. Each completed partition is either retained in memory or moved to a spill file if the memory reservation cannot grow, then evaluated and emitted independently.

Child ExecutionPlan
        |
        v
WindowAggStream
  - polls sorted RecordBatches
  - splits batches into partition slices
  - keeps one ActivePartition at a time
        |
        +--> MemoryReservation
        |       |
        |       +-- reservation succeeds
        |       |       |
        |       |       v
        |       |  ActivePartition.in_memory_batches
        |       |
        |       +-- reservation fails
        |               |
        |               +-- disk manager disabled --> ResourcesExhausted
        |               |
        |               +-- disk manager enabled
        |                       |
        |                       v
        |                 SpillManager
        |                       |
        |                       v
        |                 InProgressSpillFile
        |                 - writes buffered batches
        |                 - appends later slices
        |                 - updates SpillMetrics
        |
        +--> partition boundary reached
                |
                +-- memory-only partition
                |       |
                |       v
                |  CompletedPartition::InMemory
                |
                +-- spilled partition
                        |
                        v
                   CompletedPartition::Spilled
                        |
                        v
                   SpillManager::read_spill_as_stream

CompletedPartition batches
        |
        v
WindowExpr evaluator
        |
        v
Output RecordBatch
  - original input columns
  - appended window expression columns

Memory reservations are released after each completed partition is evaluated. For spilled partitions, the operator still materializes the complete partition before evaluating window expressions; spilling is used to keep buffered input out of memory while waiting for the partition boundary.
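
The reservation branch of the diagram can be sketched with a toy model. Everything below is an illustrative assumption rather than the PR's code: `Reservation` is a hypothetical stand-in for a memory reservation with a fixed byte budget, a `Vec` stands in for the in-progress spill file, and `BufferError` is a simplified error type.

```rust
use std::mem::size_of;

/// Toy stand-in for a memory reservation with a fixed byte budget.
struct Reservation { limit: usize, used: usize }

impl Reservation {
    /// Returns true and records the growth if the budget allows it.
    fn try_grow(&mut self, bytes: usize) -> bool {
        if self.used + bytes > self.limit { return false; }
        self.used += bytes;
        true
    }
    fn free(&mut self) { self.used = 0; }
}

#[derive(Debug, PartialEq)]
enum BufferError { ResourcesExhausted }

#[derive(Debug, PartialEq)]
enum CompletedPartition {
    InMemory(Vec<Vec<i64>>),
    Spilled(Vec<Vec<i64>>), // stands in for a finished spill file
}

struct ActivePartition {
    in_memory: Vec<Vec<i64>>,
    spill_file: Option<Vec<Vec<i64>>>,
}

impl ActivePartition {
    fn new() -> Self { Self { in_memory: Vec::new(), spill_file: None } }

    /// Buffer one slice of the active partition.
    fn push(&mut self, slice: Vec<i64>, res: &mut Reservation, disk_enabled: bool)
        -> Result<(), BufferError>
    {
        // Once spilled, later slices are appended to the spill file.
        if let Some(file) = self.spill_file.as_mut() {
            file.push(slice);
            return Ok(());
        }
        if res.try_grow(slice.len() * size_of::<i64>()) {
            self.in_memory.push(slice);
            return Ok(());
        }
        if !disk_enabled {
            return Err(BufferError::ResourcesExhausted);
        }
        // Reservation failed: write buffered batches to the spill
        // file, append the new slice, and release the memory.
        let mut file = std::mem::take(&mut self.in_memory);
        file.push(slice);
        self.spill_file = Some(file);
        res.free();
        Ok(())
    }

    /// Called when the partition boundary is reached.
    fn finish(self) -> CompletedPartition {
        match self.spill_file {
            Some(file) => CompletedPartition::Spilled(file),
            None => CompletedPartition::InMemory(self.in_memory),
        }
    }
}
```

With a 32-byte budget, two 16-byte slices fit in memory; a third slice triggers the spill path when the disk flag is set and returns `ResourcesExhausted` when it is not.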

Tradeoff and alternatives

The previous whole-input model can be faster for small or moderate inputs when memory is sufficient: it concatenates once, evaluates over larger contiguous batches, and emits fewer output batches. The partition-at-a-time model is more robust for large or skewed inputs and memory-limited execution, but it may add per-partition state management, repeated aggregation calls, and more output batches.

If preserving the current fully in-memory behavior is preferred, an alternative design is to introduce a separate operator such as SpillingWindowAggExec. The planner could choose the existing in-memory WindowAggExec for small/fast-path workloads and choose SpillingWindowAggExec when spill is enabled or when the query is expected to run under memory pressure.

Another possible direction is integrating spill or more streaming evaluation into BoundedWindowAggExec for bounded frames. I am open to that direction, but it seems like a separate design problem: BoundedWindowAggExec is already optimized around stateful in-memory partition buffers and pruning, so adding disk-backed state there likely needs a focused API/design rather than being folded into this PR.

What changes are included in this PR?

This PR adds spill support to WindowAggExec while changing the buffering and emission model from whole-input to completed-partition execution:

  • Replaces the previous whole-input self.batches buffer with active and completed partition state.
  • Registers a spill-capable MemoryConsumer for each WindowAggStream partition.
  • Tracks the active window partition separately from completed partitions.
  • Buffers rows by partition key and emits each completed partition independently instead of waiting for all input to finish.
  • Spills active partition batches through the existing SpillManager when memory reservation growth fails and a disk manager is configured.
  • Reads spilled partitions back before computing window expressions.
  • Releases memory reservations after each in-memory or spilled partition is evaluated.
  • Surfaces a more specific OOM context when spilling is unavailable or the partition still cannot be materialized for evaluation.
  • Reports standard spill metrics through SpillMetrics.
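
The read-back and error-context items above can be illustrated with a small sketch. It assumes a hypothetical `evaluate_completed` function, a `was_spilled` flag in place of the PR's completed-partition state, ROW_NUMBER as the window expression, and a plain `String` error; the real operator works on RecordBatches and DataFusion error types.

```rust
use std::mem::size_of;

/// Evaluate one completed partition, materializing it first.
/// `free_bytes` is the memory still available for reading a spilled
/// partition back; in-memory partitions are assumed to be reserved already.
fn evaluate_completed(
    batches: Vec<Vec<i64>>,
    was_spilled: bool,
    free_bytes: usize,
) -> Result<Vec<(i64, u64)>, String> {
    if was_spilled {
        // The whole partition must fit in memory before evaluation.
        let needed: usize = batches.iter().map(|b| b.len() * size_of::<i64>()).sum();
        if needed > free_bytes {
            return Err(format!(
                "cannot materialize spilled window partition: need {} bytes, {} available",
                needed, free_bytes
            ));
        }
    }
    // Concatenate the batches and append a ROW_NUMBER column.
    Ok(batches.into_iter().flatten().zip(1u64..).collect())
}
```

The sketch reflects the stated limitation: spilling defers memory use until the partition boundary, but a partition that is larger than the available memory still fails at evaluation time, with an error that says why.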

Are these changes tested?

Yes. Added focused unit coverage for:

  • spilling a partition that crosses input batch boundaries;
  • avoiding spill when memory is sufficient;
  • returning a resources-exhausted error when memory is constrained and the disk manager is disabled;
  • preserving empty-input behavior.

Locally verified after rebasing onto apache/datafusion/main:

/home/user/.cargo/bin/cargo fmt --check
/home/user/.cargo/bin/cargo test -p datafusion-physical-plan window_agg_exec --lib

running 8 tests
...
test result: ok. 8 passed; 0 failed; 0 ignored; 0 measured; 1474 filtered out; finished in 2.10s

github-actions bot added the physical-plan (Changes to the physical-plan crate) label Jun 14, 2026
