spin_factor_wasi/
io.rs

1use std::io::{Read, Write};
2use std::sync::{Arc, Mutex};
3
4use async_trait::async_trait;
5use spin_factors::anyhow;
6use wasmtime_wasi::p2::{
7    InputStream, OutputStream, Pollable, StdinStream, StdoutStream, StreamError,
8};
9
/// An [`OutputStream`] that writes to a `Write` type.
///
/// `StdinStream::stream` and `StdoutStream::stream` can be called more than once in components
/// which are composed of multiple subcomponents, since each subcomponent will potentially want
/// its own handle. This means the streams need to be shareable. The easiest way to do that is
/// to provide cloneable implementations of streams which operate synchronously.
///
/// Note that this amounts to doing synchronous I/O in an asynchronous context, which we'd
/// normally prefer to avoid, but the properly asynchronous implementations of
/// Host{In|Out}putStream based on `AsyncRead`/`AsyncWrite` are quite hairy and probably not
/// worth it for "normal" stdio streams in Spin. If this does prove to be a performance
/// bottleneck, though, we can certainly revisit it.
pub struct PipedWriteStream<T>(Arc<Mutex<T>>);
22
23impl<T> PipedWriteStream<T> {
24    pub fn new(inner: T) -> Self {
25        Self(Arc::new(Mutex::new(inner)))
26    }
27}
28
29impl<T> Clone for PipedWriteStream<T> {
30    fn clone(&self) -> Self {
31        Self(self.0.clone())
32    }
33}
34
35impl<T: Write + Send + Sync + 'static> OutputStream for PipedWriteStream<T> {
36    fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> {
37        self.0
38            .lock()
39            .unwrap()
40            .write_all(&bytes)
41            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
42    }
43
44    fn flush(&mut self) -> Result<(), StreamError> {
45        self.0
46            .lock()
47            .unwrap()
48            .flush()
49            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
50    }
51
52    fn check_write(&mut self) -> Result<usize, StreamError> {
53        Ok(1024 * 1024)
54    }
55}
56
57impl<T: Write + Send + Sync + 'static> StdoutStream for PipedWriteStream<T> {
58    fn stream(&self) -> Box<dyn OutputStream> {
59        Box::new(self.clone())
60    }
61
62    fn isatty(&self) -> bool {
63        false
64    }
65}
66
67#[async_trait]
68impl<T: Write + Send + Sync + 'static> Pollable for PipedWriteStream<T> {
69    async fn ready(&mut self) {}
70}
71
/// An [`InputStream`] that reads from a `Read` type.
///
/// See [`PipedWriteStream`] for more information on why this is synchronous.
pub struct PipeReadStream<T> {
    /// Scratch space that `read` fills before copying the bytes out.
    buffer: Vec<u8>,
    /// The underlying reader, shared between all clones of this stream.
    inner: Arc<Mutex<T>>,
}
79
80impl<T> PipeReadStream<T> {
81    pub fn new(inner: T) -> Self {
82        Self {
83            buffer: vec![0_u8; 64 * 1024],
84            inner: Arc::new(Mutex::new(inner)),
85        }
86    }
87}
88
89impl<T> Clone for PipeReadStream<T> {
90    fn clone(&self) -> Self {
91        Self {
92            buffer: vec![0_u8; 64 * 1024],
93            inner: self.inner.clone(),
94        }
95    }
96}
97
98impl<T: Read + Send + Sync + 'static> InputStream for PipeReadStream<T> {
99    fn read(&mut self, size: usize) -> wasmtime_wasi::p2::StreamResult<bytes::Bytes> {
100        let size = size.min(self.buffer.len());
101
102        let count = self
103            .inner
104            .lock()
105            .unwrap()
106            .read(&mut self.buffer[..size])
107            .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
108        if count == 0 {
109            return Err(wasmtime_wasi::p2::StreamError::Closed);
110        }
111
112        Ok(bytes::Bytes::copy_from_slice(&self.buffer[..count]))
113    }
114}
115
116#[async_trait]
117impl<T: Read + Send + Sync + 'static> Pollable for PipeReadStream<T> {
118    async fn ready(&mut self) {}
119}
120
121impl<T: Read + Send + Sync + 'static> StdinStream for PipeReadStream<T> {
122    fn stream(&self) -> Box<dyn InputStream> {
123        Box::new(self.clone())
124    }
125
126    fn isatty(&self) -> bool {
127        false
128    }
129}