116 changes: 116 additions & 0 deletions datafusion-cli/src/functions.rs
@@ -581,3 +581,119 @@ impl TableFunctionImpl for MetadataCacheFunc {
Ok(Arc::new(metadata_cache))
}
}

/// STATISTICS_CACHE table function
#[derive(Debug)]
struct StatisticsCacheTable {
schema: SchemaRef,
batch: RecordBatch,
}

#[async_trait]
impl TableProvider for StatisticsCacheTable {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> arrow::datatypes::SchemaRef {
self.schema.clone()
}

fn table_type(&self) -> datafusion::logical_expr::TableType {
datafusion::logical_expr::TableType::Base
}

async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
_limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(MemorySourceConfig::try_new_exec(
&[vec![self.batch.clone()]],
TableProvider::schema(self),
projection.cloned(),
)?)
}
}

#[derive(Debug)]
pub struct StatisticsCacheFunc {
cache_manager: Arc<CacheManager>,
}

impl StatisticsCacheFunc {
pub fn new(cache_manager: Arc<CacheManager>) -> Self {
Self { cache_manager }
}
}

impl TableFunctionImpl for StatisticsCacheFunc {
fn call(&self, exprs: &[Expr]) -> Result<Arc<dyn TableProvider>> {
if !exprs.is_empty() {
return plan_err!("statistics_cache should have no arguments");
}

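        // `num_rows` and `table_size_bytes` are Utf8 rather than UInt64 because
        // they carry a `Precision` qualifier (e.g. "Exact(8)") when rendered below.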
let schema = Arc::new(Schema::new(vec![
Field::new("path", DataType::Utf8, false),
Field::new(
"file_modified",
DataType::Timestamp(TimeUnit::Millisecond, None),
false,
),
Field::new("file_size_bytes", DataType::UInt64, false),
Field::new("e_tag", DataType::Utf8, true),
Field::new("version", DataType::Utf8, true),
Field::new("num_rows", DataType::Utf8, false),
Field::new("num_columns", DataType::UInt64, false),
Field::new("table_size_bytes", DataType::Utf8, false),
Field::new("statistics_size_bytes", DataType::UInt64, false),
]));

        // construct the record batch from the statistics cache entries
let mut path_arr = vec![];
let mut file_modified_arr = vec![];
let mut file_size_bytes_arr = vec![];
let mut e_tag_arr = vec![];
let mut version_arr = vec![];
let mut num_rows_arr = vec![];
let mut num_columns_arr = vec![];
let mut table_size_bytes_arr = vec![];
let mut statistics_size_bytes_arr = vec![];

if let Some(file_statistics_cache) = self.cache_manager.get_file_statistic_cache()
{
for (path, entry) in file_statistics_cache.list_entries() {
path_arr.push(path.to_string());
file_modified_arr
.push(Some(entry.object_meta.last_modified.timestamp_millis()));
file_size_bytes_arr.push(entry.object_meta.size);
e_tag_arr.push(entry.object_meta.e_tag);
version_arr.push(entry.object_meta.version);
num_rows_arr.push(entry.num_rows.to_string());
num_columns_arr.push(entry.num_columns as u64);
table_size_bytes_arr.push(entry.table_size_bytes.to_string());
statistics_size_bytes_arr.push(entry.statistics_size_bytes as u64);
}
}

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(StringArray::from(path_arr)),
Arc::new(TimestampMillisecondArray::from(file_modified_arr)),
Arc::new(UInt64Array::from(file_size_bytes_arr)),
Arc::new(StringArray::from(e_tag_arr)),
Arc::new(StringArray::from(version_arr)),
Arc::new(StringArray::from(num_rows_arr)),
Arc::new(UInt64Array::from(num_columns_arr)),
Arc::new(StringArray::from(table_size_bytes_arr)),
Arc::new(UInt64Array::from(statistics_size_bytes_arr)),
],
)?;

let statistics_cache = StatisticsCacheTable { schema, batch };
Ok(Arc::new(statistics_cache))
}
}
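
Once registered (see the `main.rs` change below), the function is queryable like any table. A minimal usage sketch, assuming a `SessionContext` named `ctx` with the UDTF already registered; the column names come from the schema above:

// Query the statistics cache contents through the new table function.
let df = ctx
    .sql("SELECT path, num_rows, table_size_bytes FROM statistics_cache()")
    .await?;
df.show().await?;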
106 changes: 104 additions & 2 deletions datafusion-cli/src/main.rs
@@ -31,7 +31,9 @@ use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::logical_expr::ExplainFormat;
use datafusion::prelude::SessionContext;
use datafusion_cli::catalog::DynamicObjectStoreCatalog;
use datafusion_cli::functions::{MetadataCacheFunc, ParquetMetadataFunc};
use datafusion_cli::functions::{
MetadataCacheFunc, ParquetMetadataFunc, StatisticsCacheFunc,
};
use datafusion_cli::object_storage::instrumented::{
InstrumentedObjectStoreMode, InstrumentedObjectStoreRegistry,
};
@@ -244,6 +246,14 @@ async fn main_inner() -> Result<()> {
)),
);

// register `statistics_cache` table function to get the contents of the file statistics cache
ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

let mut print_options = PrintOptions {
format: args.format,
quiet: args.quiet,
@@ -423,7 +433,13 @@ pub fn extract_disk_limit(size: &str) -> Result<usize, String> {
#[cfg(test)]
mod tests {
use super::*;
use datafusion::{common::test_util::batches_to_string, prelude::ParquetReadOptions};
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
cache_manager::CacheManagerConfig, cache_unit::DefaultFileStatisticsCache,
},
prelude::ParquetReadOptions,
};
use insta::assert_snapshot;

fn assert_conversion(input: &str, expected: Result<usize, String>) {
@@ -631,4 +647,90 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn test_statistics_cache() -> Result<(), DataFusionError> {
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

ctx.sql(
"
create external table alltypes_plain
stored as parquet
location '../parquet-testing/data/alltypes_plain.parquet'",
)
.await?
.collect()
.await?;

ctx.sql(
"
create external table alltypes_tiny_pages
stored as parquet
location '../parquet-testing/data/alltypes_tiny_pages.parquet'",
)
.await?
.collect()
.await?;

ctx.sql(
"
create external table lz4_raw_compressed_larger
stored as parquet
location '../parquet-testing/data/lz4_raw_compressed_larger.parquet'",
)
.await?
.collect()
.await?;

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");
[Review thread]
Contributor Author: This confirms that the file statistics cache is not populated when the table is created, only after accessing it once.
alchemist51 (Contributor, Dec 3, 2025): Do we want to provide a pre-warming option to it? Similar to metadataCache?
Contributor Author: I think so, it's being implemented here: #18971.

// access each table once to collect statistics
ctx.sql("select count(*) from alltypes_plain")
.await?
.collect()
.await?;
ctx.sql("select count(*) from alltypes_tiny_pages")
.await?
.collect()
.await?;
ctx.sql("select count(*) from lz4_raw_compressed_larger")
.await?
.collect()
.await?;

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
+-----------------------------------+-----------------+--------------+-------------+------------------+
| filename | file_size_bytes | num_rows | num_columns | table_size_bytes |
+-----------------------------------+-----------------+--------------+-------------+------------------+
| alltypes_plain.parquet | 1851 | Exact(8) | 11 | Exact(671) |
| alltypes_tiny_pages.parquet | 454233 | Exact(7300) | 13 | Exact(323579) |
| lz4_raw_compressed_larger.parquet | 380836 | Exact(10000) | 1 | Exact(400103) |
+-----------------------------------+-----------------+--------------+-------------+------------------+
");

Ok(())
}
}
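
On the pre-warming question from the review thread above (tracked in #18971): until a built-in option lands, a caller can approximate it by touching each table once, exactly as the test does. A rough sketch, assuming a `SessionContext` named `ctx`; the helper name and table list are illustrative, not part of this PR:

// Hypothetical pre-warming pass: a cheap aggregate forces a scan of each
// table, which populates the file statistics cache as a side effect.
async fn prewarm_statistics(
    ctx: &SessionContext,
    tables: &[&str],
) -> datafusion::error::Result<()> {
    for table in tables {
        ctx.sql(&format!("SELECT count(*) FROM {table}"))
            .await?
            .collect()
            .await?;
    }
    Ok(())
}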
4 changes: 2 additions & 2 deletions datafusion/catalog-listing/src/table.rs
@@ -178,7 +178,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: FileStatisticsCache,
collected_statistics: Arc<dyn FileStatisticsCache>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
@@ -255,7 +255,7 @@ impl ListingTable {
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
pub fn with_cache(mut self, cache: Option<FileStatisticsCache>) -> Self {
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self
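
For illustration, a sketch of sharing one cache across two tables (assuming `Arc`, `DefaultFileStatisticsCache`, and two pre-built `ListingTable` values are in scope; the variable names are invented):

// Both tables reuse the same statistics cache, so statistics for a file that
// appears under both paths are collected only once per session.
let shared_cache = Arc::new(DefaultFileStatisticsCache::default());
let table_a = listing_table_a.with_cache(Some(shared_cache.clone()));
let table_b = listing_table_b.with_cache(Some(shared_cache));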
34 changes: 27 additions & 7 deletions datafusion/execution/src/cache/cache_manager.rs
@@ -17,6 +17,7 @@

use crate::cache::cache_unit::DefaultFilesMetadataCache;
use crate::cache::CacheAccessor;
use datafusion_common::stats::Precision;
use datafusion_common::{Result, Statistics};
use object_store::path::Path;
use object_store::ObjectMeta;
@@ -32,8 +33,27 @@ use std::sync::Arc;
/// session lifetime.
///
/// See [`crate::runtime_env::RuntimeEnv`] for more details
pub type FileStatisticsCache =
Arc<dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>>;
pub trait FileStatisticsCache:
CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta>
{
    /// Retrieves information about the entries currently cached.
fn list_entries(&self) -> HashMap<Path, FileStatisticsCacheEntry>;
}

[Review thread on `list_entries`]
Contributor: Do we want to make this thread safe? When we bring in size limit, we might want to evict the cache object as well.
Contributor Author: Not sure I understand, this should be thread safe as is, right?
Contributor: Sorry, I got confused with an inner object created as a HashMap; this makes sense here.

#[derive(Debug, Clone, PartialEq, Eq)]
/// Represents information about a cached statistics entry.
/// This is used to expose the statistics cache contents to outside modules.
pub struct FileStatisticsCacheEntry {
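    /// Object metadata (path, size, last-modified, etc.) of the cached file.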
pub object_meta: ObjectMeta,
/// Number of table rows.
pub num_rows: Precision<usize>,
/// Number of table columns.
pub num_columns: usize,
/// Total table size, in bytes.
pub table_size_bytes: Precision<usize>,
/// Size of the statistics entry, in bytes.
pub statistics_size_bytes: usize,
}
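
As a consumer-side illustration, a hypothetical helper built only on the new `list_entries` API (the function name is invented; rendering `Precision` via `Display` matches the `.to_string()` use in the CLI function above):

// Hypothetical debugging helper: print one line per cached file, mirroring
// the columns exposed by the `statistics_cache()` table function.
fn dump_statistics_cache(cache: &dyn FileStatisticsCache) {
    for (path, entry) in cache.list_entries() {
        println!(
            "{path}: num_rows={}, num_columns={}, statistics_size_bytes={}",
            entry.num_rows, entry.num_columns, entry.statistics_size_bytes
        );
    }
}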

/// Cache for storing the [`ObjectMeta`]s that result from listing a path
///
@@ -103,7 +123,7 @@ pub struct FileMetadataCacheEntry {
pub extra: HashMap<String, String>,
}

impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
impl Debug for dyn FileStatisticsCache {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
}
@@ -130,7 +150,7 @@ impl Debug for dyn FileMetadataCache {
/// See [`CacheManagerConfig`] for configuration options.
#[derive(Debug)]
pub struct CacheManager {
file_statistic_cache: Option<FileStatisticsCache>,
file_statistic_cache: Option<Arc<dyn FileStatisticsCache>>,
list_files_cache: Option<ListFilesCache>,
file_metadata_cache: Arc<dyn FileMetadataCache>,
}
@@ -161,7 +181,7 @@ impl CacheManager {
}

/// Get the cache of listing files statistics.
pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
pub fn get_file_statistic_cache(&self) -> Option<Arc<dyn FileStatisticsCache>> {
self.file_statistic_cache.clone()
}

@@ -188,7 +208,7 @@ pub struct CacheManagerConfig {
    /// Enable caching of file statistics when listing files.
    /// Avoids fetching the same file statistics repeatedly in the same DataFusion session.
    /// Default is disabled. For now, only Parquet files are supported.
pub table_files_statistics_cache: Option<FileStatisticsCache>,
pub table_files_statistics_cache: Option<Arc<dyn FileStatisticsCache>>,
/// Enable cache of file metadata when listing files.
    /// This setting avoids listing file metadata for the same path repeatedly
    /// in the same session, which may be expensive in certain situations (e.g. remote object storage).
@@ -221,7 +241,7 @@ impl CacheManagerConfig {
/// Default is `None` (disabled).
pub fn with_files_statistics_cache(
mut self,
cache: Option<FileStatisticsCache>,
cache: Option<Arc<dyn FileStatisticsCache>>,
) -> Self {
self.table_files_statistics_cache = cache;
self