spin_factor_outbound_http/
lib.rs1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12 uri::{Authority, Parts, PathAndQuery, Scheme},
13 HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_outbound_networking::{
18 config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
19 ComponentTlsClientConfigs, OutboundNetworkingFactor,
20};
21use spin_factors::{
22 anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
23 SelfInstanceBuilder,
24};
25use tokio::sync::Semaphore;
26use wasmtime_wasi_http::WasiHttpCtx;
27
28pub use wasmtime_wasi_http::{
29 bindings::http::types::ErrorCode,
30 body::HyperOutgoingBody,
31 types::{HostFutureIncomingResponse, OutgoingRequestConfig},
32 HttpResult,
33};
34
35pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code, MutexBody};
36
37#[derive(Default)]
38pub struct OutboundHttpFactor {
39 _priv: (),
40}
41
42impl Factor for OutboundHttpFactor {
43 type RuntimeConfig = RuntimeConfig;
44 type AppState = AppState;
45 type InstanceBuilder = InstanceState;
46
47 fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
48 ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
49 wasi::add_to_linker(ctx)?;
50 Ok(())
51 }
52
53 fn configure_app<T: RuntimeFactors>(
54 &self,
55 mut ctx: ConfigureAppContext<T, Self>,
56 ) -> anyhow::Result<Self::AppState> {
57 let config = ctx.take_runtime_config().unwrap_or_default();
58 Ok(AppState {
59 wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
60 connection_pooling_enabled: config.connection_pooling_enabled,
61 concurrent_outbound_connections_semaphore: config
62 .max_concurrent_connections
63 .map(|n| Arc::new(Semaphore::new(n + 1))),
66 })
67 }
68
69 fn prepare<T: RuntimeFactors>(
70 &self,
71 mut ctx: PrepareContext<T, Self>,
72 ) -> anyhow::Result<Self::InstanceBuilder> {
73 let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
74 let allowed_hosts = outbound_networking.allowed_hosts();
75 let blocked_networks = outbound_networking.blocked_networks();
76 let component_tls_configs = outbound_networking.component_tls_configs();
77 Ok(InstanceState {
78 wasi_http_ctx: WasiHttpCtx::new(),
79 allowed_hosts,
80 blocked_networks,
81 component_tls_configs,
82 self_request_origin: None,
83 request_interceptor: None,
84 spin_http_client: None,
85 wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
86 connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
87 concurrent_outbound_connections_semaphore: ctx
88 .app_state()
89 .concurrent_outbound_connections_semaphore
90 .clone(),
91 })
92 }
93}
94
95pub struct InstanceState {
96 wasi_http_ctx: WasiHttpCtx,
97 allowed_hosts: OutboundAllowedHosts,
98 blocked_networks: BlockedNetworks,
99 component_tls_configs: ComponentTlsClientConfigs,
100 self_request_origin: Option<SelfRequestOrigin>,
101 request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
102 spin_http_client: Option<reqwest::Client>,
108 wasi_http_clients: wasi::HttpClients,
113 connection_pooling_enabled: bool,
115 concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
117}
118
119impl InstanceState {
120 pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
125 self.self_request_origin = Some(origin);
126 }
127
128 pub fn set_request_interceptor(
132 &mut self,
133 interceptor: impl OutboundHttpInterceptor + 'static,
134 ) -> anyhow::Result<()> {
135 if self.request_interceptor.is_some() {
136 anyhow::bail!("set_request_interceptor can only be called once");
137 }
138 self.request_interceptor = Some(Arc::new(interceptor));
139 Ok(())
140 }
141}
142
143impl SelfInstanceBuilder for InstanceState {}
144
145mod concurrent_outbound_connections {
149 use super::*;
150
151 pub async fn acquire_semaphore<'a>(
153 interface: &str,
154 semaphore: &'a Option<Arc<Semaphore>>,
155 ) -> Option<tokio::sync::SemaphorePermit<'a>> {
156 let s = semaphore.as_ref()?;
157 acquire(interface, || s.try_acquire(), async || s.acquire().await).await
158 }
159
160 pub async fn acquire_owned_semaphore(
162 interface: &str,
163 semaphore: &Option<Arc<Semaphore>>,
164 ) -> Option<tokio::sync::OwnedSemaphorePermit> {
165 let s = semaphore.as_ref()?;
166 acquire(
167 interface,
168 || s.clone().try_acquire_owned(),
169 async || s.clone().acquire_owned().await,
170 )
171 .await
172 }
173
174 async fn acquire<T>(
178 interface: &str,
179 try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
180 acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
181 ) -> Option<T> {
182 let mut waited = false;
185 let permit = match try_acquire() {
186 Ok(p) => Ok(p),
187 Err(tokio::sync::TryAcquireError::NoPermits) => {
189 waited = true;
190 acquire().await.map_err(|_| ())
191 }
192 Err(_) => Err(()),
193 };
194 if permit.is_ok() {
195 spin_telemetry::monotonic_counter!(
196 outbound_http.concurrent_connection_permits_acquired = 1,
197 interface = interface,
198 waited = waited
199 );
200 }
201 permit.ok()
202 }
203}
204
205pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
206pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
207
208#[derive(Clone, Debug)]
210pub struct SelfRequestOrigin {
211 pub scheme: Scheme,
212 pub authority: Authority,
213}
214
215impl SelfRequestOrigin {
216 pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
217 Ok(SelfRequestOrigin {
218 scheme,
219 authority: auth
220 .parse()
221 .with_context(|| format!("address '{auth}' is not a valid authority"))?,
222 })
223 }
224
225 pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
226 Ok(Self {
227 scheme: uri.scheme().context("URI missing scheme")?.clone(),
228 authority: uri.authority().context("URI missing authority")?.clone(),
229 })
230 }
231
232 fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
233 let mut parts = Parts::default();
234 parts.scheme = Some(self.scheme);
235 parts.authority = Some(self.authority);
236 parts.path_and_query = path_and_query;
237 Uri::from_parts(parts).unwrap()
238 }
239
240 fn use_tls(&self) -> bool {
241 self.scheme == Scheme::HTTPS
242 }
243
244 fn host_header(&self) -> HeaderValue {
245 HeaderValue::from_str(self.authority.as_str()).unwrap()
246 }
247}
248
249impl std::fmt::Display for SelfRequestOrigin {
250 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251 write!(f, "{}://{}", self.scheme, self.authority)
252 }
253}
254
255pub struct AppState {
256 wasi_http_clients: wasi::HttpClients,
258 connection_pooling_enabled: bool,
260 concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
262}
263
264fn remove_blocked_addrs(
268 blocked_networks: &BlockedNetworks,
269 addrs: &mut Vec<SocketAddr>,
270) -> Result<(), ErrorCode> {
271 if addrs.is_empty() {
272 return Ok(());
273 }
274 let blocked_addrs = blocked_networks.remove_blocked(addrs);
275 if addrs.is_empty() && !blocked_addrs.is_empty() {
276 tracing::error!(
277 "error.type" = "destination_ip_prohibited",
278 ?blocked_addrs,
279 "all destination IP(s) prohibited by runtime config"
280 );
281 return Err(ErrorCode::DestinationIpProhibited);
282 }
283 Ok(())
284}