Skip to content

Commit d150af7

Browse files
BlakeOrth and alamb authored
Adds memory-bound DefaultListFilesCache (#18855)
## Which issue does this PR close?

Initial cache implementation for

- #18827

This does not fully close the issue, since this implementation currently lacks user configuration and does not enable the cache by default

## Rationale for this change

This work lays the groundwork and initial implementation for the default cache. It's a necessary initial step to allow collaboration around this issue

## What changes are included in this PR?

- Implements a POC version of a default ListFilesCache
- Refactors the existing ListFilesCache to mirror the MetadataCache by defining a new trait instead of a fixed type wrapping a trait
- Bounds the size of the cache based on number of entries
- Expires entries in the cache after a default timeout duration

## Are these changes tested?

Yes. New unit tests have been introduced to test these changes. The cache is not yet enabled by default and cannot be enabled through configuration, so integration tests don't yet apply.

## Are there any user-facing changes?

Yes, this work breaks the existing `ListFilesCache` public API.

## cc @alamb @alchemist51

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
1 parent 388db0e commit d150af7

File tree

9 files changed

+905
-113
lines changed

9 files changed

+905
-113
lines changed

datafusion/catalog-listing/src/table.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,11 @@ impl TableProvider for ListingTable {
578578
let keep_partition_by_columns =
579579
state.config_options().execution.keep_partition_by_columns;
580580

581+
// Invalidate cache entries for this table if they exist
582+
if let Some(lfc) = state.runtime_env().cache_manager.get_list_files_cache() {
583+
let _ = lfc.remove(table_path.prefix());
584+
}
585+
581586
// Sink related option, apart from format
582587
let config = FileSinkConfig {
583588
original_url: String::default(),

datafusion/core/tests/datasource/object_store_access.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,59 @@ async fn create_multi_file_csv_file() {
9898
);
9999
}
100100

101+
#[tokio::test]
102+
async fn multi_query_multi_file_csv_file() {
103+
let test = Test::new().with_multi_file_csv().await;
104+
assert_snapshot!(
105+
test.query("select * from csv_table").await,
106+
@r"
107+
------- Query Output (6 rows) -------
108+
+---------+-------+-------+
109+
| c1 | c2 | c3 |
110+
+---------+-------+-------+
111+
| 0.0 | 0.0 | true |
112+
| 0.00003 | 5e-12 | false |
113+
| 0.00001 | 1e-12 | true |
114+
| 0.00003 | 5e-12 | false |
115+
| 0.00002 | 2e-12 | true |
116+
| 0.00003 | 5e-12 | false |
117+
+---------+-------+-------+
118+
------- Object Store Request Summary -------
119+
RequestCountingObjectStore()
120+
Total Requests: 4
121+
- LIST prefix=data
122+
- GET (opts) path=data/file_0.csv
123+
- GET (opts) path=data/file_1.csv
124+
- GET (opts) path=data/file_2.csv
125+
"
126+
);
127+
128+
// the second query should re-use the cached LIST results and should not reissue LIST
129+
assert_snapshot!(
130+
test.query("select * from csv_table").await,
131+
@r"
132+
------- Query Output (6 rows) -------
133+
+---------+-------+-------+
134+
| c1 | c2 | c3 |
135+
+---------+-------+-------+
136+
| 0.0 | 0.0 | true |
137+
| 0.00003 | 5e-12 | false |
138+
| 0.00001 | 1e-12 | true |
139+
| 0.00003 | 5e-12 | false |
140+
| 0.00002 | 2e-12 | true |
141+
| 0.00003 | 5e-12 | false |
142+
+---------+-------+-------+
143+
------- Object Store Request Summary -------
144+
RequestCountingObjectStore()
145+
Total Requests: 4
146+
- LIST prefix=data
147+
- GET (opts) path=data/file_0.csv
148+
- GET (opts) path=data/file_1.csv
149+
- GET (opts) path=data/file_2.csv
150+
"
151+
);
152+
}
153+
101154
#[tokio::test]
102155
async fn query_multi_csv_file() {
103156
let test = Test::new().with_multi_file_csv().await;

datafusion/core/tests/parquet/file_statistics.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,8 @@ use datafusion::prelude::SessionContext;
3030
use datafusion_common::stats::Precision;
3131
use datafusion_common::DFSchema;
3232
use datafusion_execution::cache::cache_manager::CacheManagerConfig;
33-
use datafusion_execution::cache::cache_unit::{
34-
DefaultFileStatisticsCache, DefaultListFilesCache,
35-
};
33+
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
34+
use datafusion_execution::cache::DefaultListFilesCache;
3635
use datafusion_execution::config::SessionConfig;
3736
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
3837
use datafusion_expr::{col, lit, Expr};

datafusion/execution/src/cache/cache_manager.rs

Lines changed: 70 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ use std::any::Any;
2424
use std::collections::HashMap;
2525
use std::fmt::{Debug, Formatter};
2626
use std::sync::Arc;
27+
use std::time::Duration;
28+
29+
use super::list_files_cache::DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT;
2730

2831
/// A cache for [`Statistics`].
2932
///
@@ -41,9 +44,19 @@ pub type FileStatisticsCache =
4144
/// command on the local filesystem. This operation can be expensive,
4245
/// especially when done over remote object stores.
4346
///
44-
/// See [`crate::runtime_env::RuntimeEnv`] for more details
45-
pub type ListFilesCache =
46-
Arc<dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>>;
47+
/// See [`crate::runtime_env::RuntimeEnv`] for more details.
48+
pub trait ListFilesCache:
49+
CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta>
50+
{
51+
/// Returns the cache's memory limit in bytes.
52+
fn cache_limit(&self) -> usize;
53+
54+
/// Returns the TTL (time-to-live) for cache entries, if configured.
55+
fn cache_ttl(&self) -> Option<Duration>;
56+
57+
/// Updates the cache with a new memory limit in bytes.
58+
fn update_cache_limit(&self, limit: usize);
59+
}
4760

4861
/// Generic file-embedded metadata used with [`FileMetadataCache`].
4962
///
@@ -109,7 +122,7 @@ impl Debug for dyn CacheAccessor<Path, Arc<Statistics>, Extra = ObjectMeta> {
109122
}
110123
}
111124

112-
impl Debug for dyn CacheAccessor<Path, Arc<Vec<ObjectMeta>>, Extra = ObjectMeta> {
125+
impl Debug for dyn ListFilesCache {
113126
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
114127
write!(f, "Cache name: {} with length: {}", self.name(), self.len())
115128
}
@@ -131,7 +144,7 @@ impl Debug for dyn FileMetadataCache {
131144
#[derive(Debug)]
132145
pub struct CacheManager {
133146
file_statistic_cache: Option<FileStatisticsCache>,
134-
list_files_cache: Option<ListFilesCache>,
147+
list_files_cache: Option<Arc<dyn ListFilesCache>>,
135148
file_metadata_cache: Arc<dyn FileMetadataCache>,
136149
}
137150

@@ -166,10 +179,22 @@ impl CacheManager {
166179
}
167180

168181
/// Get the cache for storing the result of listing [`ObjectMeta`]s under the same path.
169-
pub fn get_list_files_cache(&self) -> Option<ListFilesCache> {
182+
pub fn get_list_files_cache(&self) -> Option<Arc<dyn ListFilesCache>> {
170183
self.list_files_cache.clone()
171184
}
172185

186+
/// Get the memory limit of the list files cache.
187+
pub fn get_list_files_cache_limit(&self) -> usize {
188+
self.list_files_cache
189+
.as_ref()
190+
.map_or(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, |c| c.cache_limit())
191+
}
192+
193+
/// Get the TTL (time-to-live) of the list files cache.
194+
pub fn get_list_files_cache_ttl(&self) -> Option<Duration> {
195+
self.list_files_cache.as_ref().and_then(|c| c.cache_ttl())
196+
}
197+
173198
/// Get the file embedded metadata cache.
174199
pub fn get_file_metadata_cache(&self) -> Arc<dyn FileMetadataCache> {
175200
Arc::clone(&self.file_metadata_cache)
@@ -185,17 +210,24 @@ pub const DEFAULT_METADATA_CACHE_LIMIT: usize = 50 * 1024 * 1024; // 50M
185210

186211
#[derive(Clone)]
187212
pub struct CacheManagerConfig {
188-
/// Enable cache of files statistics when listing files.
189-
/// Avoid get same file statistics repeatedly in same datafusion session.
190-
/// Default is disable. Fow now only supports Parquet files.
213+
/// Enable caching of file statistics when listing files.
214+
/// Enabling the cache avoids repeatedly reading file statistics in a DataFusion session.
215+
/// Default is disabled. Currently only Parquet files are supported.
191216
pub table_files_statistics_cache: Option<FileStatisticsCache>,
192-
/// Enable cache of file metadata when listing files.
193-
/// This setting avoids listing file meta of the same path repeatedly
194-
/// in same session, which may be expensive in certain situations (e.g. remote object storage).
217+
/// Enable caching of file metadata when listing files.
218+
/// Enabling the cache avoids repeat list and object metadata fetch operations, which may be
219+
/// expensive in certain situations (e.g. remote object storage), for objects under paths that
220+
/// are cached.
195221
/// Note that if this option is enabled, DataFusion will not see any updates to the underlying
196-
/// location.
197-
/// Default is disable.
198-
pub list_files_cache: Option<ListFilesCache>,
222+
/// storage for at least `list_files_cache_ttl` duration.
223+
/// Default is disabled.
224+
pub list_files_cache: Option<Arc<dyn ListFilesCache>>,
225+
/// Limit of the `list_files_cache`, in bytes. Default: 1MiB.
226+
pub list_files_cache_limit: usize,
227+
/// The duration the list files cache will consider an entry valid after insertion. Note that
228+
/// changes to the underlying storage system, such as adding or removing data, will not be
229+
/// visible until an entry expires. Default: None (infinite).
230+
pub list_files_cache_ttl: Option<Duration>,
199231
/// Cache of file-embedded metadata, used to avoid reading it multiple times when processing a
200232
/// data file (e.g., Parquet footer and page metadata).
201233
/// If not provided, the [`CacheManager`] will create a [`DefaultFilesMetadataCache`].
@@ -209,6 +241,8 @@ impl Default for CacheManagerConfig {
209241
Self {
210242
table_files_statistics_cache: Default::default(),
211243
list_files_cache: Default::default(),
244+
list_files_cache_limit: DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT,
245+
list_files_cache_ttl: None,
212246
file_metadata_cache: Default::default(),
213247
metadata_cache_limit: DEFAULT_METADATA_CACHE_LIMIT,
214248
}
@@ -228,13 +262,32 @@ impl CacheManagerConfig {
228262
}
229263

230264
/// Set the cache for listing files.
231-
///
265+
///
232266
/// Default is `None` (disabled).
233-
pub fn with_list_files_cache(mut self, cache: Option<ListFilesCache>) -> Self {
267+
pub fn with_list_files_cache(
268+
mut self,
269+
cache: Option<Arc<dyn ListFilesCache>>,
270+
) -> Self {
234271
self.list_files_cache = cache;
235272
self
236273
}
237274

275+
/// Sets the limit of the list files cache, in bytes.
276+
///
277+
/// Default: 1MiB (1,048,576 bytes).
278+
pub fn with_list_files_cache_limit(mut self, limit: usize) -> Self {
279+
self.list_files_cache_limit = limit;
280+
self
281+
}
282+
283+
/// Sets the TTL (time-to-live) for entries in the list files cache.
284+
///
285+
/// Default: None (infinite).
286+
pub fn with_list_files_cache_ttl(mut self, ttl: Duration) -> Self {
287+
self.list_files_cache_ttl = Some(ttl);
288+
self
289+
}
290+
238291
/// Sets the cache for file-embedded metadata.
239292
///
240293
/// Default is a [`DefaultFilesMetadataCache`].

datafusion/execution/src/cache/cache_unit.rs

Lines changed: 1 addition & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -107,71 +107,6 @@ impl CacheAccessor<Path, Arc<Statistics>> for DefaultFileStatisticsCache {
107107
}
108108
}
109109

110-
/// Default implementation of [`ListFilesCache`]
111-
///
112-
/// Collected files metadata for listing files.
113-
///
114-
/// Cache is not invalided until user calls [`Self::remove`] or [`Self::clear`].
115-
///
116-
/// [`ListFilesCache`]: crate::cache::cache_manager::ListFilesCache
117-
#[derive(Default)]
118-
pub struct DefaultListFilesCache {
119-
statistics: DashMap<Path, Arc<Vec<ObjectMeta>>>,
120-
}
121-
122-
impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
123-
type Extra = ObjectMeta;
124-
125-
fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
126-
self.statistics.get(k).map(|x| Arc::clone(x.value()))
127-
}
128-
129-
fn get_with_extra(
130-
&self,
131-
_k: &Path,
132-
_e: &Self::Extra,
133-
) -> Option<Arc<Vec<ObjectMeta>>> {
134-
panic!("Not supported DefaultListFilesCache get_with_extra")
135-
}
136-
137-
fn put(
138-
&self,
139-
key: &Path,
140-
value: Arc<Vec<ObjectMeta>>,
141-
) -> Option<Arc<Vec<ObjectMeta>>> {
142-
self.statistics.insert(key.clone(), value)
143-
}
144-
145-
fn put_with_extra(
146-
&self,
147-
_key: &Path,
148-
_value: Arc<Vec<ObjectMeta>>,
149-
_e: &Self::Extra,
150-
) -> Option<Arc<Vec<ObjectMeta>>> {
151-
panic!("Not supported DefaultListFilesCache put_with_extra")
152-
}
153-
154-
fn remove(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
155-
self.statistics.remove(k).map(|x| x.1)
156-
}
157-
158-
fn contains_key(&self, k: &Path) -> bool {
159-
self.statistics.contains_key(k)
160-
}
161-
162-
fn len(&self) -> usize {
163-
self.statistics.len()
164-
}
165-
166-
fn clear(&self) {
167-
self.statistics.clear()
168-
}
169-
170-
fn name(&self) -> String {
171-
"DefaultListFilesCache".to_string()
172-
}
173-
}
174-
175110
/// Handles the inner state of the [`DefaultFilesMetadataCache`] struct.
176111
struct DefaultFilesMetadataCacheState {
177112
lru_queue: LruQueue<Path, (ObjectMeta, Arc<dyn FileMetadata>)>,
@@ -434,7 +369,7 @@ mod tests {
434369
FileMetadata, FileMetadataCache, FileMetadataCacheEntry,
435370
};
436371
use crate::cache::cache_unit::{
437-
DefaultFileStatisticsCache, DefaultFilesMetadataCache, DefaultListFilesCache,
372+
DefaultFileStatisticsCache, DefaultFilesMetadataCache,
438373
};
439374
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
440375
use chrono::DateTime;
@@ -486,28 +421,6 @@ mod tests {
486421
assert!(cache.get_with_extra(&meta2.location, &meta2).is_none());
487422
}
488423

489-
#[test]
490-
fn test_list_file_cache() {
491-
let meta = ObjectMeta {
492-
location: Path::from("test"),
493-
last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
494-
.unwrap()
495-
.into(),
496-
size: 1024,
497-
e_tag: None,
498-
version: None,
499-
};
500-
501-
let cache = DefaultListFilesCache::default();
502-
assert!(cache.get(&meta.location).is_none());
503-
504-
cache.put(&meta.location, vec![meta.clone()].into());
505-
assert_eq!(
506-
cache.get(&meta.location).unwrap().first().unwrap().clone(),
507-
meta.clone()
508-
);
509-
}
510-
511424
pub struct TestFileMetadata {
512425
metadata: String,
513426
}

0 commit comments

Comments
 (0)