Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion vortex-file/src/strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ pub struct WriteStrategyBuilder {
field_writers: HashMap<FieldPath, Arc<dyn LayoutStrategy>>,
allow_encodings: Option<HashSet<ArrayId>>,
flat_strategy: Option<Arc<dyn LayoutStrategy>>,
probe_compressor: Option<Arc<dyn CompressorPlugin>>,
}

impl Default for WriteStrategyBuilder {
Expand All @@ -161,6 +162,7 @@ impl Default for WriteStrategyBuilder {
field_writers: HashMap::new(),
allow_encodings: Some(ALLOWED_ENCODINGS.clone()),
flat_strategy: None,
probe_compressor: None,
}
}
}
Expand Down Expand Up @@ -215,6 +217,12 @@ impl WriteStrategyBuilder {
self
}

/// Override the compressor used to probe whether a column is dict-eligible.
pub fn with_probe_compressor<C: CompressorPlugin>(mut self, compressor: C) -> Self {
self.probe_compressor = Some(Arc::new(compressor));
self
}

/// Builds the canonical [`LayoutStrategy`] implementation, with the configured overrides
/// applied.
pub fn build(self) -> Arc<dyn LayoutStrategy> {
Expand Down Expand Up @@ -268,14 +276,20 @@ impl WriteStrategyBuilder {
CompressorConfig::BtrBlocks(builder) => Arc::new(builder.build()),
CompressorConfig::Opaque(compressor) => compressor,
};
let compress_then_flat = CompressingStrategy::new(flat, stats_compressor);
let compress_then_flat = CompressingStrategy::new(flat, Arc::clone(&stats_compressor));

// 3. apply dict encoding or fallback
let probe_compressor = if let Some(probe_compressor) = self.probe_compressor {
probe_compressor
} else {
Arc::clone(&stats_compressor)
};
let dict = DictStrategy::new(
coalescing.clone(),
compress_then_flat.clone(),
coalescing,
Default::default(),
probe_compressor,
);

// 2. calculate stats for each row group
Expand Down
82 changes: 82 additions & 0 deletions vortex-file/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ use vortex_array::stats::PRUNING_STATS;
use vortex_array::stream::ArrayStreamAdapter;
use vortex_array::stream::ArrayStreamExt;
use vortex_array::validity::Validity;
use vortex_btrblocks::BtrBlocksCompressorBuilder;
use vortex_btrblocks::SchemeExt;
use vortex_btrblocks::schemes::string::StringDictScheme;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBufferMut;
use vortex_buffer::buffer;
Expand Down Expand Up @@ -1813,6 +1816,16 @@ fn assert_offsets_ordered(before: &[u64], after: &[u64], context: &str) {
}
}

