Skip to content

Commit 9377204

Browse files
graph, store: Add experimental job to automatically set account-like flag on eligible tables
Signed-off-by: Maksim Dimitrov <dimitrov.maksim@gmail.com>
1 parent 1f3050f commit 9377204

File tree

4 files changed

+155
-1
lines changed

4 files changed

+155
-1
lines changed

graph/src/env/store.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,18 @@ pub struct EnvVarsStore {
149149
/// The number of rows to fetch from the foreign data wrapper in one go,
150150
/// this will be set as the option 'fetch_size' on all foreign servers
151151
pub fdw_fetch_size: usize,
152+
153+
/// Experimental feature to automatically set the account-like flag on eligible tables
154+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS`
155+
/// If not set, the job is disabled.
156+
/// Utilizes materialized view stats that refresh every 6 hours to discover heavy-write tables.
157+
pub account_like_scan_interval_hours: Option<u32>,
158+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT`
159+
/// Tables must have at least this many total versions to be considered.
160+
pub account_like_min_versions_count: Option<u64>,
161+
/// Set by the environment variable `GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO`
162+
/// Defines the maximum share of unique entities (e.g. 0.01 for a 1:100 entity-to-version ratio).
163+
pub account_like_max_unique_ratio: Option<f64>,
152164
}
153165

