spin_factor_outbound_http/
lib.rs

1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12    uri::{Authority, Parts, PathAndQuery, Scheme},
13    HeaderValue, Uri,
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_outbound_networking::{
18    config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
19    ComponentTlsClientConfigs, OutboundNetworkingFactor,
20};
21use spin_factors::{
22    anyhow, ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors,
23    SelfInstanceBuilder,
24};
25use tokio::sync::Semaphore;
26use wasmtime_wasi_http::WasiHttpCtx;
27
28pub use wasmtime_wasi_http::{
29    bindings::http::types::ErrorCode,
30    body::HyperOutgoingBody,
31    types::{HostFutureIncomingResponse, OutgoingRequestConfig},
32    HttpResult,
33};
34
35pub use wasi::{p2_to_p3_error_code, p3_to_p2_error_code};
36
/// Factor that wires outbound HTTP host bindings into a Spin runtime.
///
/// Construct it with [`Default::default`].
#[derive(Default)]
pub struct OutboundHttpFactor {
    // Private unit field: prevents construction via a struct literal from
    // outside this module, so new fields can be added later without breaking
    // callers.
    _priv: (),
}
41
42impl Factor for OutboundHttpFactor {
43    type RuntimeConfig = RuntimeConfig;
44    type AppState = AppState;
45    type InstanceBuilder = InstanceState;
46
47    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
48        ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
49        wasi::add_to_linker(ctx)?;
50        Ok(())
51    }
52
53    fn configure_app<T: RuntimeFactors>(
54        &self,
55        mut ctx: ConfigureAppContext<T, Self>,
56    ) -> anyhow::Result<Self::AppState> {
57        let config = ctx.take_runtime_config().unwrap_or_default();
58        Ok(AppState {
59            wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
60            connection_pooling_enabled: config.connection_pooling_enabled,
61            concurrent_outbound_connections_semaphore: config
62                .max_concurrent_connections
63                // Permit count is the max concurrent connections + 1.
64                // i.e., 0 concurrent connections means 1 total connection.
65                .map(|n| Arc::new(Semaphore::new(n + 1))),
66        })
67    }
68
69    fn prepare<T: RuntimeFactors>(
70        &self,
71        mut ctx: PrepareContext<T, Self>,
72    ) -> anyhow::Result<Self::InstanceBuilder> {
73        let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
74        let allowed_hosts = outbound_networking.allowed_hosts();
75        let blocked_networks = outbound_networking.blocked_networks();
76        let component_tls_configs = outbound_networking.component_tls_configs();
77        Ok(InstanceState {
78            wasi_http_ctx: WasiHttpCtx::new(),
79            allowed_hosts,
80            blocked_networks,
81            component_tls_configs,
82            self_request_origin: None,
83            request_interceptor: None,
84            spin_http_client: None,
85            wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
86            connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
87            concurrent_outbound_connections_semaphore: ctx
88                .app_state()
89                .concurrent_outbound_connections_semaphore
90                .clone(),
91        })
92    }
93}
94
95pub struct InstanceState {
96    wasi_http_ctx: WasiHttpCtx,
97    allowed_hosts: OutboundAllowedHosts,
98    blocked_networks: BlockedNetworks,
99    component_tls_configs: ComponentTlsClientConfigs,
100    self_request_origin: Option<SelfRequestOrigin>,
101    request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
102    // Connection-pooling client for 'fermyon:spin/http' interface
103    //
104    // TODO: We could move this to `AppState` like the
105    // `wasi:http/outgoing-handler` pool for consistency, although it's probably
106    // not a high priority given that `fermyon:spin/http` is deprecated anyway.
107    spin_http_client: Option<reqwest::Client>,
108    // Connection pooling clients for `wasi:http/outgoing-handler` interface
109    //
110    // This is a clone of `AppState::wasi_http_clients`, meaning it is shared
111    // among all instances of the app.
112    wasi_http_clients: wasi::HttpClients,
113    /// Whether connection pooling is enabled for this instance.
114    connection_pooling_enabled: bool,
115    /// A semaphore to limit the number of concurrent outbound connections.
116    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
117}
118
119impl InstanceState {
120    /// Sets the [`SelfRequestOrigin`] for this instance.
121    ///
122    /// This is used to handle outbound requests to relative URLs. If unset,
123    /// those requests will fail.
124    pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
125        self.self_request_origin = Some(origin);
126    }
127
128    /// Sets a [`OutboundHttpInterceptor`] for this instance.
129    ///
130    /// Returns an error if it has already been called for this instance.
131    pub fn set_request_interceptor(
132        &mut self,
133        interceptor: impl OutboundHttpInterceptor + 'static,
134    ) -> anyhow::Result<()> {
135        if self.request_interceptor.is_some() {
136            anyhow::bail!("set_request_interceptor can only be called once");
137        }
138        self.request_interceptor = Some(Arc::new(interceptor));
139        Ok(())
140    }
141}
142
143impl SelfInstanceBuilder for InstanceState {}
144
145pub type Request = http::Request<wasmtime_wasi_http::body::HyperOutgoingBody>;
146pub type Response = http::Response<wasmtime_wasi_http::body::HyperIncomingBody>;
147
148/// SelfRequestOrigin indicates the base URI to use for "self" requests.
149#[derive(Clone, Debug)]
150pub struct SelfRequestOrigin {
151    pub scheme: Scheme,
152    pub authority: Authority,
153}
154
155impl SelfRequestOrigin {
156    pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
157        Ok(SelfRequestOrigin {
158            scheme,
159            authority: auth
160                .parse()
161                .with_context(|| format!("address '{auth}' is not a valid authority"))?,
162        })
163    }
164
165    pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
166        Ok(Self {
167            scheme: uri.scheme().context("URI missing scheme")?.clone(),
168            authority: uri.authority().context("URI missing authority")?.clone(),
169        })
170    }
171
172    fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
173        let mut parts = Parts::default();
174        parts.scheme = Some(self.scheme);
175        parts.authority = Some(self.authority);
176        parts.path_and_query = path_and_query;
177        Uri::from_parts(parts).unwrap()
178    }
179
180    fn use_tls(&self) -> bool {
181        self.scheme == Scheme::HTTPS
182    }
183
184    fn host_header(&self) -> HeaderValue {
185        HeaderValue::from_str(self.authority.as_str()).unwrap()
186    }
187}
188
189impl std::fmt::Display for SelfRequestOrigin {
190    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
191        write!(f, "{}://{}", self.scheme, self.authority)
192    }
193}
194
195pub struct AppState {
196    // Connection pooling clients for `wasi:http/outgoing-handler` interface
197    wasi_http_clients: wasi::HttpClients,
198    /// Whether connection pooling is enabled for this app.
199    connection_pooling_enabled: bool,
200    /// A semaphore to limit the number of concurrent outbound connections.
201    concurrent_outbound_connections_semaphore: Option<Arc<Semaphore>>,
202}
203
204/// Removes IPs in the given [`BlockedNetworks`].
205///
206/// Returns [`ErrorCode::DestinationIpProhibited`] if all IPs are removed.
207fn remove_blocked_addrs(
208    blocked_networks: &BlockedNetworks,
209    addrs: &mut Vec<SocketAddr>,
210) -> Result<(), ErrorCode> {
211    if addrs.is_empty() {
212        return Ok(());
213    }
214    let blocked_addrs = blocked_networks.remove_blocked(addrs);
215    if addrs.is_empty() && !blocked_addrs.is_empty() {
216        tracing::error!(
217            "error.type" = "destination_ip_prohibited",
218            ?blocked_addrs,
219            "all destination IP(s) prohibited by runtime config"
220        );
221        return Err(ErrorCode::DestinationIpProhibited);
222    }
223    Ok(())
224}