Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions crates/bench/benches/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ fn eval(c: &mut Criterion) {
// A benchmark runner for the new query engine
let bench_query = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Subscribe);
let mut tx = raw.db.begin_tx(Workload::Subscribe);
let auth = AuthCtx::for_testing();
let schema_viewer = &SchemaViewer::new(&tx, &auth);
let (plans, table_id, table_name, _) = compile_subscription(sql, schema_viewer, &auth).unwrap();
let mut schema_viewer = SchemaViewer::new(&mut tx, &auth);
let (plans, table_id, table_name, _) = compile_subscription(sql, &mut schema_viewer, &auth).unwrap();
let plans = plans
.into_iter()
.map(|plan| plan.optimize(&auth).unwrap())
Expand All @@ -155,8 +155,8 @@ fn eval(c: &mut Criterion) {

let bench_eval = |c: &mut Criterion, name, sql| {
c.bench_function(name, |b| {
let tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &tx, sql).unwrap();
let mut tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, sql).unwrap();
let query: ExecutionSet = query.into();

b.iter(|| {
Expand Down Expand Up @@ -207,11 +207,11 @@ fn eval(c: &mut Criterion) {
// A passthru executed independently of the database.
let select_lhs = "select * from footprint";
let select_rhs = "select * from location";
let tx = &raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_lhs).unwrap();
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, select_rhs).unwrap();
let mut tx = raw.db.begin_tx(Workload::Update);
let query_lhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, select_lhs).unwrap();
let query_rhs = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, select_rhs).unwrap();
let query = ExecutionSet::from_iter(query_lhs.into_iter().chain(query_rhs));
let tx = &tx.into();
let tx = &(&mut tx).into();

b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))))
});
Expand All @@ -226,10 +226,10 @@ fn eval(c: &mut Criterion) {
from footprint join location on footprint.entity_id = location.entity_id \
where location.chunk_index = {chunk_index}"
);
let tx = &raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), tx, &join).unwrap();
let mut tx = raw.db.begin_tx(Workload::Update);
let query = compile_read_only_queryset(&raw.db, &AuthCtx::for_testing(), &mut tx, &join).unwrap();
let query: ExecutionSet = query.into();
let tx = &tx.into();
let tx = &(&mut tx).into();

b.iter(|| drop(black_box(query.eval_incr_for_test(&raw.db, tx, &update, None))));
});
Expand Down
7 changes: 6 additions & 1 deletion crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,10 @@ impl RelationalDB {
Ok(tx.create_view(module_def, view_def)?)
}

pub fn create_or_get_params(&self, tx: &mut MutTx, params: &ProductValue) -> Result<ArgId, DBError> {
Ok(tx.create_or_get_params(params)?)
}

