spin_factor_outbound_http/
lib.rs

1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12    uri::{Authority, Parts, PathAndQuery, Scheme},
13    HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_outbound_networking::{
18    config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
19    ComponentTlsClientConfigs, OutboundNetworkingFactor,
20};
21use spin_factors::{
22    anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
23    SelfInstanceBuilder,
24};
25use tokio::sync::Semaphore;
26use wasmtime_wasi_http::WasiHttpCtx;
27
28pub use wasmtime_wasi_http::{
29    bindings::http::types::ErrorCode,
30    body::HyperOutgoingBody,
31    types::{HostFutureIncomingResponse, OutgoingRequestConfig},
32    HttpResult,
33};
34
35pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code, MutexBody};
36
/// Factor providing outbound HTTP support.
///
/// The private unit field prevents construction via a struct literal from
/// outside this module; obtain a value through [`Default`] instead.
#[derive(Default)]
pub struct OutboundHttpFactor {
    _priv: (),
}
41
42impl Factor for OutboundHttpFactor {
43    type RuntimeConfig = RuntimeConfig;
44    type AppState = AppState;
45    type InstanceBuilder = InstanceState;
46
47    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
48        ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
49        wasi::add_to_linker(ctx)?;
50        Ok(())
51    }
52
53    fn configure_app<T: RuntimeFactors>(
54        &self,
55        mut ctx: ConfigureAppContext<T, Self>,
56    ) -> anyhow::Result<Self::AppState> {
57        let config = ctx.take_runtime_config().unwrap_or_default();
58        Ok(AppState {
59            wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
60            connection_pooling_enabled: config.connection_pooling_enabled,
61            concurrent_outbound_connections_semaphore: config
62                .max_concurrent_connections
63                // Permit count is the max concurrent connections + 1.
64                // i.e., 0 concurrent connections means 1 total connection.
65                .map(|n| Arc::new(Semaphore::new(n + 1))),
66        })
67    }
68
69    fn prepare<T: RuntimeFactors>(
70        &self,
71        mut ctx: PrepareContext<T, Self>,
72    ) -> anyhow::Result<Self::InstanceBuilder> {
73        let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
74        let allowed_hosts = outbound_networking.allowed_hosts();
75        let blocked_networks = outbound_networking.blocked_networks();
76        let component_tls_configs = outbound_networking.component_tls_configs();
77        Ok(InstanceState {
78            wasi_http_ctx: WasiHttpCtx::new(),
79            allowed_hosts,
80            blocked_networks,
81            component_tls_configs,
82            self_request_origin: None,
83            request_interceptor: None,
84            spin_http_client: None,
85            wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
86            connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
87            concurrent_outbound_connections_semaphore: ctx
88                .app_state()
89                .concurrent_outbound_connections_semaphore
90                .clone(),
91        })
92    }
93}
94
95pub struct InstanceState {
96    wasi_http_ctx: WasiHttpCtx,
97    allowed_hosts: OutboundAllowedHosts,
98    blocked_networks: BlockedNetworks,
99    component_tls_configs: ComponentTlsClientConfigs,
100    self_request_origin: Option<SelfRequestOrigin>,
101    request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
102    // Connection-pooling client for 'fermyon:spin/http' interface
103    //
104    // TODO: We could move this to `AppState` like the
105    // `wasi:http/outgoing-handler` pool for consistency, although it's probably
106    // not a high priority given that `fermyon:spin/http` is deprecated anyway.
107    spin_http_client: Option<reqwest::Client>,
108    // Connection pooling clients for `wasi:http/outgoing-handler` interface
109    //
110    // This is a clone of `AppState::wasi_http_clients`, meaning it is shared
111    // among all instances of the app.
112    wasi_http_clients: wasi::HttpClients,
113    /// Whether connection pooling is enabled for this instance.
114    connection_pooling_enabled: bool,
115    /// A semaphore to limit the number of concurrent outbound connections.
116    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
117}
118
119impl InstanceState {
120    /// Sets the [`SelfRequestOrigin`] for this instance.
121    ///
122    /// This is used to handle outbound requests to relative URLs. If unset,
123    /// those requests will fail.
124    pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
125        self.self_request_origin = Some(origin);
126    }
127
128    /// Sets a [`OutboundHttpInterceptor`] for this instance.
129    ///
130    /// Returns an error if it has already been called for this instance.
131    pub fn set_request_interceptor(
132        &mut self,
133        interceptor: impl OutboundHttpInterceptor + 'static,
134    ) -> anyhow::Result<()> {
135        if self.request_interceptor.is_some() {
136            anyhow::bail!("set_request_interceptor can only be called once");
137        }
138        self.request_interceptor = Some(Arc::new(interceptor));
139        Ok(())
140    }
141}
142
143impl SelfInstanceBuilder for InstanceState {}
144
145/// Helper module for acquiring permits from the outbound connections semaphore.
146///
147/// This is used by the outbound HTTP implementations to limit concurrent outbound connections.
148mod concurrent_outbound_connections {
149    use super::*;
150
151    /// Acquires a semaphore permit for the given interface, if a semaphore is configured.
152    pub async fn acquire_semaphore<'a>(
153        interface: &str,
154        semaphore: &'a Option<Arc<Semaphore>>,
155    ) -> Option<tokio::sync::SemaphorePermit<'a>> {
156        let s = semaphore.as_ref()?;
157        acquire(interface, || s.try_acquire(), async || s.acquire().await).await
158    }
159
160    /// Acquires an owned semaphore permit for the given interface, if a semaphore is configured.
161    pub async fn acquire_owned_semaphore(
162        interface: &str,
163        semaphore: &Option<Arc<Semaphore>>,
164    ) -> Option<tokio::sync::OwnedSemaphorePermit> {
165        let s = semaphore.as_ref()?;
166        acquire(
167            interface,
168            || s.clone().try_acquire_owned(),
169            async || s.clone().acquire_owned().await,
170        )
171        .await
172    }
173
174    /// Helper function to acquire a semaphore permit, either immediately or by waiting.
175    ///
176    /// Allows getting either a borrowed or owned permit.
177    async fn acquire<T>(
178        interface: &str,
179        try_acquire: impl Fn() -> Result<T, tokio::sync::TryAcquireError>,
180        acquire: impl AsyncFnOnce() -> Result<T, tokio::sync::AcquireError>,
181    ) -> Option<T> {
182        // Try to acquire a permit without waiting first
183        // Keep track of whether we had to wait for metrics purposes.
184        let mut waited = false;
185        let permit = match try_acquire() {
186            Ok(p) => Ok(p),
187            // No available permits right now; wait for one
188            Err(tokio::sync::TryAcquireError::NoPermits) => {
189                waited = true;
190                acquire().await.map_err(|_| ())
191            }
192            Err(_) => Err(()),
193        };
194        if permit.is_ok() {
195            spin_telemetry::monotonic_counter!(
196                outbound_http.concurrent_connection_permits_acquired = 1,
197                interface = interface,
198                waited = waited
199            );
200        }
201        permit.ok()
202    }
203}
204
205pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
206pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
207
208/// SelfRequestOrigin indicates the base URI to use for "self" requests.
209#[derive(Clone, Debug)]
210pub struct SelfRequestOrigin {
211    pub scheme: Scheme,
212    pub authority: Authority,
213}
214
215impl SelfRequestOrigin {
216    pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
217        Ok(SelfRequestOrigin {
218            scheme,
219            authority: auth
220                .parse()
221                .with_context(|| format!("address '{auth}' is not a valid authority"))?,
222        })
223    }
224
225    pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
226        Ok(Self {
227            scheme: uri.scheme().context("URI missing scheme")?.clone(),
228            authority: uri.authority().context("URI missing authority")?.clone(),
229        })
230    }
231
232    fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
233        let mut parts = Parts::default();
234        parts.scheme = Some(self.scheme);
235        parts.authority = Some(self.authority);
236        parts.path_and_query = path_and_query;
237        Uri::from_parts(parts).unwrap()
238    }
239
240    fn use_tls(&self) -> bool {
241        self.scheme == Scheme::HTTPS
242    }
243
244    fn host_header(&self) -> HeaderValue {
245        HeaderValue::from_str(self.authority.as_str()).unwrap()
246    }
247}
248
249impl std::fmt::Display for SelfRequestOrigin {
250    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
251        write!(f, "{}://{}", self.scheme, self.authority)
252    }
253}
254
255pub struct AppState {
256    // Connection pooling clients for `wasi:http/outgoing-handler` interface
257    wasi_http_clients: wasi::HttpClients,
258    /// Whether connection pooling is enabled for this app.
259    connection_pooling_enabled: bool,
260    /// A semaphore to limit the number of concurrent outbound connections.
261    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
262}
263
264/// Removes IPs in the given [`BlockedNetworks`].
265///
266/// Returns [`ErrorCode::DestinationIpProhibited`] if all IPs are removed.
267fn remove_blocked_addrs(
268    blocked_networks: &BlockedNetworks,
269    addrs: &mut Vec<SocketAddr>,
270) -> Result<(), ErrorCode> {
271    if addrs.is_empty() {
272        return Ok(());
273    }
274    let blocked_addrs = blocked_networks.remove_blocked(addrs);
275    if addrs.is_empty() && !blocked_addrs.is_empty() {
276        tracing::error!(
277            "error.type" = "destination_ip_prohibited",
278            ?blocked_addrs,
279            "all destination IP(s) prohibited by runtime config"
280        );
281        return Err(ErrorCode::DestinationIpProhibited);
282    }
283    Ok(())
284}