diff --git a/crates/wasi/src/lib.rs b/crates/wasi/src/lib.rs index e7dfbe1286bb..69d8b3e5ef7a 100644 --- a/crates/wasi/src/lib.rs +++ b/crates/wasi/src/lib.rs @@ -207,7 +207,7 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiImpl, WasiView}; pub use self::error::{I32Exit, TrappableError}; pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult, ReaddirIterator}; pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult}; -pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe}; +pub use self::poll::{dynamic_subscribe, subscribe, ClosureFuture, DynamicSubscribe, OverrideSelf, MakeFuture, Pollable, PollableFuture, Subscribe}; pub use self::random::{thread_rng, Deterministic}; pub use self::stdio::{ stderr, stdin, stdout, AsyncStdinStream, AsyncStdoutStream, IsATTY, OutputFile, Stderr, Stdin, diff --git a/crates/wasi/src/poll.rs b/crates/wasi/src/poll.rs index aa9b44bb6d96..e06731de6f5d 100644 --- a/crates/wasi/src/poll.rs +++ b/crates/wasi/src/poll.rs @@ -5,13 +5,15 @@ use std::collections::HashMap; use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use wasmtime::component::{Resource, ResourceTable}; use std::time::Instant; +use wasmtime::component::{Resource, ResourceTable}; -pub type PollableFuture<'a> = Pin + Send + 'a>>; +pub type PollableFuture<'a> = Pin + Send + 'a>>; pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>; pub type ClosureFuture = Box PollableFuture<'static> + Send + 'static>; +pub type OverrideSelf = fn(&dyn Any) -> Option; + /// A host representation of the `wasi:io/poll.pollable` resource. /// /// A pollable is not the same thing as a Rust Future: the same pollable may be used to @@ -20,6 +22,7 @@ pub type ClosureFuture = Box PollableFuture<'static> + Send + 'stati /// Pollable contains a way to create a Future in each call to `poll`. pub struct Pollable { index: u32, + override_self: Option, make_future: MakeFuture, remove_index_on_delete: Option Result<()>>, pub supports_suspend: Option, @@ -52,7 +55,7 @@ pub struct Pollable { /// let end = Instant::now() + dur; /// let sleep = MySleep { end }; /// let sleep_resource = cx.table().push(sleep)?; -/// subscribe(cx.table(), sleep_resource) +/// subscribe(cx.table(), sleep_resource, None) /// } /// /// struct MySleep { @@ -95,18 +98,71 @@ pub fn subscribe( resource: Resource, supports_suspend: Option, ) -> Result> +where + T: Subscribe, +{ + fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a> where T: Subscribe, + { + stream.downcast_mut::().unwrap().ready() + } + + let pollable = Pollable { + index: resource.rep(), + override_self: None, + remove_index_on_delete: if resource.owned() { + Some(|table, idx| { + let resource = Resource::::new_own(idx); + table.delete(resource)?; + Ok(()) + }) + } else { + None + }, + make_future: make_future::, + supports_suspend, + }; + + Ok(table.push_child(pollable, &resource)?) +} + +/// An advanced version of Subscribe supporting dynamically switching the underlying table entry +/// +/// This can be used to implement "lazy initialized" pollables. +#[async_trait::async_trait] +pub trait DynamicSubscribe: Subscribe { + /// Returns the table index to another `Pollable` entry in case the override should happen + fn override_index(&self) -> Option; +} + +/// Creates a `pollable` resource from a `DynamicSubscribe` implementation +pub fn dynamic_subscribe( + table: &mut ResourceTable, + resource: Resource, + supports_suspend: Option, +) -> Result> +where + T: DynamicSubscribe, { fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a> - where - T: Subscribe, + where + T: DynamicSubscribe, { stream.downcast_mut::().unwrap().ready() } + fn override_self<'a, T>(entry: &'a dyn Any) -> Option + where + T: DynamicSubscribe, + { + let entry = entry.downcast_ref::().unwrap(); + entry.override_index() + } + let pollable = Pollable { index: resource.rep(), + override_self: Some(override_self::), remove_index_on_delete: if resource.owned() { Some(|table, idx| { let resource = Resource::::new_own(idx); @@ -134,14 +190,16 @@ where if pollables.is_empty() { return Err(anyhow!("empty poll list")); } - + let mut table_futures: HashMap)> = HashMap::new(); let mut all_supports_suspend = Some(None); for (ix, p) in pollables.iter().enumerate() { let ix: u32 = ix.try_into()?; - let pollable = self.table().get(p)?; + let table = self.table(); + let pollable = get_pollable_following_overrides(table, p)?; + let (_, list) = table_futures .entry(pollable.index) .or_insert((pollable.make_future, Vec::new())); @@ -166,10 +224,9 @@ where return Err((self.ctx().suspend_signal)(duration)); } } + let mut futures: Vec<(PollableFuture<'_>, Vec)> = Vec::new(); - for (entry, (make_future, readylist_indices)) in - self.table().iter_entries(table_futures) - { + for (entry, (make_future, readylist_indices)) in self.table().iter_entries(table_futures) { let entry = entry?; futures.push((make_future(entry), readylist_indices)); } @@ -211,14 +268,16 @@ where { async fn block(&mut self, pollable: Resource) -> Result<()> { let table = self.table(); - let pollable = table.get(&pollable)?; + let pollable = get_pollable_following_overrides(table, &pollable)?; + let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); ready.await; Ok(()) } async fn ready(&mut self, pollable: Resource) -> Result { let table = self.table(); - let pollable = table.get(&pollable)?; + let pollable = get_pollable_following_overrides(table, &pollable)?; + let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?); futures::pin_mut!(ready); Ok(matches!( @@ -235,6 +294,30 @@ where } } +fn get_pollable_following_overrides<'a>( + table: &'a ResourceTable, + pollable: &Resource, +) -> Result<&'a Pollable> { + let mut pollable = table.get(&pollable)?; + loop { + if let Some(override_self) = &pollable.override_self { + let entry = table.get_any(pollable.index)?; + let pollable_override = override_self(entry); + if let Some(overridden_idx) = pollable_override { + pollable = table + .get_any(overridden_idx)? + .downcast_ref() + .ok_or_else(|| anyhow!("Pollable override does not point to a Pollable"))?; + } else { + break; + } + } else { + break; + } + } + Ok(pollable) +} + pub mod sync { use crate::{ bindings::io::poll as async_poll, diff --git a/crates/wasmtime/src/runtime/component/resource_table.rs b/crates/wasmtime/src/runtime/component/resource_table.rs index 2314a970413a..e977292fc4db 100644 --- a/crates/wasmtime/src/runtime/component/resource_table.rs +++ b/crates/wasmtime/src/runtime/component/resource_table.rs @@ -249,6 +249,12 @@ impl ResourceTable { .ok_or(ResourceTableError::WrongType) } + /// Returns the raw `Any` at the `key` index provided. + pub fn get_any(&self, key: u32) -> Result<&dyn Any, ResourceTableError> { + let r = self.occupied(key)?; + Ok(&*r.entry) + } + /// Returns the raw `Any` at the `key` index provided. pub fn get_any_mut(&mut self, key: u32) -> Result<&mut dyn Any, ResourceTableError> { let r = self.occupied_mut(key)?;