Skip to main content

spin_factor_outbound_http/
lib.rs

1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12    uri::{Authority, Parts, PathAndQuery, Scheme},
13    HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_otel::OtelFactorState;
18use spin_factor_outbound_networking::{
19    config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
20    ComponentTlsClientConfigs, OutboundNetworkingFactor,
21};
22use spin_factors::{
23    anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
24    SelfInstanceBuilder,
25};
26use tokio::sync::Semaphore;
27use wasmtime_wasi_http::WasiHttpCtx;
28
29pub use wasmtime_wasi_http::p2::{
30    bindings::http::types::ErrorCode,
31    body::HyperOutgoingBody,
32    types::{HostFutureIncomingResponse, OutgoingRequestConfig},
33    HttpResult,
34};
35
36pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code, MutexBody};
37
/// The outbound HTTP factor.
///
/// The only field is private, so code outside this module cannot build the
/// factor with a struct literal; construct it with [`Default::default`].
#[derive(Default)]
pub struct OutboundHttpFactor {
    // Zero-sized placeholder; carries no data.
    _priv: (),
}
42
43impl Factor for OutboundHttpFactor {
44    type RuntimeConfig = RuntimeConfig;
45    type AppState = AppState;
46    type InstanceBuilder = InstanceState;
47
48    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
49        ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
50        wasi::add_to_linker(ctx)?;
51        Ok(())
52    }
53
54    fn configure_app<T: RuntimeFactors>(
55        &self,
56        mut ctx: ConfigureAppContext<T, Self>,
57    ) -> anyhow::Result<Self::AppState> {
58        let config = ctx.take_runtime_config().unwrap_or_default();
59        Ok(AppState {
60            wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
61            connection_pooling_enabled: config.connection_pooling_enabled,
62            concurrent_outbound_connections_semaphore: config
63                .max_concurrent_connections
64                // Permit count is the max concurrent connections + 1.
65                // i.e., 0 concurrent connections means 1 total connection.
66                .map(|n| Arc::new(Semaphore::new(n + 1))),
67        })
68    }
69
70    fn prepare<T: RuntimeFactors>(
71        &self,
72        mut ctx: PrepareContext<T, Self>,
73    ) -> anyhow::Result<Self::InstanceBuilder> {
74        let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
75        let allowed_hosts = outbound_networking.allowed_hosts();
76        let blocked_networks = outbound_networking.blocked_networks();
77        let component_tls_configs = outbound_networking.component_tls_configs();
78        let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
79        Ok(InstanceState {
80            wasi_http_ctx: WasiHttpCtx::new(),
81            hooks: InstanceHttpHooks {
82                allowed_hosts,
83                blocked_networks,
84                component_tls_configs,
85                self_request_origin: None,
86                request_interceptor: None,
87                spin_http_client: None,
88                wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
89                connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
90                concurrent_outbound_connections_semaphore: ctx
91                    .app_state()
92                    .concurrent_outbound_connections_semaphore
93                    .clone(),
94                otel,
95            },
96        })
97    }
98}
99
100pub struct InstanceState {
101    wasi_http_ctx: WasiHttpCtx,
102    hooks: InstanceHttpHooks,
103}
104
105struct InstanceHttpHooks {
106    allowed_hosts: OutboundAllowedHosts,
107    blocked_networks: BlockedNetworks,
108    component_tls_configs: ComponentTlsClientConfigs,
109    self_request_origin: Option<SelfRequestOrigin>,
110    request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
111    // Connection-pooling client for 'fermyon:spin/http' interface
112    //
113    // TODO: We could move this to `AppState` like the
114    // `wasi:http/outgoing-handler` pool for consistency, although it's probably
115    // not a high priority given that `fermyon:spin/http` is deprecated anyway.
116    spin_http_client: Option<reqwest::Client>,
117    // Connection pooling clients for `wasi:http/outgoing-handler` interface
118    //
119    // This is a clone of `AppState::wasi_http_clients`, meaning it is shared
120    // among all instances of the app.
121    wasi_http_clients: wasi::HttpClients,
122    /// Whether connection pooling is enabled for this instance.
123    connection_pooling_enabled: bool,
124    /// A semaphore to limit the number of concurrent outbound connections.
125    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
126    /// Manages access to the OtelFactor state.
127    otel: OtelFactorState,
128}
129
130impl InstanceState {
131    /// Sets the [`SelfRequestOrigin`] for this instance.
132    ///
133    /// This is used to handle outbound requests to relative URLs. If unset,
134    /// those requests will fail.
135    pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
136        self.hooks.self_request_origin = Some(origin);
137    }
138
139    /// Sets a [`OutboundHttpInterceptor`] for this instance.
140    ///
141    /// Returns an error if it has already been called for this instance.
142    pub fn set_request_interceptor(
143        &mut self,
144        interceptor: impl OutboundHttpInterceptor + 'static,
145    ) -> anyhow::Result<()> {
146        if self.hooks.request_interceptor.is_some() {
147            anyhow::bail!("set_request_interceptor can only be called once");
148        }
149        self.hooks.request_interceptor = Some(Arc::new(interceptor));
150        Ok(())
151    }
152}
153
154impl SelfInstanceBuilder for InstanceState {}
155
156/// Helper module for acquiring permits from the outbound connections semaphore.
157///
158/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
159mod concurrent_outbound_connections {
160    use super::*;
161
162    /// Acquires a semaphore permit for the given interface, if a semaphore is configured.
163    pub async fn acquire_semaphore<'a>(
164        interface: &str,
165        semaphore: &'a Option<Arc<Semaphore>>,
166    ) -> Option<tokio::sync::SemaphorePermit<'a>> {
167        let s = semaphore.as_ref()?;
168        acquire(interface, || s.try_acquire(), async || s.acquire().await).await
169    }
170
171    /// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
172    pub async fn acquire_owned_semaphore(
173        interface: &str,
174        semaphore: &Option<Arc<Semaphore>>,
175    ) -> Option<tokio::sync::OwnedSemaphorePermit> {
176        let s = semaphore.as_ref()?;
177        acquire(
178            interface,
179            || s.clone().try_acquire_owned(),
180            async || s.clone().acquire_owned().await,
181        )
182        .await
183    }
184
185    /// Helper function to acquire a semaphore permit, either immediately or by waiting.
186    ///
187    /// Allows getting either a borrowed or owned permit.
188    async fn acquire<T>(
189        interface: &str,
190        try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
191        acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
192    ) -> Option<T> {
193        // Try to acquire a permit without waiting first
194        // Keep track of whether we had to wait for metrics purposes.
195        let mut waited = false;
196        let permit = match try_acquire() {
197            Ok(p) => Ok(p),
198            // No available permits right now; wait for one
199            Err(tokio::sync::TryAcquireError::NoPermits) => {
200                waited = true;
201                acquire().await.map_err(|_| ())
202            }
203            Err(_) => Err(()),
204        };
205        if permit.is_ok() {
206            spin_telemetry::monotonic_counter!(
207                outbound_http.concurrent_connection_permits_acquired = 1,
208                interface = interface,
209                waited = waited
210            );
211        }
212        permit.ok()
213    }
214}
215
216pub type Request = http::Request<wasmtime_wasi_http::p2::body::HyperOutgoingBody>;
217pub type Response = http::Response<wasmtime_wasi_http::p2::body::HyperIncomingBody>;
218
219/// SelfRequestOrigin indicates the base URI to use for "self" requests.
220#[derive(Clone, Debug)]
221pub struct SelfRequestOrigin {
222    pub scheme: Scheme,
223    pub authority: Authority,
224}
225
226impl SelfRequestOrigin {
227    pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
228        Ok(SelfRequestOrigin {
229            scheme,
230            authority: auth
231                .parse()
232                .with_context(|| format!("address '{auth}' is not a valid authority"))?,
233        })
234    }
235
236    pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
237        Ok(Self {
238            scheme: uri.scheme().context("URI missing scheme")?.clone(),
239            authority: uri.authority().context("URI missing authority")?.clone(),
240        })
241    }
242
243    fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
244        let mut parts = Parts::default();
245        parts.scheme = Some(self.scheme);
246        parts.authority = Some(self.authority);
247        parts.path_and_query = path_and_query;
248        Uri::from_parts(parts).unwrap()
249    }
250
251    fn use_tls(&self) -> bool {
252        self.scheme == Scheme::HTTPS
253    }
254
255    fn host_header(&self) -> HeaderValue {
256        HeaderValue::from_str(self.authority.as_str()).unwrap()
257    }
258}
259
260impl std::fmt::Display for SelfRequestOrigin {
261    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
262        write!(f, "{}://{}", self.scheme, self.authority)
263    }
264}
265
266pub struct AppState {
267    // Connection pooling clients for `wasi:http/outgoing-handler` interface
268    wasi_http_clients: wasi::HttpClients,
269    /// Whether connection pooling is enabled for this app.
270    connection_pooling_enabled: bool,
271    /// A semaphore to limit the number of concurrent outbound connections.
272    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
273}
274
275/// Removes IPs in the given [`BlockedNetworks`].
276///
277/// Returns [`ErrorCode::DestinationIpProhibited`] if all IPs are removed.
278fn remove_blocked_addrs(
279    blocked_networks: &BlockedNetworks,
280    addrs: &mut Vec<SocketAddr>,
281) -> Result<(), ErrorCode> {
282    if addrs.is_empty() {
283        return Ok(());
284    }
285    let blocked_addrs = blocked_networks.remove_blocked(addrs);
286    if addrs.is_empty() && !blocked_addrs.is_empty() {
287        tracing::error!(
288            "error.type" = "destination_ip_prohibited",
289            ?blocked_addrs,
290            "all destination IP(s) prohibited by runtime config"
291        );
292        return Err(ErrorCode::DestinationIpProhibited);
293    }
294    Ok(())
295}