154166
// This does not print any values avoid accidentally leaking any sensitive env vars
@@ -206,6 +218,9 @@ impl TryFrom<InnerStore> for EnvVarsStore {
206218
disable_block_cache_for_lookup: x.disable_block_cache_for_lookup,
207219
insert_extra_cols: x.insert_extra_cols,
208220
fdw_fetch_size: x.fdw_fetch_size,
221+
account_like_scan_interval_hours: x.account_like_scan_interval_hours,
222+
account_like_min_versions_count: x.account_like_min_versions_count,
223+
account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0),
209224
};
210225
if let Some(timeout) = vars.batch_timeout {
211226
if timeout < 2 * vars.batch_target_duration {
@@ -217,6 +232,16 @@ impl TryFrom<InnerStore> for EnvVarsStore {
217232
if vars.batch_workers < 1 {
218233
bail!("GRAPH_STORE_BATCH_WORKERS must be at least 1");
219234
}
235+
if vars.account_like_scan_interval_hours.is_some()
236+
&& (vars.account_like_min_versions_count.is_none()
237+
|| vars.account_like_max_unique_ratio.is_none())
238+
{
239+
bail!(
240+
"Both GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT and \
241+
GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO must be set when \
242+
GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS is set"
243+
);
244+
}
220245
Ok(vars)
221246
}
222247
}
@@ -295,6 +320,12 @@ pub struct InnerStore {
295320
insert_extra_cols: usize,
296321
#[envconfig(from = "GRAPH_STORE_FDW_FETCH_SIZE", default = "1000")]
297322
fdw_fetch_size: usize,
323+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_SCAN_INTERVAL_HOURS")]
324+
account_like_scan_interval_hours: Option<u32>,
325+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MIN_VERSIONS_COUNT")]
326+
account_like_min_versions_count: Option<u64>,
327+
#[envconfig(from = "GRAPH_STORE_ACCOUNT_LIKE_MAX_UNIQUE_RATIO")]
328+
account_like_max_unique_ratio: Option<ZeroToOneF64>,
298329
}
299330

300331
#[derive(Clone, Copy, Debug)]

store/postgres/src/deployment_store.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,50 @@ impl DeploymentStore {
811811
.await
812812
}
813813

814+
pub(crate) async fn identify_account_like_candidates(
815+
&self,
816+
min_versions: u64,
817+
ratio: f64,
818+
) -> Result<Vec<(String, String)>, StoreError> {
819+
#[derive(QueryableByName)]
820+
struct TableStat {
821+
#[diesel(sql_type = diesel::sql_types::Text)]
822+
subgraph: String,
823+
#[diesel(sql_type = diesel::sql_types::Text)]
824+
table_name: String,
825+
}
826+
let result = self
827+
.with_conn(move |conn, _| {
828+
let query = r#"
829+
SELECT
830+
stats.subgraph,
831+
stats.table_name
832+
FROM info.table_stats AS stats
833+
LEFT JOIN subgraphs.table_stats ts
834+
ON ts.deployment = stats.deployment
835+
AND ts.table_name = stats.table_name
836+
WHERE
837+
stats.versions > $1
838+
AND stats.ratio < $2
839+
AND ts.is_account_like IS NOT TRUE
840+
"#;
841+
842+
diesel::sql_query(query)
843+
.bind::<diesel::sql_types::BigInt, _>(min_versions as i64)
844+
.bind::<diesel::sql_types::Double, _>(ratio)
845+
.load::<TableStat>(conn)
846+
.map_err(Into::into)
847+
})
848+
.await;
849+
850+
result.map(|tables| {
851+
tables
852+
.into_iter()
853+
.map(|table_stat| (table_stat.subgraph, table_stat.table_name))
854+
.collect()
855+
})
856+
}
857+
814858
pub(crate) async fn set_account_like(
815859
&self,
816860
site: Arc<Site>,
@@ -1845,10 +1889,11 @@ impl DeploymentStore {
18451889
// We hardcode our materialized views, but could also use
18461890
// pg_matviews to list all of them, though that might inadvertently
18471891
// refresh materialized views that operators created themselves
1848-
const VIEWS: [&str; 3] = [
1892+
const VIEWS: [&str; 4] = [
18491893
"info.table_sizes",
18501894
"info.subgraph_sizes",
18511895
"info.chain_sizes",
1896+
"info.table_stats",
18521897
];
18531898
store
18541899
.with_conn(|conn, cancel| {

store/postgres/src/jobs.rs

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ pub fn register(
4747
Arc::new(RefreshMaterializedView::new(store.subgraph_store())),
4848
6 * ONE_HOUR,
4949
);
50+
51+
if let Some(interval) = ENV_VARS.store.account_like_scan_interval_hours {
52+
runner.register(
53+
Arc::new(AccountLikeJob::new(store.subgraph_store())),
54+
interval * ONE_HOUR,
55+
);
56+
}
5057
}
5158

5259
/// A job that vacuums `subgraphs.deployment` and `subgraphs.head`. With a
@@ -234,3 +241,37 @@ impl Job for UnusedJob {
234241
}
235242
}
236243
}
244+
245+
struct AccountLikeJob {
246+
store: Arc<SubgraphStore>,
247+
}
248+
249+
impl AccountLikeJob {
250+
fn new(store: Arc<SubgraphStore>) -> AccountLikeJob {
251+
AccountLikeJob { store }
252+
}
253+
}
254+
255+
#[async_trait]
256+
impl Job for AccountLikeJob {
257+
fn name(&self) -> &str {
258+
"Set account-like flag on eligible tables"
259+
}
260+
261+
async fn run(&self, logger: &Logger) {
262+
// Safe to unwrap due to a startup validation
263+
// which ensures these values are present when account_like_scan_interval_hours is set.
264+
let min_versions_count = ENV_VARS.store.account_like_min_versions_count.unwrap();
265+
let ratio = ENV_VARS.store.account_like_max_unique_ratio.unwrap();
266+
267+
self.store
268+
.identify_and_set_account_like(logger, min_versions_count, ratio)
269+
.await
270+
.unwrap_or_else(|e| {
271+
error!(
272+
logger,
273+
"Failed to set account-like flag on eligible tables: {}", e
274+
)
275+
});
276+
}
277+
}

store/postgres/src/subgraph_store.rs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,6 +1220,43 @@ impl SubgraphStoreInner {
12201220
store.drop_index(site, index_name).await
12211221
}
12221222

1223+
pub(crate) async fn identify_and_set_account_like(
1224+
&self,
1225+
logger: &Logger,
1226+
min_records: u64,
1227+
ratio: f64,
1228+
) -> Result<(), StoreError> {
1229+
for (_shard, store) in &self.stores {
1230+
let candidates = store
1231+
.identify_account_like_candidates(min_records, ratio)
1232+
.await?;
1233+
1234+
graph::slog::debug!(
1235+
logger,
1236+
"Found {} account-like candidates in shard {}",
1237+
candidates.len(),
1238+
_shard
1239+
);
1240+
1241+
for (subgraph, table_name) in candidates {
1242+
graph::slog::debug!(
1243+
logger,
1244+
"Setting table {} as account-like for deployment {}",
1245+
table_name,
1246+
subgraph
1247+
);
1248+
1249+
let hash = DeploymentHash::new(subgraph.clone()).map_err(|_| {
1250+
anyhow!("Failed to create deployment hash for subgraph: {subgraph}")
1251+
})?;
1252+
let (store, site) = self.store(&hash)?;
1253+
store.set_account_like(site, &table_name, true).await?;
1254+
}
1255+
}
1256+
1257+
Ok(())
1258+
}
1259+
12231260
pub async fn set_account_like(
12241261
&self,
12251262
deployment: &DeploymentLocator,

0 commit comments

Comments
 (0)