spin_factor_outbound_http/
spin.rs

1use std::sync::Arc;
2
3use http_body_util::BodyExt;
4use spin_factor_outbound_networking::config::blocked_networks::BlockedNetworks;
5use spin_world::v1::{
6    http as spin_http,
7    http_types::{self, HttpError, Method, Request, Response},
8};
9use tracing::{field::Empty, instrument, Span};
10
11use crate::intercept::InterceptOutcome;
12
13impl spin_http::Host for crate::InstanceState {
14    #[instrument(name = "spin_outbound_http.send_request", skip_all,
15        fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
16        http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
17    async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
18        let span = Span::current();
19        record_request_fields(&span, &req);
20
21        let uri = req.uri;
22        tracing::trace!("Sending outbound HTTP to {uri:?}");
23
24        if !req.params.is_empty() {
25            tracing::warn!("HTTP params field is deprecated");
26        }
27        let req_url = if !uri.starts_with('/') {
28            // Absolute URI
29            let is_allowed = self
30                .allowed_hosts
31                .check_url(&uri, "https")
32                .await
33                .unwrap_or(false);
34            if !is_allowed {
35                return Err(HttpError::DestinationNotAllowed);
36            }
37            uri.parse().map_err(|_| HttpError::InvalidUrl)?
38        } else {
39            // Relative URI ("self" request)
40            let is_allowed = self
41                .allowed_hosts
42                .check_relative_url(&["http", "https"])
43                .await
44                .unwrap_or(false);
45            if !is_allowed {
46                return Err(HttpError::DestinationNotAllowed);
47            }
48
49            let Some(origin) = &self.self_request_origin else {
50                tracing::error!(
51                    "Couldn't handle outbound HTTP request to relative URI; no origin set"
52                );
53                return Err(HttpError::InvalidUrl);
54            };
55            let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
56            origin.clone().into_uri(Some(path_and_query))
57        };
58
59        // Build an http::Request for OutboundHttpInterceptor
60        let mut req = {
61            let mut builder = http::Request::builder()
62                .method(hyper_method(req.method))
63                .uri(&req_url);
64            for (key, val) in req.headers {
65                builder = builder.header(key, val);
66            }
67            builder.body(req.body.unwrap_or_default())
68        }
69        .map_err(|err| {
70            tracing::error!("Error building outbound request: {err}");
71            HttpError::RuntimeError
72        })?;
73
74        spin_telemetry::inject_trace_context(req.headers_mut());
75
76        if let Some(interceptor) = &self.request_interceptor {
77            let intercepted_request = std::mem::take(&mut req).into();
78            match interceptor.intercept(intercepted_request).await {
79                Ok(InterceptOutcome::Continue(intercepted_request)) => {
80                    req = intercepted_request.into_vec_request().unwrap();
81                }
82                Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
83                Err(err) => {
84                    tracing::error!("Error in outbound HTTP interceptor: {err}");
85                    return Err(HttpError::RuntimeError);
86                }
87            }
88        }
89
90        // Convert http::Request to reqwest::Request
91        let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
92
93        // Allow reuse of Client's internal connection pool for multiple requests
94        // in a single component execution
95        let client = self.spin_http_client.get_or_insert_with(|| {
96            let mut builder = reqwest::Client::builder()
97                .dns_resolver(Arc::new(SpinDnsResolver(self.blocked_networks.clone())));
98            if !self.connection_pooling_enabled {
99                builder = builder.pool_max_idle_per_host(0);
100            }
101            builder.build().unwrap()
102        });
103
104        // If we're limiting concurrent outbound requests, acquire a permit
105        // Note: since we don't have access to the underlying connection, we can only
106        // limit the number of concurrent requests, not connections.
107        let permit = crate::concurrent_outbound_connections::acquire_semaphore(
108            "spin",
109            &self.concurrent_outbound_connections_semaphore,
110        )
111        .await;
112        let resp = client.execute(req).await.map_err(log_reqwest_error)?;
113        drop(permit);
114
115        tracing::trace!("Returning response from outbound request to {req_url}");
116        span.record("http.response.status_code", resp.status().as_u16());
117        response_from_reqwest(resp).await
118    }
119}
120
121/// Resolves DNS using Tokio's resolver, filtering out blocked IPs.
122struct SpinDnsResolver(BlockedNetworks);
123
124impl reqwest::dns::Resolve for SpinDnsResolver {
125    fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving {
126        let blocked_networks = self.0.clone();
127        Box::pin(async move {
128            let mut addrs = tokio::net::lookup_host(name.as_str())
129                .await
130                .map_err(Box::new)?
131                .collect::<Vec<_>>();
132            // Remove blocked IPs
133            crate::remove_blocked_addrs(&blocked_networks, &mut addrs).map_err(Box::new)?;
134            Ok(Box::new(addrs.into_iter()) as reqwest::dns::Addrs)
135        })
136    }
137}
138
139impl http_types::Host for crate::InstanceState {
140    fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
141        Ok(err)
142    }
143}
144
145fn record_request_fields(span: &Span, req: &Request) {
146    let method = match req.method {
147        Method::Get => "GET",
148        Method::Post => "POST",
149        Method::Put => "PUT",
150        Method::Delete => "DELETE",
151        Method::Patch => "PATCH",
152        Method::Head => "HEAD",
153        Method::Options => "OPTIONS",
154    };
155    // Set otel.name to just the method name to fit with OpenTelemetry conventions
156    // <https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name>
157    span.record("otel.name", method)
158        .record("http.request.method", method)
159        .record("url.full", req.uri.clone());
160    if let Ok(uri) = req.uri.parse::<http::Uri>() {
161        if let Some(authority) = uri.authority() {
162            span.record("server.address", authority.host());
163            if let Some(port) = authority.port() {
164                span.record("server.port", port.as_u16());
165            }
166        }
167    }
168}
169
170fn hyper_method(m: Method) -> http::Method {
171    match m {
172        Method::Get => http::Method::GET,
173        Method::Post => http::Method::POST,
174        Method::Put => http::Method::PUT,
175        Method::Delete => http::Method::DELETE,
176        Method::Patch => http::Method::PATCH,
177        Method::Head => http::Method::HEAD,
178        Method::Options => http::Method::OPTIONS,
179    }
180}
181
182async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
183    let status = resp.status().as_u16();
184
185    let headers = headers_from_map(resp.headers());
186
187    let body = resp
188        .body_mut()
189        .collect()
190        .await
191        .map_err(|_| HttpError::RuntimeError)?
192        .to_bytes()
193        .to_vec();
194
195    Ok(Response {
196        status,
197        headers: Some(headers),
198        body: Some(body),
199    })
200}
201
202fn log_reqwest_error(err: reqwest::Error) -> HttpError {
203    let error_desc = if err.is_timeout() {
204        "timeout error"
205    } else if err.is_connect() {
206        "connection error"
207    } else if err.is_body() || err.is_decode() {
208        "message body error"
209    } else if err.is_request() {
210        "request error"
211    } else {
212        "error"
213    };
214    tracing::warn!(
215        "Outbound HTTP {}: URL {}, error detail {:?}",
216        error_desc,
217        err.url()
218            .map(|u| u.to_string())
219            .unwrap_or_else(|| "<unknown>".to_owned()),
220        err
221    );
222    HttpError::RuntimeError
223}
224
225async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
226    let status = res.status().as_u16();
227
228    let headers = headers_from_map(res.headers());
229
230    let body = res
231        .bytes()
232        .await
233        .map_err(|_| HttpError::RuntimeError)?
234        .to_vec();
235
236    Ok(Response {
237        status,
238        headers: Some(headers),
239        body: Some(body),
240    })
241}
242
243fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
244    map.iter()
245        .filter_map(|(key, val)| {
246            Some((
247                key.to_string(),
248                val.to_str()
249                    .ok()
250                    .or_else(|| {
251                        tracing::warn!("Non-ascii response header value for {key}");
252                        None
253                    })?
254                    .to_string(),
255            ))
256        })
257        .collect()
258}