Skip to main content

spin_factor_outbound_http/
lib.rs

1pub mod intercept;
2pub mod runtime_config;
3mod spin;
4mod wasi;
5pub mod wasi_2023_10_18;
6pub mod wasi_2023_11_10;
7
8use std::{net::SocketAddr, sync::Arc};
9
10use anyhow::Context;
11use http::{
12    HeaderValue, Uri,
13    uri::{Authority, Parts, PathAndQuery, Scheme},
14};
15use intercept::OutboundHttpInterceptor;
16use runtime_config::RuntimeConfig;
17use spin_factor_otel::OtelFactorState;
18use spin_factor_outbound_networking::{
19    ComponentTlsClientConfigs, ConnectionSemaphore, OutboundNetworkingFactor,
20    build_connection_semaphore,
21    config::{allowed_hosts::OutboundAllowedHosts, blocked_networks::BlockedNetworks},
22};
23use spin_factors::{
24    ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
25    anyhow,
26};
27use wasmtime_wasi_http::WasiHttpCtx;
28
29pub use wasmtime_wasi_http::p2::{
30    HttpResult,
31    bindings::http::types::ErrorCode,
32    body::HyperOutgoingBody,
33    types::{HostFutureIncomingResponse, OutgoingRequestConfig},
34};
35
36pub use wasi::{MutexBody, NotifyOnDropBody, p2_to_p3_error_code, p3_to_p2_error_code};
37
38#[derive(Default)]
39pub struct OutboundHttpFactor {
40    _priv: (),
41}
42
43impl Factor for OutboundHttpFactor {
44    type RuntimeConfig = RuntimeConfig;
45    type AppState = AppState;
46    type InstanceBuilder = InstanceState;
47
48    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
49        ctx.link_bindings(spin_world::v1::http::add_to_linker::<_, FactorData<Self>>)?;
50        wasi::add_to_linker(ctx)?;
51        Ok(())
52    }
53
54    fn configure_app<T: RuntimeFactors>(
55        &self,
56        mut ctx: ConfigureAppContext<T, Self>,
57    ) -> anyhow::Result<Self::AppState> {
58        let config = ctx.take_runtime_config().unwrap_or_default();
59        let networking = ctx.app_state::<OutboundNetworkingFactor>().ok();
60
61        Ok(AppState {
62            wasi_http_clients: wasi::HttpClients::new(config.connection_pooling_enabled),
63            connection_pooling_enabled: config.connection_pooling_enabled,
64            semaphore: build_connection_semaphore(
65                networking,
66                "http",
67                config.max_concurrent_connections,
68                config.wait_timeout,
69            ),
70        })
71    }
72
73    fn prepare<T: RuntimeFactors>(
74        &self,
75        mut ctx: PrepareContext<T, Self>,
76    ) -> anyhow::Result<Self::InstanceBuilder> {
77        let outbound_networking = ctx.instance_builder::<OutboundNetworkingFactor>()?;
78        let allowed_hosts = outbound_networking.allowed_hosts();
79        let blocked_networks = outbound_networking.blocked_networks();
80        let component_tls_configs = outbound_networking.component_tls_configs();
81        let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
82        Ok(InstanceState {
83            wasi_http_ctx: WasiHttpCtx::new(),
84            hooks: InstanceHttpHooks {
85                allowed_hosts,
86                blocked_networks,
87                component_tls_configs,
88                self_request_origin: None,
89                request_interceptor: None,
90                spin_http_client: None,
91                wasi_http_clients: ctx.app_state().wasi_http_clients.clone(),
92                connection_pooling_enabled: ctx.app_state().connection_pooling_enabled,
93                semaphore: ctx.app_state().semaphore.clone(),
94                otel,
95            },
96        })
97    }
98}
99
100pub struct InstanceState {
101    wasi_http_ctx: WasiHttpCtx,
102    hooks: InstanceHttpHooks,
103}
104
105struct InstanceHttpHooks {
106    allowed_hosts: OutboundAllowedHosts,
107    blocked_networks: BlockedNetworks,
108    component_tls_configs: ComponentTlsClientConfigs,
109    self_request_origin: Option<SelfRequestOrigin>,
110    request_interceptor: Option<Arc<dyn OutboundHttpInterceptor>>,
111    // Connection-pooling client for 'fermyon:spin/http' interface
112    //
113    // TODO: We could move this to `AppState` like the
114    // `wasi:http/outgoing-handler` pool for consistency, although it's probably
115    // not a high priority given that `fermyon:spin/http` is deprecated anyway.
116    spin_http_client: Option<reqwest::Client>,
117    // Connection pooling clients for `wasi:http/outgoing-handler` interface
118    //
119    // This is a clone of `AppState::wasi_http_clients`, meaning it is shared
120    // among all instances of the app.
121    wasi_http_clients: wasi::HttpClients,
122    /// Whether connection pooling is enabled for this instance.
123    connection_pooling_enabled: bool,
124    /// Semaphore to limit concurrent outbound connections.
125    semaphore: ConnectionSemaphore,
126    /// Manages access to the OtelFactor state.
127    otel: OtelFactorState,
128}
129
130impl InstanceState {
131    /// Sets the [`SelfRequestOrigin`] for this instance.
132    ///
133    /// This is used to handle outbound requests to relative URLs. If unset,
134    /// those requests will fail.
135    pub fn set_self_request_origin(&mut self, origin: SelfRequestOrigin) {
136        self.hooks.self_request_origin = Some(origin);
137    }
138
139    /// Sets a [`OutboundHttpInterceptor`] for this instance.
140    ///
141    /// Returns an error if it has already been called for this instance.
142    pub fn set_request_interceptor(
143        &mut self,
144        interceptor: impl OutboundHttpInterceptor + 'static,
145    ) -> anyhow::Result<()> {
146        if self.hooks.request_interceptor.is_some() {
147            anyhow::bail!("set_request_interceptor can only be called once");
148        }
149        self.hooks.request_interceptor = Some(Arc::new(interceptor));
150        Ok(())
151    }
152}
153
154impl SelfInstanceBuilder for InstanceState {}
155
156pub type Request = http::Request<wasmtime_wasi_http::p2::body::HyperOutgoingBody>;
157pub type Response = http::Response<wasmtime_wasi_http::p2::body::HyperIncomingBody>;
158
159/// SelfRequestOrigin indicates the base URI to use for "self" requests.
160#[derive(Clone, Debug)]
161pub struct SelfRequestOrigin {
162    pub scheme: Scheme,
163    pub authority: Authority,
164}
165
166impl SelfRequestOrigin {
167    pub fn create(scheme: Scheme, auth: &str) -> anyhow::Result<Self> {
168        Ok(SelfRequestOrigin {
169            scheme,
170            authority: auth
171                .parse()
172                .with_context(|| format!("address '{auth}' is not a valid authority"))?,
173        })
174    }
175
176    pub fn from_uri(uri: &Uri) -> anyhow::Result<Self> {
177        Ok(Self {
178            scheme: uri.scheme().context("URI missing scheme")?.clone(),
179            authority: uri.authority().context("URI missing authority")?.clone(),
180        })
181    }
182
183    fn into_uri(self, path_and_query: Option<PathAndQuery>) -> Uri {
184        let mut parts = Parts::default();
185        parts.scheme = Some(self.scheme);
186        parts.authority = Some(self.authority);
187        parts.path_and_query = path_and_query;
188        Uri::from_parts(parts).unwrap()
189    }
190
191    fn use_tls(&self) -> bool {
192        self.scheme == Scheme::HTTPS
193    }
194
195    fn host_header(&self) -> HeaderValue {
196        HeaderValue::from_str(self.authority.as_str()).unwrap()
197    }
198}
199
200impl std::fmt::Display for SelfRequestOrigin {
201    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202        write!(f, "{}://{}", self.scheme, self.authority)
203    }
204}
205
206pub struct AppState {
207    // Connection pooling clients for `wasi:http/outgoing-handler` interface
208    wasi_http_clients: wasi::HttpClients,
209    /// Whether connection pooling is enabled for this app.
210    connection_pooling_enabled: bool,
211    /// Semaphore to limit concurrent outbound connections.
212    semaphore: ConnectionSemaphore,
213}
214
215/// Removes IPs in the given [`BlockedNetworks`].
216///
217/// Returns [`ErrorCode::DestinationIpProhibited`] if all IPs are removed.
218fn remove_blocked_addrs(
219    blocked_networks: &BlockedNetworks,
220    addrs: &mut Vec<SocketAddr>,
221) -> Result<(), ErrorCode> {
222    if addrs.is_empty() {
223        return Ok(());
224    }
225    let blocked_addrs = blocked_networks.remove_blocked(addrs);
226    if addrs.is_empty() && !blocked_addrs.is_empty() {
227        tracing::error!(
228            "error.type" = "destination_ip_prohibited",
229            ?blocked_addrs,
230            "all destination IP(s) prohibited by runtime config"
231        );
232        return Err(ErrorCode::DestinationIpProhibited);
233    }
234    Ok(())
235}