spin_factor_outbound_http/
lib.rs1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12 uri::{Authority, Parts, PathAndQuery, Scheme},
13 HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_otel::OtelFactorState;
18use spin_factor_outbound_networking::{
19 config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
20 ComponentTlsClientConfigs, OutboundNetworkingFactor,
21};
22use spin_factors::{
23 anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
24 SelfInstanceBuilder,
25};
26use tokio::sync::Semaphore;
27use wasmtime_wasi_http::WasiHttpCtx;
28
29pub use wasmtime_wasi_http::{
30 bindings::http::types::ErrorCode,
31 body::HyperOutgoingBody,
32 types::{HostFutureIncomingResponse, OutgoingRequestConfig},
33 HttpResult,
34};
35
36pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code, MutexBody};
37
38#[derive(Default)]
39pub struct OutboundHttpFactor {
40 _priv: (),
41}
42
43impl Factor for OutboundHttpFactor {
44 type RuntimeConfig = RuntimeConfig;
45 type AppState = AppState;
46 type InstanceBuilder = InstanceState;
47
48 fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
49 ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
50 wasi::add_to_linker(ctx)?;
51 Ok(())
52 }
53
54 fn configure_app<T: RuntimeFactors>(
55 &self,
56 mut ctx: ConfigureAppContext<T, Self>,
57 ) -> anyhow::Result<Self::AppState> {
58 let config = ctx.take_runtime_config().unwrap_or_default();
59 Ok(AppState {
60 wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
61 connection_pooling_enabled: config.connection_pooling_enabled,
62 concurrent_outbound_connections_semaphore: config
63 .max_concurrent_connections
64 .map(|n| Arc::new(Semaphore::new(n + 1))),
67 })
68 }
69
70 fn prepare<T: RuntimeFactors>(
71 &self,
72 mut ctx: PrepareContext<T, Self>,
73 ) -> anyhow::Result<Self::InstanceBuilder> {
74 let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
75 let allowed_hosts = outbound_networking.allowed_hosts();
76 let blocked_networks = outbound_networking.blocked_networks();
77 let component_tls_configs = outbound_networking.component_tls_configs();
78 let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
79 Ok(InstanceState {
80 wasi_http_ctx: WasiHttpCtx::new(),
81 allowed_hosts,
82 blocked_networks,
83 component_tls_configs,
84 self_request_origin: None,
85 request_interceptor: None,
86 spin_http_client: None,
87 wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
88 connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
89 concurrent_outbound_connections_semaphore: ctx
90 .app_state()
91 .concurrent_outbound_connections_semaphore
92 .clone(),
93 otel,
94 })
95 }
96}
97
98pub struct InstanceState {
99 wasi_http_ctx: WasiHttpCtx,
100 allowed_hosts: OutboundAllowedHosts,
101 blocked_networks: BlockedNetworks,
102 component_tls_configs: ComponentTlsClientConfigs,
103 self_request_origin: Option<SelfRequestOrigin>,
104 request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
105 spin_http_client: Option<reqwest::Client>,
111 wasi_http_clients: wasi::HttpClients,
116 connection_pooling_enabled: bool,
118 concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
120 otel: OtelFactorState,
122}
123
124impl InstanceState {
125 pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
130 self.self_request_origin = Some(origin);
131 }
132
133 pub fn set_request_interceptor(
137 &mut self,
138 interceptor: impl OutboundHttpInterceptor + 'static,
139 ) -> anyhow::Result<()> {
140 if self.request_interceptor.is_some() {
141 anyhow::bail!("set_request_interceptor can only be called once");
142 }
143 self.request_interceptor = Some(Arc::new(interceptor));
144 Ok(())
145 }
146}
147
148impl SelfInstanceBuilder for InstanceState {}
149
150mod concurrent_outbound_connections {
154 use super::*;
155
156 pub async fn acquire_semaphore<'a>(
158 interface: &str,
159 semaphore: &'a Option<Arc<Semaphore>>,
160 ) -> Option<tokio::sync::SemaphorePermit<'a>> {
161 let s = semaphore.as_ref()?;
162 acquire(interface, || s.try_acquire(), async || s.acquire().await).await
163 }
164
165 pub async fn acquire_owned_semaphore(
167 interface: &str,
168 semaphore: &Option<Arc<Semaphore>>,
169 ) -> Option<tokio::sync::OwnedSemaphorePermit> {
170 let s = semaphore.as_ref()?;
171 acquire(
172 interface,
173 || s.clone().try_acquire_owned(),
174 async || s.clone().acquire_owned().await,
175 )
176 .await
177 }
178
179 async fn acquire<T>(
183 interface: &str,
184 try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
185 acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
186 ) -> Option<T> {
187 let mut waited = false;
190 let permit = match try_acquire() {
191 Ok(p) => Ok(p),
192 Err(tokio::sync::TryAcquireError::NoPermits) => {
194 waited = true;
195 acquire().await.map_err(|_| ())
196 }
197 Err(_) => Err(()),
198 };
199 if permit.is_ok() {
200 spin_telemetry::monotonic_counter!(
201 outbound_http.concurrent_connection_permits_acquired = 1,
202 interface = interface,
203 waited = waited
204 );
205 }
206 permit.ok()
207 }
208}
209
210pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
211pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
212
213#[derive(Clone, Debug)]
215pub struct SelfRequestOrigin {
216 pub scheme: Scheme,
217 pub authority: Authority,
218}
219
220impl SelfRequestOrigin {
221 pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
222 Ok(SelfRequestOrigin {
223 scheme,
224 authority: auth
225 .parse()
226 .with_context(|| format!("address '{auth}' is not a valid authority"))?,
227 })
228 }
229
230 pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
231 Ok(Self {
232 scheme: uri.scheme().context("URI missing scheme")?.clone(),
233 authority: uri.authority().context("URI missing authority")?.clone(),
234 })
235 }
236
237 fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
238 let mut parts = Parts::default();
239 parts.scheme = Some(self.scheme);
240 parts.authority = Some(self.authority);
241 parts.path_and_query = path_and_query;
242 Uri::from_parts(parts).unwrap()
243 }
244
245 fn use_tls(&self) -> bool {
246 self.scheme == Scheme::HTTPS
247 }
248
249 fn host_header(&self) -> HeaderValue {
250 HeaderValue::from_str(self.authority.as_str()).unwrap()
251 }
252}
253
254impl std::fmt::Display for SelfRequestOrigin {
255 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256 write!(f, "{}://{}", self.scheme, self.authority)
257 }
258}
259
260pub struct AppState {
261 wasi_http_clients: wasi::HttpClients,
263 connection_pooling_enabled: bool,
265 concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
267}
268
269fn remove_blocked_addrs(
273 blocked_networks: &BlockedNetworks,
274 addrs: &mut Vec<SocketAddr>,
275) -> Result<(), ErrorCode> {
276 if addrs.is_empty() {
277 return Ok(());
278 }
279 let blocked_addrs = blocked_networks.remove_blocked(addrs);
280 if addrs.is_empty() && !blocked_addrs.is_empty() {
281 tracing::error!(
282 "error.type" = "destination_ip_prohibited",
283 ?blocked_addrs,
284 "all destination IP(s) prohibited by runtime config"
285 );
286 return Err(ErrorCode::DestinationIpProhibited);
287 }
288 Ok(())
289}