1use std::io::{Read, Write};
2use std::sync::{Arc, Mutex};
3
4use async_trait::async_trait;
5use spin_factors::anyhow;
6use wasmtime_wasi::p2::{
7 InputStream, OutputStream, Pollable, StdinStream, StdoutStream, StreamError,
8};
9
/// A cloneable handle to a writer of type `T`.
///
/// The writer is stored behind an `Arc<Mutex<T>>`, so every clone of this
/// handle refers to the same underlying value. This file implements
/// `OutputStream`, `StdoutStream` and `Pollable` for it when `T` is
/// `Write + Send + Sync + 'static`.
pub struct PipedWriteStream<T>(Arc<Mutex<T>>);
22
23impl<T> PipedWriteStream<T> {
24 pub fn new(inner: T) -> Self {
25 Self(Arc::new(Mutex::new(inner)))
26 }
27}
28
29impl<T> Clone for PipedWriteStream<T> {
30 fn clone(&self) -> Self {
31 Self(self.0.clone())
32 }
33}
34
35impl<T: Write + Send + Sync + 'static> OutputStream for PipedWriteStream<T> {
36 fn write(&mut self, bytes: bytes::Bytes) -> Result<(), StreamError> {
37 self.0
38 .lock()
39 .unwrap()
40 .write_all(&bytes)
41 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
42 }
43
44 fn flush(&mut self) -> Result<(), StreamError> {
45 self.0
46 .lock()
47 .unwrap()
48 .flush()
49 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
50 }
51
52 fn check_write(&mut self) -> Result<usize, StreamError> {
53 Ok(1024 * 1024)
54 }
55}
56
57impl<T: Write + Send + Sync + 'static> StdoutStream for PipedWriteStream<T> {
58 fn stream(&self) -> Box<dyn OutputStream> {
59 Box::new(self.clone())
60 }
61
62 fn isatty(&self) -> bool {
63 false
64 }
65}
66
67#[async_trait]
68impl<T: Write + Send + Sync + 'static> Pollable for PipedWriteStream<T> {
69 async fn ready(&mut self) {}
70}
71
/// A handle to a reader of type `T`, paired with a private scratch buffer.
///
/// The reader is stored behind an `Arc<Mutex<T>>`, so clones of this handle
/// read from the same underlying value. This file implements `InputStream`,
/// `StdinStream` and `Pollable` for it when `T` is
/// `Read + Send + Sync + 'static`.
pub struct PipeReadStream<T> {
    // Scratch space that `read` fills from the reader before copying the
    // result out; allocated as 64 KiB of zeros by `new` and `clone`.
    buffer: Vec<u8>,
    // The shared, mutex-guarded reader.
    inner: Arc<Mutex<T>>,
}
79
80impl<T> PipeReadStream<T> {
81 pub fn new(inner: T) -> Self {
82 Self {
83 buffer: vec![0_u8; 64 * 1024],
84 inner: Arc::new(Mutex::new(inner)),
85 }
86 }
87}
88
89impl<T> Clone for PipeReadStream<T> {
90 fn clone(&self) -> Self {
91 Self {
92 buffer: vec![0_u8; 64 * 1024],
93 inner: self.inner.clone(),
94 }
95 }
96}
97
98impl<T: Read + Send + Sync + 'static> InputStream for PipeReadStream<T> {
99 fn read(&mut self, size: usize) -> wasmtime_wasi::p2::StreamResult<bytes::Bytes> {
100 let size = size.min(self.buffer.len());
101
102 let count = self
103 .inner
104 .lock()
105 .unwrap()
106 .read(&mut self.buffer[..size])
107 .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;
108 if count == 0 {
109 return Err(wasmtime_wasi::p2::StreamError::Closed);
110 }
111
112 Ok(bytes::Bytes::copy_from_slice(&self.buffer[..count]))
113 }
114}
115
116#[async_trait]
117impl<T: Read + Send + Sync + 'static> Pollable for PipeReadStream<T> {
118 async fn ready(&mut self) {}
119}
120
121impl<T: Read + Send + Sync + 'static> StdinStream for PipeReadStream<T> {
122 fn stream(&self) -> Box<dyn InputStream> {
123 Box::new(self.clone())
124 }
125
126 fn isatty(&self) -> bool {
127 false
128 }
129}