/// Whether any node in the layout tree is a dict layout.
fn layout_has_dict(layout: &dyn Layout) -> bool {
layout.encoding_id().as_ref() == "vortex.dict"
|| layout
.children()
.unwrap()
.iter()
.any(|child| layout_has_dict(child.as_ref()))
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {
Expand Down Expand Up @@ -1861,6 +1874,75 @@ async fn test_segment_ordering_dict_codes_before_values() -> VortexResult<()> {
Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn dict_probe_honours_configured_compressor() -> VortexResult<()> {
// Low-cardinality strings so the default cascade picks a dictionary.
let n = 32_768;
let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect();
let strings = VarBinArray::from(values).into_array();

let mut buf = ByteBufferMut::empty();
let summary = SESSION
.write_options()
.with_strategy(crate::strategy::WriteStrategyBuilder::default().build())
.write(&mut buf, strings.clone().to_array_stream())
.await?;
assert!(
layout_has_dict(summary.footer().layout().as_ref()),
"default builder should produce a dict layout for low-cardinality strings"
);

let no_string_dict =
BtrBlocksCompressorBuilder::default().exclude_schemes([StringDictScheme.id()]);
let mut buf = ByteBufferMut::empty();
let summary = SESSION
.write_options()
.with_strategy(
crate::strategy::WriteStrategyBuilder::default()
.with_btrblocks_builder(no_string_dict)
.build(),
)
.write(&mut buf, strings.to_array_stream())
.await?;
assert!(
!layout_has_dict(summary.footer().layout().as_ref()),
"excluding StringDict from the configured compressor should disable the dict layout"
);

Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn probe_compressor_override_is_independent() -> VortexResult<()> {
// Low-cardinality strings the default cascade would dict-encode.
let n = 32_768;
let values: Vec<&str> = (0..n).map(|i| ["alpha", "beta", "gamma"][i % 3]).collect();
let strings = VarBinArray::from(values).into_array();

let probe_without_dict = BtrBlocksCompressorBuilder::default()
.exclude_schemes([StringDictScheme.id()])
.build();

let mut buf = ByteBufferMut::empty();
let summary = SESSION
.write_options()
.with_strategy(
crate::strategy::WriteStrategyBuilder::default()
.with_probe_compressor(probe_without_dict)
.build(),
)
.write(&mut buf, strings.to_array_stream())
.await?;
assert!(
!layout_has_dict(summary.footer().layout().as_ref()),
"probe override should disable the dict layout independently of the data/stats compressor"
);

Ok(())
}

#[tokio::test]
#[cfg_attr(miri, ignore)]
async fn test_segment_ordering_zonemaps_after_data() -> VortexResult<()> {
Expand Down
4 changes: 4 additions & 0 deletions vortex-layout/src/layouts/dict/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ mod tests {
use vortex_array::scalar_fn::session::ScalarFnSession;
use vortex_array::session::ArraySession;
use vortex_array::validity::Validity;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexExpect;
use vortex_io::runtime::Handle;
use vortex_io::runtime::single::block_on;
Expand Down Expand Up @@ -329,6 +330,7 @@ mod tests {
FlatLayoutStrategy::default(),
FlatLayoutStrategy::default(),
DictLayoutOptions::default(),
Arc::new(BtrBlocksCompressor::default()),
);

let array = VarBinArray::from_iter(
Expand Down Expand Up @@ -428,6 +430,7 @@ mod tests {
FlatLayoutStrategy::default(),
FlatLayoutStrategy::default(),
DictLayoutOptions::default(),
Arc::new(BtrBlocksCompressor::default()),
);

let array =
Expand Down Expand Up @@ -479,6 +482,7 @@ mod tests {
FlatLayoutStrategy::default(),
FlatLayoutStrategy::default(),
DictLayoutOptions::default(),
Arc::new(BtrBlocksCompressor::default()),
);

let array = VarBinArray::from_iter(
Expand Down
9 changes: 7 additions & 2 deletions vortex-layout/src/layouts/dict/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use vortex_array::builders::dict::dict_encoder;
use vortex_array::dtype::DType;
use vortex_array::dtype::Nullability;
use vortex_array::dtype::PType;
use vortex_btrblocks::BtrBlocksCompressor;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
Expand All @@ -42,6 +41,7 @@ use crate::LayoutRef;
use crate::LayoutStrategy;
use crate::OwnedLayoutChildren;
use crate::layouts::chunked::ChunkedLayout;
use crate::layouts::compressed::CompressorPlugin;
use crate::layouts::dict::DictLayout;
use crate::segments::SegmentSinkRef;
use crate::sequence::SendableSequentialStream;
Expand Down Expand Up @@ -106,6 +106,7 @@ pub struct DictStrategy {
values: Arc<dyn LayoutStrategy>,
fallback: Arc<dyn LayoutStrategy>,
options: DictLayoutOptions,
probe_compressor: Arc<dyn CompressorPlugin>,
}

impl DictStrategy {
Expand All @@ -114,12 +115,14 @@ impl DictStrategy {
values: Values,
fallback: Fallback,
options: DictLayoutOptions,
probe_compressor: Arc<dyn CompressorPlugin>,
) -> Self {
Self {
codes: Arc::new(codes),
values: Arc::new(values),
fallback: Arc::new(fallback),
options,
probe_compressor,
}
}
}
Expand Down Expand Up @@ -153,7 +156,9 @@ impl LayoutStrategy for DictStrategy {
None => true, // empty stream
Some(chunk) => {
let mut exec_ctx = session.create_execution_ctx();
let compressed = BtrBlocksCompressor::default().compress(&chunk, &mut exec_ctx)?;
let compressed = self
.probe_compressor
.compress_chunk(&chunk, &mut exec_ctx)?;
!compressed.is::<Dict>()
}
};
Expand Down