spin_wasi_async/
stream.rs1use spin_core::wasmtime;
2
3pub fn producer<T: Send + Sync + 'static>(rx: tokio::sync::mpsc::Receiver<T>) -> StreamProducer<T> {
4 StreamProducer { rx }
5}
6
7pub struct StreamProducer<T> {
8 rx: tokio::sync::mpsc::Receiver<T>,
9}
10
11impl<D, T: Send + Sync + 'static> wasmtime::component::StreamProducer<D> for StreamProducer<T> {
12 type Item = T;
13
14 type Buffer = Option<Self::Item>;
15
16 fn poll_produce<'a>(
17 self: std::pin::Pin<&mut Self>,
18 cx: &mut std::task::Context<'_>,
19 store: wasmtime::StoreContextMut<'a, D>,
20 mut destination: wasmtime::component::Destination<'a, Self::Item, Self::Buffer>,
21 finish: bool,
22 ) -> std::task::Poll<wasmtime::Result<wasmtime::component::StreamResult>> {
23 use std::task::Poll;
24 use wasmtime::component::StreamResult;
25
26 let remaining = destination.remaining(store);
27 if remaining.is_some_and(|r| r == 0) {
28 return Poll::Ready(Ok(StreamResult::Completed));
29 }
30
31 let recv = self.get_mut().rx.poll_recv(cx);
32 match recv {
33 Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
34 Poll::Pending => {
35 if finish {
36 Poll::Ready(Ok(StreamResult::Cancelled))
37 } else {
38 Poll::Pending
39 }
40 }
41 Poll::Ready(Some(row)) => {
42 destination.set_buffer(Some(row));
43 Poll::Ready(Ok(StreamResult::Completed))
44 }
45 }
46 }
47}