Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/core/src/sql/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,12 @@ pub fn run_from_module<I: WasmInstance>(
return Err(anyhow!("Caller {} is not authorized to run SQL DML statements", auth.caller()).into());
}

stmt.for_each_return_field(|col_name, col_type| {
head.push((col_name.into(), col_type.clone()));
});

// Evaluate the mutation
let (mut tx, _) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(&auth, stmt, tx, &mut metrics))?;
let (mut tx, rows) = db.with_auto_rollback(tx, |tx| execute_dml_stmt(&auth, stmt, tx, &mut metrics))?;

// Update transaction metrics
tx.metrics.merge(metrics);
Expand Down Expand Up @@ -344,7 +348,7 @@ pub fn run_from_module<I: WasmInstance>(
Ok((
SqlResult {
tx_offset: res.tx_offset,
rows: vec![],
rows,
metrics,
},
trapped,
Expand Down
51 changes: 41 additions & 10 deletions crates/execution/src/dml.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use anyhow::Result;
use spacetimedb_lib::{metrics::ExecutionMetrics, AlgebraicValue, ProductValue};
use spacetimedb_physical_plan::dml::{DeletePlan, InsertPlan, MutationPlan, UpdatePlan};
use spacetimedb_physical_plan::{dml::{DeletePlan, InsertPlan, MutationPlan, UpdatePlan}, plan::{ProjectField, ProjectListPlan}};
use spacetimedb_primitives::{ColId, TableId};
use spacetimedb_sats::size_of::SizeOf;

use crate::{pipelined::PipelinedProject, Datastore, DeltaStore};
use crate::{pipelined::PipelinedProject, Datastore, DeltaStore, Row};

/// A mutable datastore can read as well as insert and delete rows
pub trait MutDatastore: Datastore + DeltaStore {
Expand All @@ -30,7 +30,7 @@ impl From<MutationPlan> for MutExecutor {
}

impl MutExecutor {
pub fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
pub fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<Vec<ProductValue>> {
match self {
Self::Insert(exec) => exec.execute(tx, metrics),
Self::Delete(exec) => exec.execute(tx, metrics),
Expand All @@ -39,52 +39,73 @@ impl MutExecutor {
}
}

/// Apply a `RETURNING` projection to a row produced by a DML mutation.
///
/// A bare table projection yields the row unchanged; an explicit column
/// list projects each listed field out of the row. Any other plan shape
/// yields no returned row — presumably aggregate/other projections are
/// not supported for RETURNING; TODO confirm against `ProjectListPlan`.
fn project_returning_row(returning: &ProjectListPlan, row: ProductValue) -> Option<ProductValue> {
    if let ProjectListPlan::Name(_) = returning {
        return Some(row);
    }
    if let ProjectListPlan::List(_, fields) = returning {
        let row_ref = Row::Ref(&row);
        let projected = fields.iter().map(|field| row_ref.project(field));
        return Some(ProductValue::from_iter(projected));
    }
    None
}

/// Executes row insertions
pub struct InsertExecutor {
/// Table receiving the inserted rows
table_id: TableId,
/// The literal rows to insert
rows: Vec<ProductValue>,
/// Projection to apply to each inserted row when the statement has a
/// `RETURNING` clause; `None` means nothing is returned
returning: Option<ProjectListPlan>,
}

impl From<InsertPlan> for InsertExecutor {
fn from(plan: InsertPlan) -> Self {
Self {
rows: plan.rows,
table_id: plan.table.table_id,
rows: plan.rows,
returning: plan.returning,
}
}
}

impl InsertExecutor {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<Vec<ProductValue>> {
let mut results = vec![];
for row in &self.rows {
if tx.insert_product_value(self.table_id, row)? {
metrics.rows_inserted += 1;
if let Some(returning) = &self.returning {
project_returning_row(returning, row.clone()).map(|res| results.push(res));
}
}
}
// TODO: It would be better to get this metric from the bsatn buffer.
// But we haven't been concerned with optimizing DML up to this point.
metrics.bytes_written += self.rows.iter().map(|row| row.size_of()).sum::<usize>();
Ok(())
Ok(results)
}
}

/// Executes row deletions
pub struct DeleteExecutor {
/// Table the rows are deleted from
table_id: TableId,
/// Scan/filter pipeline selecting the rows to delete
filter: PipelinedProject,
/// Projection to apply to each deleted row when the statement has a
/// `RETURNING` clause; `None` means nothing is returned
returning: Option<ProjectListPlan>,
}

impl From<DeletePlan> for DeleteExecutor {
    /// Lower a physical delete plan into its executable form.
    fn from(plan: DeletePlan) -> Self {
        let table_id = plan.table.table_id;
        let filter = plan.filter.into();
        let returning = plan.returning;
        Self { table_id, filter, returning }
    }
}

impl DeleteExecutor {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<Vec<ProductValue>> {
// TODO: Delete by row id instead of product value
let mut deletes = vec![];
self.filter.execute(tx, metrics, &mut |row| {
Expand All @@ -95,12 +116,16 @@ impl DeleteExecutor {
// Note, that we don't update bytes written,
// because deletes don't actually write out any bytes.
metrics.bytes_scanned += deletes.iter().map(|row| row.size_of()).sum::<usize>();
let mut results = vec![];
for row in &deletes {
if tx.delete_product_value(self.table_id, row)? {
metrics.rows_deleted += 1;
if let Some(returning) = &self.returning {
project_returning_row(returning, row.clone()).map(|res| results.push(res));
}
}
}
Ok(())
Ok(results)
}
}

Expand All @@ -109,6 +134,7 @@ pub struct UpdateExecutor {
table_id: TableId,
columns: Vec<(ColId, AlgebraicValue)>,
filter: PipelinedProject,
returning: Option<ProjectListPlan>,
}

impl From<UpdatePlan> for UpdateExecutor {
Expand All @@ -117,12 +143,13 @@ impl From<UpdatePlan> for UpdateExecutor {
columns: plan.columns,
table_id: plan.table.table_id,
filter: plan.filter.into(),
returning: plan.returning,
}
}
}

impl UpdateExecutor {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<()> {
fn execute<Tx: MutDatastore>(&self, tx: &mut Tx, metrics: &mut ExecutionMetrics) -> Result<Vec<ProductValue>> {
let mut deletes = vec![];
self.filter.execute(tx, metrics, &mut |row| {
deletes.push(row.to_product_value());
Expand All @@ -134,6 +161,7 @@ impl UpdateExecutor {
// TODO: This metric should be updated inline when we serialize.
metrics.bytes_scanned = deletes.iter().map(|row| row.size_of()).sum::<usize>();
metrics.rows_updated += deletes.len() as u64;
let mut results = vec![];
for row in &deletes {
let row = ProductValue::from_iter(
row
Expand All @@ -151,7 +179,10 @@ impl UpdateExecutor {
);
tx.insert_product_value(self.table_id, &row)?;
metrics.bytes_written += row.size_of();
if let Some(returning) = &self.returning {
project_returning_row(returning, row).map(|res| results.push(res));
}
}
Ok(())
Ok(results)
}
}
53 changes: 51 additions & 2 deletions crates/expr/src/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,38 @@ impl DML {
pub fn table_name(&self) -> Box<str> {
self.table_schema().table_name.clone()
}

/// Iterate over the projected column names and types
pub fn for_each_return_field(&self, f: impl FnMut(&str, &AlgebraicType)) {
match self {
Self::Insert(TableInsert { returning, .. })
| Self::Update(TableUpdate { returning, .. })
| Self::Delete(TableDelete { returning, .. }) => {
if let Some(returning) = returning {
returning.for_each_return_field(f);
}
}
}
}
}

/// A type-checked `INSERT` statement
pub struct TableInsert {
/// Schema of the table being inserted into
pub table: Arc<TableOrViewSchema>,
/// The literal rows to insert
pub rows: Box<[ProductValue]>,
/// Typed projection for the `RETURNING` clause, if one was written
pub returning: Option<ProjectList>,
}

/// A type-checked `DELETE` statement
pub struct TableDelete {
/// Schema of the table rows are deleted from
pub table: Arc<TableOrViewSchema>,
/// Optional `WHERE` predicate; `None` deletes every row
pub filter: Option<Expr>,
/// Typed projection for the `RETURNING` clause, if one was written
pub returning: Option<ProjectList>,
}

/// A type-checked `UPDATE` statement
pub struct TableUpdate {
/// Schema of the table being updated
pub table: Arc<TableOrViewSchema>,
/// The `SET` assignments: target column id paired with its new value
pub columns: Box<[(ColId, AlgebraicValue)]>,
/// Optional `WHERE` predicate; `None` updates every row
pub filter: Option<Expr>,
/// Typed projection for the `RETURNING` clause, if one was written
pub returning: Option<ProjectList>,
}

pub struct SetVar {
Expand All @@ -89,6 +105,7 @@ pub fn type_insert(insert: SqlInsert, tx: &impl SchemaView) -> TypingResult<Tabl
table: SqlIdent(table_name),
fields,
values,
returning,
} = insert;

let schema = tx
Expand Down Expand Up @@ -148,16 +165,30 @@ pub fn type_insert(insert: SqlInsert, tx: &impl SchemaView) -> TypingResult<Tabl
}
rows.push(ProductValue::from(values));
}
let into = schema;
let into = schema.clone();
let rows = rows.into_boxed_slice();
Ok(TableInsert { table: into, rows })
let mut vars = Relvars::default();
vars.insert(table_name.clone(), into.clone());
let returning = returning
.map(|proj| type_proj(RelExpr::RelVar(Relvar {
schema: schema,
alias: ST_VAR_NAME.into(),
delta: None,
}), proj, &vars))
.transpose()?;
Ok(TableInsert {
table: into,
rows,
returning,
})
}

/// Type check a DELETE statement
pub fn type_delete(delete: SqlDelete, tx: &impl SchemaView) -> TypingResult<TableDelete> {
let SqlDelete {
table: SqlIdent(table_name),
filter,
returning,
} = delete;
let from = tx
.schema(&table_name)
Expand All @@ -172,9 +203,17 @@ pub fn type_delete(delete: SqlDelete, tx: &impl SchemaView) -> TypingResult<Tabl
let expr = filter
.map(|expr| type_expr(&vars, expr, Some(&AlgebraicType::Bool)))
.transpose()?;
let returning = returning
.map(|proj| type_proj(RelExpr::RelVar(Relvar {
schema: from.clone(),
alias: ST_VAR_NAME.into(),
delta: None,
}), proj, &vars))
.transpose()?;
Ok(TableDelete {
table: from,
filter: expr,
returning,
})
}

Expand All @@ -184,6 +223,7 @@ pub fn type_update(update: SqlUpdate, tx: &impl SchemaView) -> TypingResult<Tabl
table: SqlIdent(table_name),
assignments,
filter,
returning,
} = update;
let schema = tx
.schema(&table_name)
Expand Down Expand Up @@ -230,10 +270,18 @@ pub fn type_update(update: SqlUpdate, tx: &impl SchemaView) -> TypingResult<Tabl
let filter = filter
.map(|expr| type_expr(&vars, expr, Some(&AlgebraicType::Bool)))
.transpose()?;
let returning = returning
.map(|proj| type_proj(RelExpr::RelVar(Relvar {
schema: schema.clone(),
alias: ST_VAR_NAME.into(),
delta: None,
}), proj, &vars))
.transpose()?;
Ok(TableUpdate {
table: schema,
columns: values,
filter,
returning,
})
}

Expand Down Expand Up @@ -293,6 +341,7 @@ pub fn type_and_rewrite_set(set: SqlSet, tx: &impl SchemaView) -> TypingResult<T
Ok(TableInsert {
table,
rows: Box::new([ProductValue::from_iter([var_name, sum_value])]),
returning: None,
})
}
}
Expand Down
Loading