Skip to main content

spin_factor_outbound_http/
lib.rs

1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12    uri::{Authority, Parts, PathAndQuery, Scheme},
13    HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_otel::OtelFactorState;
18use spin_factor_outbound_networking::{
19    config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
20    ComponentTlsClientConfigs, OutboundNetworkingFactor,
21};
22use spin_factors::{
23    anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
24    SelfInstanceBuilder,
25};
26use tokio::sync::Semaphore;
27use wasmtime_wasi_http::WasiHttpCtx;
28
29pub use wasmtime_wasi_http::{
30    bindings::http::types::ErrorCode,
31    body::HyperOutgoingBody,
32    types::{HostFutureIncomingResponse, OutgoingRequestConfig},
33    HttpResult,
34};
35
36pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code, MutexBody};
37
/// Factor that provides outbound HTTP support to components.
///
/// The private unit field keeps the struct from being built with a struct
/// literal outside this crate; construct it via [`Default`].
#[derive(Default)]
pub struct OutboundHttpFactor {
    _priv: (),
}
42
43impl Factor for OutboundHttpFactor {
44    type RuntimeConfig = RuntimeConfig;
45    type AppState = AppState;
46    type InstanceBuilder = InstanceState;
47
48    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
49        ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
50        wasi::add_to_linker(ctx)?;
51        Ok(())
52    }
53
54    fn configure_app<T: RuntimeFactors>(
55        &self,
56        mut ctx: ConfigureAppContext<T, Self>,
57    ) -> anyhow::Result<Self::AppState> {
58        let config = ctx.take_runtime_config().unwrap_or_default();
59        Ok(AppState {
60            wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
61            connection_pooling_enabled: config.connection_pooling_enabled,
62            concurrent_outbound_connections_semaphore: config
63                .max_concurrent_connections
64                // Permit count is the max concurrent connections + 1.
65                // i.e., 0 concurrent connections means 1 total connection.
66                .map(|n| Arc::new(Semaphore::new(n + 1))),
67        })
68    }
69
70    fn prepare<T: RuntimeFactors>(
71        &self,
72        mut ctx: PrepareContext<T, Self>,
73    ) -> anyhow::Result<Self::InstanceBuilder> {
74        let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
75        let allowed_hosts = outbound_networking.allowed_hosts();
76        let blocked_networks = outbound_networking.blocked_networks();
77        let component_tls_configs = outbound_networking.component_tls_configs();
78        let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
79        Ok(InstanceState {
80            wasi_http_ctx: WasiHttpCtx::new(),
81            allowed_hosts,
82            blocked_networks,
83            component_tls_configs,
84            self_request_origin: None,
85            request_interceptor: None,
86            spin_http_client: None,
87            wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
88            connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
89            concurrent_outbound_connections_semaphore: ctx
90                .app_state()
91                .concurrent_outbound_connections_semaphore
92                .clone(),
93            otel,
94        })
95    }
96}
97
98pub struct InstanceState {
99    wasi_http_ctx: WasiHttpCtx,
100    allowed_hosts: OutboundAllowedHosts,
101    blocked_networks: BlockedNetworks,
102    component_tls_configs: ComponentTlsClientConfigs,
103    self_request_origin: Option<SelfRequestOrigin>,
104    request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
105    // Connection-pooling client for 'fermyon:spin/http' interface
106    //
107    // TODO: We could move this to `AppState` like the
108    // `wasi:http/outgoing-handler` pool for consistency, although it's probably
109    // not a high priority given that `fermyon:spin/http` is deprecated anyway.
110    spin_http_client: Option<reqwest::Client>,
111    // Connection pooling clients for `wasi:http/outgoing-handler` interface
112    //
113    // This is a clone of `AppState::wasi_http_clients`, meaning it is shared
114    // among all instances of the app.
115    wasi_http_clients: wasi::HttpClients,
116    /// Whether connection pooling is enabled for this instance.
117    connection_pooling_enabled: bool,
118    /// A semaphore to limit the number of concurrent outbound connections.
119    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
120    /// Manages access to the OtelFactor state.
121    otel: OtelFactorState,
122}
123
124impl InstanceState {
125    /// Sets the [`SelfRequestOrigin`] for this instance.
126    ///
127    /// This is used to handle outbound requests to relative URLs. If unset,
128    /// those requests will fail.
129    pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
130        self.self_request_origin = Some(origin);
131    }
132
133    /// Sets a [`OutboundHttpInterceptor`] for this instance.
134    ///
135    /// Returns an error if it has already been called for this instance.
136    pub fn set_request_interceptor(
137        &mut self,
138        interceptor: impl OutboundHttpInterceptor + 'static,
139    ) -> anyhow::Result<()> {
140        if self.request_interceptor.is_some() {
141            anyhow::bail!("set_request_interceptor can only be called once");
142        }
143        self.request_interceptor = Some(Arc::new(interceptor));
144        Ok(())
145    }
146}
147
148impl SelfInstanceBuilder for InstanceState {}
149
150/// Helper module for acquiring permits from the outbound connections semaphore.
151///
152/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
153mod concurrent_outbound_connections {
154    use super::*;
155
156    /// Acquires a semaphore permit for the given interface, if a semaphore is configured.
157    pub async fn acquire_semaphore<'a>(
158        interface: &str,
159        semaphore: &'a Option<Arc<Semaphore>>,
160    ) -> Option<tokio::sync::SemaphorePermit<'a>> {
161        let s = semaphore.as_ref()?;
162        acquire(interface, || s.try_acquire(), async || s.acquire().await).await
163    }
164
165    /// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
166    pub async fn acquire_owned_semaphore(
167        interface: &str,
168        semaphore: &Option<Arc<Semaphore>>,
169    ) -> Option<tokio::sync::OwnedSemaphorePermit> {
170        let s = semaphore.as_ref()?;
171        acquire(
172            interface,
173            || s.clone().try_acquire_owned(),
174            async || s.clone().acquire_owned().await,
175        )
176        .await
177    }
178
179    /// Helper function to acquire a semaphore permit, either immediately or by waiting.
180    ///
181    /// Allows getting either a borrowed or owned permit.
182    async fn acquire<T>(
183        interface: &str,
184        try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
185        acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
186    ) -> Option<T> {
187        // Try to acquire a permit without waiting first
188        // Keep track of whether we had to wait for metrics purposes.
189        let mut waited = false;
190        let permit = match try_acquire() {
191            Ok(p) => Ok(p),
192            // No available permits right now; wait for one
193            Err(tokio::sync::TryAcquireError::NoPermits) => {
194                waited = true;
195                acquire().await.map_err(|_| ())
196            }
197            Err(_) => Err(()),
198        };
199        if permit.is_ok() {
200            spin_telemetry::monotonic_counter!(
201                outbound_http.concurrent_connection_permits_acquired = 1,
202                interface = interface,
203                waited = waited
204            );
205        }
206        permit.ok()
207    }
208}
209
210pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
211pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
212
213/// SelfRequestOrigin indicates the base URI to use for "self" requests.
214#[derive(Clone, Debug)]
215pub struct SelfRequestOrigin {
216    pub scheme: Scheme,
217    pub authority: Authority,
218}
219
220impl SelfRequestOrigin {
221    pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
222        Ok(SelfRequestOrigin {
223            scheme,
224            authority: auth
225                .parse()
226                .with_context(|| format!("address '{auth}' is not a valid authority"))?,
227        })
228    }
229
230    pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
231        Ok(Self {
232            scheme: uri.scheme().context("URI missing scheme")?.clone(),
233            authority: uri.authority().context("URI missing authority")?.clone(),
234        })
235    }
236
237    fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
238        let mut parts = Parts::default();
239        parts.scheme = Some(self.scheme);
240        parts.authority = Some(self.authority);
241        parts.path_and_query = path_and_query;
242        Uri::from_parts(parts).unwrap()
243    }
244
245    fn use_tls(&self) -> bool {
246        self.scheme == Scheme::HTTPS
247    }
248
249    fn host_header(&self) -> HeaderValue {
250        HeaderValue::from_str(self.authority.as_str()).unwrap()
251    }
252}
253
254impl std::fmt::Display for SelfRequestOrigin {
255    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
256        write!(f, "{}://{}", self.scheme, self.authority)
257    }
258}
259
260pub struct AppState {
261    // Connection pooling clients for `wasi:http/outgoing-handler` interface
262    wasi_http_clients: wasi::HttpClients,
263    /// Whether connection pooling is enabled for this app.
264    connection_pooling_enabled: bool,
265    /// A semaphore to limit the number of concurrent outbound connections.
266    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
267}
268
269/// Removes IPs in the given [`BlockedNetworks`].
270///
271/// Returns [`ErrorCode::DestinationIpProhibited`] if all IPs are removed.
272fn remove_blocked_addrs(
273    blocked_networks: &BlockedNetworks,
274    addrs: &mut Vec<SocketAddr>,
275) -> Result<(), ErrorCode> {
276    if addrs.is_empty() {
277        return Ok(());
278    }
279    let blocked_addrs = blocked_networks.remove_blocked(addrs);
280    if addrs.is_empty() && !blocked_addrs.is_empty() {
281        tracing::error!(
282            "error.type" = "destination_ip_prohibited",
283            ?blocked_addrs,
284            "all destination IP(s) prohibited by runtime config"
285        );
286        return Err(ErrorCode::DestinationIpProhibited);
287    }
288    Ok(())
289}