Skip to main content

spin_wasi_async/
stream.rs

1use spin_core::wasmtime;
2
3pub fn producer<T: Send + Sync + 'static>(rx: tokio::sync::mpsc::Receiver<T>) -> StreamProducer<T> {
4    StreamProducer { rx }
5}
6
7pub struct StreamProducer<T> {
8    rx: tokio::sync::mpsc::Receiver<T>,
9}
10
11impl<D, T: Send + Sync + 'static> wasmtime::component::StreamProducer<D> for StreamProducer<T> {
12    type Item = T;
13
14    type Buffer = Option<Self::Item>;
15
16    fn poll_produce<'a>(
17        self: std::pin::Pin<&mut Self>,
18        cx: &mut std::task::Context<'_>,
19        store: wasmtime::StoreContextMut<'a, D>,
20        mut destination: wasmtime::component::Destination<'a, Self::Item, Self::Buffer>,
21        finish: bool,
22    ) -> std::task::Poll<wasmtime::Result<wasmtime::component::StreamResult>> {
23        use std::task::Poll;
24        use wasmtime::component::StreamResult;
25
26        let remaining = destination.remaining(store);
27        if remaining.is_some_and(|r| r == 0) {
28            return Poll::Ready(Ok(StreamResult::Completed));
29        }
30
31        let recv = self.get_mut().rx.poll_recv(cx);
32        match recv {
33            Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
34            Poll::Pending => {
35                if finish {
36                    Poll::Ready(Ok(StreamResult::Cancelled))
37                } else {
38                    Poll::Pending
39                }
40            }
41            Poll::Ready(Some(row)) => {
42                destination.set_buffer(Some(row));
43                Poll::Ready(Ok(StreamResult::Completed))
44            }
45        }
46    }
47}