spin_factor_outbound_http/
spin.rs

1use http_body_util::BodyExt;
2use spin_world::v1::{
3    http as spin_http,
4    http_types::{self, HttpError, Method, Request, Response},
5};
6use tracing::{field::Empty, instrument, Span};
7
8use crate::intercept::InterceptOutcome;
9
10impl spin_http::Host for crate::InstanceState {
11    #[instrument(name = "spin_outbound_http.send_request", skip_all,
12        fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
13        http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
14    async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
15        let span = Span::current();
16        record_request_fields(&span, &req);
17
18        let uri = req.uri;
19        tracing::trace!("Sending outbound HTTP to {uri:?}");
20
21        if !req.params.is_empty() {
22            tracing::warn!("HTTP params field is deprecated");
23        }
24        let req_url = if !uri.starts_with('/') {
25            // Absolute URI
26            let is_allowed = self
27                .allowed_hosts
28                .check_url(&uri, "https")
29                .await
30                .unwrap_or(false);
31            if !is_allowed {
32                return Err(HttpError::DestinationNotAllowed);
33            }
34            uri.parse().map_err(|_| HttpError::InvalidUrl)?
35        } else {
36            // Relative URI ("self" request)
37            let is_allowed = self
38                .allowed_hosts
39                .check_relative_url(&["http", "https"])
40                .await
41                .unwrap_or(false);
42            if !is_allowed {
43                return Err(HttpError::DestinationNotAllowed);
44            }
45
46            let Some(origin) = &self.self_request_origin else {
47                tracing::error!(
48                    "Couldn't handle outbound HTTP request to relative URI; no origin set"
49                );
50                return Err(HttpError::InvalidUrl);
51            };
52            let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
53            origin.clone().into_uri(Some(path_and_query))
54        };
55
56        // Build an http::Request for OutboundHttpInterceptor
57        let mut req = {
58            let mut builder = http::Request::builder()
59                .method(hyper_method(req.method))
60                .uri(&req_url);
61            for (key, val) in req.headers {
62                builder = builder.header(key, val);
63            }
64            builder.body(req.body.unwrap_or_default())
65        }
66        .map_err(|err| {
67            tracing::error!("Error building outbound request: {err}");
68            HttpError::RuntimeError
69        })?;
70
71        spin_telemetry::inject_trace_context(req.headers_mut());
72
73        if let Some(interceptor) = &self.request_interceptor {
74            let intercepted_request = std::mem::take(&mut req).into();
75            match interceptor.intercept(intercepted_request).await {
76                Ok(InterceptOutcome::Continue(intercepted_request)) => {
77                    req = intercepted_request.into_vec_request().unwrap();
78                }
79                Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
80                Err(err) => {
81                    tracing::error!("Error in outbound HTTP interceptor: {err}");
82                    return Err(HttpError::RuntimeError);
83                }
84            }
85        }
86
87        // Convert http::Request to reqwest::Request
88        let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
89
90        // Allow reuse of Client's internal connection pool for multiple requests
91        // in a single component execution
92        let client = self.spin_http_client.get_or_insert_with(|| {
93            let mut builder = reqwest::Client::builder();
94            if !self.connection_pooling_enabled {
95                builder = builder.pool_max_idle_per_host(0);
96            }
97            builder.build().unwrap()
98        });
99
100        // If we're limiting concurrent outbound requests, acquire a permit
101        // Note: since we don't have access to the underlying connection, we can only
102        // limit the number of concurrent requests, not connections.
103        let permit = match &self.concurrent_outbound_connections_semaphore {
104            Some(s) => s.acquire().await.ok(),
105            None => None,
106        };
107        let resp = client.execute(req).await.map_err(log_reqwest_error)?;
108        drop(permit);
109
110        tracing::trace!("Returning response from outbound request to {req_url}");
111        span.record("http.response.status_code", resp.status().as_u16());
112        response_from_reqwest(resp).await
113    }
114}
115
116impl http_types::Host for crate::InstanceState {
117    fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
118        Ok(err)
119    }
120}
121
122fn record_request_fields(span: &Span, req: &Request) {
123    let method = match req.method {
124        Method::Get => "GET",
125        Method::Post => "POST",
126        Method::Put => "PUT",
127        Method::Delete => "DELETE",
128        Method::Patch => "PATCH",
129        Method::Head => "HEAD",
130        Method::Options => "OPTIONS",
131    };
132    // Set otel.name to just the method name to fit with OpenTelemetry conventions
133    // <https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name>
134    span.record("otel.name", method)
135        .record("http.request.method", method)
136        .record("url.full", req.uri.clone());
137    if let Ok(uri) = req.uri.parse::<http::Uri>() {
138        if let Some(authority) = uri.authority() {
139            span.record("server.address", authority.host());
140            if let Some(port) = authority.port() {
141                span.record("server.port", port.as_u16());
142            }
143        }
144    }
145}
146
147fn hyper_method(m: Method) -> http::Method {
148    match m {
149        Method::Get => http::Method::GET,
150        Method::Post => http::Method::POST,
151        Method::Put => http::Method::PUT,
152        Method::Delete => http::Method::DELETE,
153        Method::Patch => http::Method::PATCH,
154        Method::Head => http::Method::HEAD,
155        Method::Options => http::Method::OPTIONS,
156    }
157}
158
159async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
160    let status = resp.status().as_u16();
161
162    let headers = headers_from_map(resp.headers());
163
164    let body = resp
165        .body_mut()
166        .collect()
167        .await
168        .map_err(|_| HttpError::RuntimeError)?
169        .to_bytes()
170        .to_vec();
171
172    Ok(Response {
173        status,
174        headers: Some(headers),
175        body: Some(body),
176    })
177}
178
179fn log_reqwest_error(err: reqwest::Error) -> HttpError {
180    let error_desc = if err.is_timeout() {
181        "timeout error"
182    } else if err.is_connect() {
183        "connection error"
184    } else if err.is_body() || err.is_decode() {
185        "message body error"
186    } else if err.is_request() {
187        "request error"
188    } else {
189        "error"
190    };
191    tracing::warn!(
192        "Outbound HTTP {}: URL {}, error detail {:?}",
193        error_desc,
194        err.url()
195            .map(|u| u.to_string())
196            .unwrap_or_else(|| "<unknown>".to_owned()),
197        err
198    );
199    HttpError::RuntimeError
200}
201
202async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
203    let status = res.status().as_u16();
204
205    let headers = headers_from_map(res.headers());
206
207    let body = res
208        .bytes()
209        .await
210        .map_err(|_| HttpError::RuntimeError)?
211        .to_vec();
212
213    Ok(Response {
214        status,
215        headers: Some(headers),
216        body: Some(body),
217    })
218}
219
220fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
221    map.iter()
222        .filter_map(|(key, val)| {
223            Some((
224                key.to_string(),
225                val.to_str()
226                    .ok()
227                    .or_else(|| {
228                        tracing::warn!("Non-ascii response header value for {key}");
229                        None
230                    })?
231                    .to_string(),
232            ))
233        })
234        .collect()
235}