pub fn drop_view(&self, tx: &mut MutTx, view_id: ViewId) -> Result<(), DBError> {
Ok(tx.drop_view(view_id)?)
}
Expand Down Expand Up @@ -2217,6 +2221,7 @@ pub mod tests_utils {
db: &RelationalDB,
name: &str,
schema: &[(&str, AlgebraicType)],
params: ProductType,
is_anonymous: bool,
) -> Result<(ViewId, TableId), DBError> {
let mut builder = RawModuleDefV9Builder::new();
Expand All @@ -2234,7 +2239,7 @@ pub mod tests_utils {
0,
true,
is_anonymous,
ProductType::unit(),
params,
AlgebraicType::array(AlgebraicType::Ref(type_ref)),
);

Expand Down
10 changes: 5 additions & 5 deletions crates/core/src/estimation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ mod tests {
}

fn num_rows_for(db: &RelationalDB, sql: &str) -> u64 {
let tx = begin_tx(db);
match &*compile_sql(db, &AuthCtx::for_testing(), &tx, sql).expect("Failed to compile sql") {
let mut tx = begin_tx(db);
match &*compile_sql(db, &AuthCtx::for_testing(), &mut tx, sql).expect("Failed to compile sql") {
[CrudExpr::Query(expr)] => num_rows(&tx, expr),
exprs => panic!("unexpected result from compilation: {exprs:#?}"),
}
Expand All @@ -191,10 +191,10 @@ mod tests {
/// Using the new query plan
fn new_row_estimate(db: &RelationalDB, sql: &str) -> u64 {
let auth = AuthCtx::for_testing();
let tx = begin_tx(db);
let tx = SchemaViewer::new(&tx, &auth);
let mut tx = begin_tx(db);
let mut tx = SchemaViewer::new(&mut tx, &auth);

compile_subscription(sql, &tx, &auth)
compile_subscription(sql, &mut tx, &auth)
.map(|(plans, ..)| plans)
.expect("failed to compile sql query")
.into_iter()
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1869,7 +1869,7 @@ impl ModuleHost {
let metrics = self
.on_module_thread("one_off_query", move || {
let (tx_offset_sender, tx_offset_receiver) = oneshot::channel();
let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let mut tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| {
let (tx_offset, tx_metrics, reducer) = db.release_tx(tx);
let _ = tx_offset_sender.send(tx_offset);
db.report_read_tx_metrics(reducer, tx_metrics);
Expand All @@ -1878,7 +1878,7 @@ impl ModuleHost {
// We wrap the actual query in a closure so we can use ? to handle errors without making
// the entire transaction abort with an error.
let result: Result<(OneOffTable<F>, ExecutionMetrics), anyhow::Error> = (|| {
let tx = SchemaViewer::new(&*tx, &auth);
let mut tx = SchemaViewer::new(&mut *tx, &auth);

let (
// A query may compile down to several plans.
Expand All @@ -1888,7 +1888,7 @@ impl ModuleHost {
_,
table_name,
_,
) = compile_subscription(&query, &tx, &auth)?;
) = compile_subscription(&query, &mut tx, &auth)?;

// Optimize each fragment
let optimized = plans
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,10 +1126,10 @@ impl InstanceCommon {

// Views bypass RLS, since views should enforce their own access control procedurally.
let auth = AuthCtx::for_current(self.info.database_identity);
let schema_view = SchemaViewer::new(&*tx, &auth);
let mut schema_view = SchemaViewer::new(&mut *tx, &auth);

// Compile to subscription plans.
let (plans, has_params) = SubscriptionPlan::compile(the_query, &schema_view, &auth)?;
let (plans, has_params) = SubscriptionPlan::compile(the_query, &mut schema_view, &auth)?;
ensure!(
!has_params,
"parameterized SQL is not supported for view materialization yet"
Expand Down
29 changes: 20 additions & 9 deletions crates/core/src/sql/ast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ use anyhow::Context;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::locking_tx_datastore::state_view::StateView;
use spacetimedb_datastore::system_tables::{StRowLevelSecurityFields, ST_ROW_LEVEL_SECURITY_ID};
use spacetimedb_expr::check::SchemaView;
use spacetimedb_expr::check::{SchemaView, TypingResult};
use spacetimedb_expr::errors::TypingError;
use spacetimedb_expr::statement::compile_sql_stmt;
use spacetimedb_lib::identity::AuthCtx;
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue};
use spacetimedb_primitives::{ArgId, ColId, TableId};
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductValue};
use spacetimedb_schema::def::error::RelationError;
use spacetimedb_schema::relation::{ColExpr, FieldName};
use spacetimedb_schema::schema::{ColumnSchema, TableOrViewSchema, TableSchema};
Expand All @@ -22,7 +23,7 @@ use sqlparser::ast::{
};
use sqlparser::dialect::PostgreSqlDialect;
use sqlparser::parser::Parser;
use std::ops::Deref;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;

/// Simplify to detect features of the syntax we don't support yet
Expand Down Expand Up @@ -477,7 +478,7 @@ fn compile_where(table: &From, filter: Option<SqlExpr>) -> Result<Option<Selecti
}

pub struct SchemaViewer<'a, T> {
tx: &'a T,
tx: &'a mut T,
auth: &'a AuthCtx,
}

Expand All @@ -489,6 +490,12 @@ impl<T> Deref for SchemaViewer<'_, T> {
}
}

impl<T> DerefMut for SchemaViewer<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.tx
}
}

impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
fn table_id(&self, name: &str) -> Option<TableId> {
// Get the schema from the in-memory state instead of fetching from the database for speed
Expand Down Expand Up @@ -536,10 +543,15 @@ impl<T: StateView> SchemaView for SchemaViewer<'_, T> {
})
.collect::<anyhow::Result<_>>()
}

fn get_or_create_params(&mut self, _params: ProductValue) -> TypingResult<ArgId> {
// Caller should have used `SchemaViewerMut` on crate `core`
Err(TypingError::ParamsReadOnly)
}
}

impl<'a, T> SchemaViewer<'a, T> {
pub fn new(tx: &'a T, auth: &'a AuthCtx) -> Self {
pub fn new(tx: &'a mut T, auth: &'a AuthCtx) -> Self {
Self { tx, auth }
}
}
Expand Down Expand Up @@ -1000,13 +1012,12 @@ fn compile_statement<T: TableSchemaView + StateView>(
pub(crate) fn compile_to_ast<T: TableSchemaView + StateView>(
db: &RelationalDB,
auth: &AuthCtx,
tx: &T,
tx: &mut T,
sql_text: &str,
) -> Result<Vec<SqlAst>, DBError> {
// NOTE: The following ensures compliance with the 1.0 sql api.
// Come 1.0, it will have replaced the current compilation stack.
compile_sql_stmt(sql_text, &SchemaViewer::new(tx, auth), auth)?;

compile_sql_stmt(sql_text, &mut SchemaViewer::new(tx, auth), auth)?;
let dialect = PostgreSqlDialect {};
let ast = Parser::parse_sql(&dialect, sql_text).map_err(|error| DBError::SqlParser {
sql: sql_text.to_string(),
Expand Down
Loading
Loading