diff --git a/datafusion/catalog-listing/src/table.rs b/datafusion/catalog-listing/src/table.rs index dd3675bd2b39d..92490f7470355 100644 --- a/datafusion/catalog-listing/src/table.rs +++ b/datafusion/catalog-listing/src/table.rs @@ -36,6 +36,7 @@ use datafusion_datasource::{ }; use datafusion_execution::cache::TableScopedPath; use datafusion_execution::cache::cache_manager::FileStatisticsCache; +use datafusion_execution::cache::file_statistics_cache::DefaultFileStatisticsCache; use datafusion_expr::dml::InsertOp; use datafusion_expr::execution_props::ExecutionProps; use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType}; @@ -187,6 +188,12 @@ pub struct ListingTable { definition: Option, /// Cache for collected file statistics collected_statistics: Option>, + /// Cache scoped to this [`ListingTable`]. + /// + /// Anonymous tables do not have a stable table id in the shared cache key + /// and may read the same path with different explicit schemas. Use this + /// cache for those tables rather than populating the shared session cache. + local_statistics_cache: Arc, /// Constraints applied to this table constraints: Constraints, /// Column default expressions for columns that are not physically present in the data files @@ -231,6 +238,7 @@ impl ListingTable { options, definition: None, collected_statistics: None, + local_statistics_cache: Arc::new(DefaultFileStatisticsCache::default()), constraints: Constraints::default(), column_defaults: HashMap::new(), expr_adapter_factory: config.expr_adapter_factory, @@ -260,10 +268,26 @@ impl ListingTable { /// multiple times in the same session. /// pub fn with_cache(mut self, cache: Option>) -> Self { + if let Some(cache) = &cache { + self.local_statistics_cache + .update_cache_limit(cache.cache_limit()); + } self.collected_statistics = cache; self } + fn statistics_cache( + &self, + has_table_reference: bool, + ) -> Option<&Arc> { + let shared_cache = self.collected_statistics.as_ref()?; + if has_table_reference { + Some(shared_cache) + } else { + Some(&self.local_statistics_cache) + } + } + /// Specify the SQL definition for this table, if any pub fn with_definition(mut self, definition: Option) -> Self { self.definition = definition; @@ -807,7 +831,7 @@ impl ListingTable { let meta = &part_file.object_meta; // Check cache first - if we have valid cached statistics and ordering - if let Some(cache) = &self.collected_statistics + if let Some(cache) = self.statistics_cache(path.table.is_some()) && let Some(cached) = cache.get(&path) && cached.is_valid_for(meta) { @@ -825,7 +849,7 @@ impl ListingTable { let statistics = Arc::new(file_meta.statistics); // Store in cache - if let Some(cache) = &self.collected_statistics { + if let Some(cache) = self.statistics_cache(path.table.is_some()) { cache.put( &path, CachedFileMetadata::new( diff --git a/datafusion/core/tests/parquet/file_statistics.rs b/datafusion/core/tests/parquet/file_statistics.rs index 3e3b90a348b04..8735ae161f5f1 100644 --- a/datafusion/core/tests/parquet/file_statistics.rs +++ b/datafusion/core/tests/parquet/file_statistics.rs @@ -18,6 +18,7 @@ use std::fs; use std::sync::Arc; +use arrow::datatypes::{DataType, Field, Schema}; use datafusion::datasource::TableProvider; use datafusion::datasource::file_format::parquet::ParquetFormat; use datafusion::datasource::listing::{ @@ -26,9 +27,9 @@ use datafusion::datasource::listing::{ use datafusion::datasource::source::DataSourceExec; use datafusion::execution::context::SessionState; use datafusion::execution::session_state::SessionStateBuilder; -use datafusion::prelude::SessionContext; -use datafusion_common::DFSchema; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; use datafusion_common::stats::Precision; +use datafusion_common::{DFSchema, TableReference}; use datafusion_execution::cache::DefaultListFilesCache; use datafusion_execution::cache::cache_manager::{ CacheManagerConfig, FileStatisticsCache, @@ -105,7 +106,9 @@ async fn check_stats_precision_with_filter_pushdown() { async fn load_table_stats_with_session_level_cache() { let testdata = datafusion::test_util::parquet_test_data(); let filename = format!("{}/{}", testdata, "alltypes_plain.parquet"); - let table_path = ListingTableUrl::parse(filename).unwrap(); + let table_path = ListingTableUrl::parse(filename) + .unwrap() + .with_table_ref(TableReference::bare("alltypes_plain")); let (cache1, _, state1) = get_cache_runtime_state(); @@ -166,6 +169,58 @@ async fn load_table_stats_with_session_level_cache() { assert_eq!(get_static_cache_size(&state1), 1); } +#[tokio::test] +async fn anonymous_parquet_stats_cache_with_explicit_wider_schema() { + let temp_dir = tempdir().unwrap(); + let parquet_path = temp_dir.path().join("data.parquet"); + let parquet_path = parquet_path.to_string_lossy().to_string(); + + let ctx = SessionContext::new_with_config( + SessionConfig::new().with_collect_statistics(true), + ); + + ctx.sql(&format!( + "COPY ( + SELECT 1::BIGINT AS id, 1000::BIGINT AS population + ) TO '{parquet_path}' STORED AS PARQUET" + )) + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.read_parquet(&parquet_path, ParquetReadOptions::default()) + .await + .unwrap() + .collect() + .await + .unwrap(); + + let wider_schema = Schema::new(vec![ + Field::new("id", DataType::Int64, true), + Field::new("population", DataType::Int64, true), + Field::new("extra", DataType::Int64, true), + ]); + + let plan = ctx + .read_parquet( + &parquet_path, + ParquetReadOptions::default().schema(&wider_schema), + ) + .await + .unwrap() + .select_columns(&["id", "extra"]) + .unwrap() + .create_physical_plan() + .await + .unwrap(); + + let stats = plan.partition_statistics(None).unwrap(); + assert_eq!(stats.column_statistics.len(), 2); + assert_eq!(stats.column_statistics[1].null_count, Precision::Exact(1)); +} + #[tokio::test] async fn list_files_with_session_level_cache() { let p_name = "alltypes_plain.parquet";