Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion_datasource::{
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -187,6 +188,12 @@ pub struct ListingTable {
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
/// Cache scoped to this [`ListingTable`].
///
/// Anonymous tables do not have a stable table id in the shared cache key
/// and may read the same path with different explicit schemas. Use this
/// cache for those tables rather than populating the shared session cache.
local_statistics_cache: Arc<dyn FileStatisticsCache>,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this fixes the anonymous ListingTable statistics reuse issue, but it does so by giving each anonymous table its own DefaultFileStatisticsCache.

That seems to bypass the intended global datafusion.runtime.file_statistics_cache_limit, since every anonymous table gets a separate cache and with_cache copies the full shared cache limit into each instance.

The invariant that was failing appears to be narrower: anonymous reads with SchemaSource::Specified should not reuse statistics that were computed for the same path under a different schema.

Could we avoid caching entirely for anonymous specified-schema tables instead? Registered tables could continue using the shared cache through their table reference, and anonymous inferred-schema reads could still share statistics by path when the schema is inferred consistently.

/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
Expand Down Expand Up @@ -231,6 +238,7 @@ impl ListingTable {
options,
definition: None,
collected_statistics: None,
local_statistics_cache: Arc::new(DefaultFileStatisticsCache::default()),
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
Expand Down Expand Up @@ -260,10 +268,26 @@ impl ListingTable {
/// multiple times in the same session.
///
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
    // When a shared cache is supplied, mirror its configured limit onto the
    // table-local cache before storing the shared handle.
    if let Some(shared) = cache.as_ref() {
        let limit = shared.cache_limit();
        self.local_statistics_cache.update_cache_limit(limit);
    }
    // `None` clears the shared cache; `statistics_cache` then yields no cache.
    self.collected_statistics = cache;
    self
}

/// Selects which statistics cache to consult for a file.
///
/// Returns `None` when no shared cache has been configured. Otherwise
/// returns the shared cache if `has_table_reference` is `true`, and the
/// table-local cache if it is `false`.
fn statistics_cache(
    &self,
    has_table_reference: bool,
) -> Option<&Arc<dyn FileStatisticsCache>> {
    match (&self.collected_statistics, has_table_reference) {
        // No shared cache configured: caching is disabled altogether.
        (None, _) => None,
        // The path carries a table reference: use the shared cache.
        (Some(shared), true) => Some(shared),
        // No table reference: fall back to the cache owned by this table.
        (Some(_), false) => Some(&self.local_statistics_cache),
    }
}

/// Specify the SQL definition for this table, if any
pub fn with_definition(mut self, definition: Option<String>) -> Self {
self.definition = definition;
Expand Down Expand Up @@ -807,7 +831,7 @@ impl ListingTable {
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cache) = &self.collected_statistics
if let Some(cache) = self.statistics_cache(path.table.is_some())
&& let Some(cached) = cache.get(&path)
&& cached.is_valid_for(meta)
{
Expand All @@ -825,7 +849,7 @@ impl ListingTable {
let statistics = Arc::new(file_meta.statistics);

// Store in cache
if let Some(cache) = &self.collected_statistics {
if let Some(cache) = self.statistics_cache(path.table.is_some()) {
cache.put(
&path,
CachedFileMetadata::new(
Expand Down
61 changes: 58 additions & 3 deletions datafusion/core/tests/parquet/file_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use std::fs;
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
Expand All @@ -26,9 +27,9 @@ use datafusion::datasource::listing::{
use datafusion::datasource::source::DataSourceExec;
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::prelude::SessionContext;
use datafusion_common::DFSchema;
use datafusion::prelude::{ParquetReadOptions, SessionContext};
use datafusion_common::stats::Precision;
use datafusion_common::{DFSchema, TableReference};
use datafusion_execution::cache::DefaultListFilesCache;
use datafusion_execution::cache::cache_manager::{
CacheManagerConfig, FileStatisticsCache,
Expand Down Expand Up @@ -105,7 +106,9 @@ async fn check_stats_precision_with_filter_pushdown() {
async fn load_table_stats_with_session_level_cache() {
let testdata = datafusion::test_util::parquet_test_data();
let filename = format!("{}/{}", testdata, "alltypes_plain.parquet");
let table_path = ListingTableUrl::parse(filename).unwrap();
let table_path = ListingTableUrl::parse(filename)
.unwrap()
.with_table_ref(TableReference::bare("alltypes_plain"));

let (cache1, _, state1) = get_cache_runtime_state();

Expand Down Expand Up @@ -166,6 +169,58 @@ async fn load_table_stats_with_session_level_cache() {
assert_eq!(get_static_cache_size(&state1), 1);
}

/// Regression test: reading the same Parquet path twice through anonymous
/// `read_parquet` calls, first with default options and then with an explicit
/// schema that has one more column, must yield statistics that match the
/// explicit schema.
///
/// The file is written with two columns (`id`, `population`) and a single
/// row. The second read declares a third column, `extra`, and the test checks
/// the reported column statistics for the projection `["id", "extra"]`.
#[tokio::test]
async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() {
let temp_dir = tempdir().unwrap();
let parquet_path = temp_dir.path().join("data.parquet");
let parquet_path = parquet_path.to_string_lossy().to_string();

// Session configured with `with_collect_statistics(true)`.
let ctx = SessionContext::new_with_config(
SessionConfig::new().with_collect_statistics(true),
);

// Write a one-row Parquet file with columns `id` and `population`.
ctx.sql(&format!(
"COPY (
SELECT 1::BIGINT AS id, 1000::BIGINT AS population
) TO '{parquet_path}' STORED AS PARQUET"
))
.await
.unwrap()
.collect()
.await
.unwrap();

// First anonymous read with default options (no explicit schema), executed
// to completion. NOTE(review): presumably this is what populates the file
// statistics cache for the path — confirm against the ListingTable scan path.
ctx.read_parquet(&parquet_path, ParquetReadOptions::default())
.await
.unwrap()
.collect()
.await
.unwrap();

// Explicit schema with an additional `extra` column that the COPY statement
// above did not write.
let wider_schema = Schema::new(vec![
Field::new("id", DataType::Int64, true),
Field::new("population", DataType::Int64, true),
Field::new("extra", DataType::Int64, true),
]);

// Second anonymous read of the same path, now with the wider schema; only a
// physical plan is built, it is not executed.
let plan = ctx
.read_parquet(
&parquet_path,
ParquetReadOptions::default().schema(&wider_schema),
)
.await
.unwrap()
.select_columns(&["id", "extra"])
.unwrap()
.create_physical_plan()
.await
.unwrap();

// Two projected columns are expected. Index 1 is `extra`; the assertion
// requires an exact null count of 1, matching the single row in the file.
let stats = plan.partition_statistics(None).unwrap();
assert_eq!(stats.column_statistics.len(), 2);
assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1));
}

#[tokio::test]
async fn list_files_with_session_level_cache() {
let p_name = "alltypes_plain.parquet";
Expand Down
Loading