Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/wasi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiImpl, WasiView};
pub use self::error::{I32Exit, TrappableError};
pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult, ReaddirIterator};
pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult};
pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe};
pub use self::poll::{dynamic_subscribe, subscribe, ClosureFuture, DynamicSubscribe, OverrideSelf, MakeFuture, Pollable, PollableFuture, Subscribe};
pub use self::random::{thread_rng, Deterministic};
pub use self::stdio::{
stderr, stdin, stdout, AsyncStdinStream, AsyncStdoutStream, IsATTY, OutputFile, Stderr, Stdin,
Expand Down
107 changes: 95 additions & 12 deletions crates/wasi/src/poll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use wasmtime::component::{Resource, ResourceTable};
use std::time::Instant;
use wasmtime::component::{Resource, ResourceTable};

pub type PollableFuture<'a> = Pin<Box<dyn Future<Output=()> + Send + 'a>>;
pub type PollableFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>;
pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + 'static>;

pub type OverrideSelf = fn(&dyn Any) -> Option<u32>;

/// A host representation of the `wasi:io/poll.pollable` resource.
///
/// A pollable is not the same thing as a Rust Future: the same pollable may be used to
Expand All @@ -20,6 +22,7 @@ pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + 'stati
/// Pollable contains a way to create a Future in each call to `poll`.
pub struct Pollable {
index: u32,
override_self: Option<OverrideSelf>,
make_future: MakeFuture,
remove_index_on_delete: Option<fn(&mut ResourceTable, u32) -> Result<()>>,
pub supports_suspend: Option<Instant>,
Expand Down Expand Up @@ -52,7 +55,7 @@ pub struct Pollable {
/// let end = Instant::now() + dur;
/// let sleep = MySleep { end };
/// let sleep_resource = cx.table().push(sleep)?;
/// subscribe(cx.table(), sleep_resource)
/// subscribe(cx.table(), sleep_resource, None)
/// }
///
/// struct MySleep {
Expand Down Expand Up @@ -95,18 +98,71 @@ pub fn subscribe<T>(
resource: Resource<T>,
supports_suspend: Option<Instant>,
) -> Result<Resource<Pollable>>
where
T: Subscribe,
{
fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a>
where
T: Subscribe,
{
stream.downcast_mut::<T>().unwrap().ready()
}

let pollable = Pollable {
index: resource.rep(),
override_self: None,
remove_index_on_delete: if resource.owned() {
Some(|table, idx| {
let resource = Resource::<T>::new_own(idx);
table.delete(resource)?;
Ok(())
})
} else {
None
},
make_future: make_future::<T>,
supports_suspend,
};

Ok(table.push_child(pollable, &resource)?)
}

/// An advanced version of Subscribe supporting dynamically switching the underlying table entry
///
/// This can be used to implement "lazy initialized" pollables.
#[async_trait::async_trait]
pub trait DynamicSubscribe: Subscribe {
/// Returns the table index to another `Pollable` entry in case the override should happen
fn override_index(&self) -> Option<u32>;
}

/// Creates a `pollable` resource from a `DynamicSubscribe` implementation
pub fn dynamic_subscribe<T>(
table: &mut ResourceTable,
resource: Resource<T>,
supports_suspend: Option<Instant>,
) -> Result<Resource<Pollable>>
where
T: DynamicSubscribe,
{
fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a>
where
T: Subscribe,
where
T: DynamicSubscribe,
{
stream.downcast_mut::<T>().unwrap().ready()
}

fn override_self<'a, T>(entry: &'a dyn Any) -> Option<u32>
where
T: DynamicSubscribe,
{
let entry = entry.downcast_ref::<T>().unwrap();
entry.override_index()
}

let pollable = Pollable {
index: resource.rep(),
override_self: Some(override_self::<T>),
remove_index_on_delete: if resource.owned() {
Some(|table, idx| {
let resource = Resource::<T>::new_own(idx);
Expand Down Expand Up @@ -134,14 +190,16 @@ where
if pollables.is_empty() {
return Err(anyhow!("empty poll list"));
}

let mut table_futures: HashMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = HashMap::new();
let mut all_supports_suspend = Some(None);

for (ix, p) in pollables.iter().enumerate() {
let ix: u32 = ix.try_into()?;

let pollable = self.table().get(p)?;
let table = self.table();
let pollable = get_pollable_following_overrides(table, p)?;

let (_, list) = table_futures
.entry(pollable.index)
.or_insert((pollable.make_future, Vec::new()));
Expand All @@ -166,10 +224,9 @@ where
return Err((self.ctx().suspend_signal)(duration));
}
}

let mut futures: Vec<(PollableFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
for (entry, (make_future, readylist_indices)) in
self.table().iter_entries(table_futures)
{
for (entry, (make_future, readylist_indices)) in self.table().iter_entries(table_futures) {
let entry = entry?;
futures.push((make_future(entry), readylist_indices));
}
Expand Down Expand Up @@ -211,14 +268,16 @@ where
{
async fn block(&mut self, pollable: Resource<Pollable>) -> Result<()> {
let table = self.table();
let pollable = table.get(&pollable)?;
let pollable = get_pollable_following_overrides(table, &pollable)?;

let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
ready.await;
Ok(())
}
async fn ready(&mut self, pollable: Resource<Pollable>) -> Result<bool> {
let table = self.table();
let pollable = table.get(&pollable)?;
let pollable = get_pollable_following_overrides(table, &pollable)?;

let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
futures::pin_mut!(ready);
Ok(matches!(
Expand All @@ -235,6 +294,30 @@ where
}
}

fn get_pollable_following_overrides<'a>(
table: &'a ResourceTable,
pollable: &Resource<Pollable>,
) -> Result<&'a Pollable> {
let mut pollable = table.get(&pollable)?;
loop {
if let Some(override_self) = &pollable.override_self {
let entry = table.get_any(pollable.index)?;
let pollable_override = override_self(entry);
if let Some(overridden_idx) = pollable_override {
pollable = table
.get_any(overridden_idx)?
.downcast_ref()
.ok_or_else(|| anyhow!("Pollable override does not point to a Pollable"))?;
} else {
break;
}
} else {
break;
}
}
Ok(pollable)
}

pub mod sync {
use crate::{
bindings::io::poll as async_poll,
Expand Down
6 changes: 6 additions & 0 deletions crates/wasmtime/src/runtime/component/resource_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,12 @@ impl ResourceTable {
.ok_or(ResourceTableError::WrongType)
}

/// Returns the raw `Any` at the `key` index provided.
pub fn get_any(&self, key: u32) -> Result<&dyn Any, ResourceTableError> {
let r = self.occupied(key)?;
Ok(&*r.entry)
}

/// Returns the raw `Any` at the `key` index provided.
pub fn get_any_mut(&mut self, key: u32) -> Result<&mut dyn Any, ResourceTableError> {
let r = self.occupied_mut(key)?;
Expand Down