spin_factor_outbound_http/
spin.rs

1use http_body_util::BodyExt;
2use spin_world::v1::{
3    http as spin_http,
4    http_types::{self, HttpError, Method, Request, Response},
5};
6use tracing::{field::Empty, instrument, Level, Span};
7
8use crate::intercept::InterceptOutcome;
9
10impl spin_http::Host for crate::InstanceState {
11    #[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
12        fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
13        http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
14    async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
15        let span = Span::current();
16        record_request_fields(&span, &req);
17
18        let uri = req.uri;
19        tracing::trace!("Sending outbound HTTP to {uri:?}");
20
21        if !req.params.is_empty() {
22            tracing::warn!("HTTP params field is deprecated");
23        }
24
25        let req_url = if !uri.starts_with('/') {
26            // Absolute URI
27            let is_allowed = self
28                .allowed_hosts
29                .check_url(&uri, "https")
30                .await
31                .unwrap_or(false);
32            if !is_allowed {
33                return Err(HttpError::DestinationNotAllowed);
34            }
35            uri.parse().map_err(|_| HttpError::InvalidUrl)?
36        } else {
37            // Relative URI ("self" request)
38            let is_allowed = self
39                .allowed_hosts
40                .check_relative_url(&["http", "https"])
41                .await
42                .unwrap_or(false);
43            if !is_allowed {
44                return Err(HttpError::DestinationNotAllowed);
45            }
46
47            let Some(origin) = &self.self_request_origin else {
48                tracing::error!(
49                    "Couldn't handle outbound HTTP request to relative URI; no origin set"
50                );
51                return Err(HttpError::InvalidUrl);
52            };
53            let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
54            origin.clone().into_uri(Some(path_and_query))
55        };
56
57        // Build an http::Request for OutboundHttpInterceptor
58        let mut req = {
59            let mut builder = http::Request::builder()
60                .method(hyper_method(req.method))
61                .uri(&req_url);
62            for (key, val) in req.headers {
63                builder = builder.header(key, val);
64            }
65            builder.body(req.body.unwrap_or_default())
66        }
67        .map_err(|err| {
68            tracing::error!("Error building outbound request: {err}");
69            HttpError::RuntimeError
70        })?;
71
72        spin_telemetry::inject_trace_context(req.headers_mut());
73
74        if let Some(interceptor) = &self.request_interceptor {
75            let intercepted_request = std::mem::take(&mut req).into();
76            match interceptor.intercept(intercepted_request).await {
77                Ok(InterceptOutcome::Continue(intercepted_request)) => {
78                    req = intercepted_request.into_vec_request().unwrap();
79                }
80                Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
81                Err(err) => {
82                    tracing::error!("Error in outbound HTTP interceptor: {err}");
83                    return Err(HttpError::RuntimeError);
84                }
85            }
86        }
87
88        // Convert http::Request to reqwest::Request
89        let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
90
91        // Allow reuse of Client's internal connection pool for multiple requests
92        // in a single component execution
93        let client = self.spin_http_client.get_or_insert_with(Default::default);
94
95        let resp = client.execute(req).await.map_err(log_reqwest_error)?;
96
97        tracing::trace!("Returning response from outbound request to {req_url}");
98        span.record("http.response.status_code", resp.status().as_u16());
99        response_from_reqwest(resp).await
100    }
101}
102
103impl http_types::Host for crate::InstanceState {
104    fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
105        Ok(err)
106    }
107}
108
109fn record_request_fields(span: &Span, req: &Request) {
110    let method = match req.method {
111        Method::Get => "GET",
112        Method::Post => "POST",
113        Method::Put => "PUT",
114        Method::Delete => "DELETE",
115        Method::Patch => "PATCH",
116        Method::Head => "HEAD",
117        Method::Options => "OPTIONS",
118    };
119    span.record("otel.name", method)
120        .record("http.request.method", method)
121        .record("url.full", req.uri.clone());
122    if let Ok(uri) = req.uri.parse::<http::Uri>() {
123        if let Some(authority) = uri.authority() {
124            span.record("server.address", authority.host());
125            if let Some(port) = authority.port() {
126                span.record("server.port", port.as_u16());
127            }
128        }
129    }
130}
131
132fn hyper_method(m: Method) -> http::Method {
133    match m {
134        Method::Get => http::Method::GET,
135        Method::Post => http::Method::POST,
136        Method::Put => http::Method::PUT,
137        Method::Delete => http::Method::DELETE,
138        Method::Patch => http::Method::PATCH,
139        Method::Head => http::Method::HEAD,
140        Method::Options => http::Method::OPTIONS,
141    }
142}
143
144async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
145    let status = resp.status().as_u16();
146
147    let headers = headers_from_map(resp.headers());
148
149    let body = resp
150        .body_mut()
151        .collect()
152        .await
153        .map_err(|_| HttpError::RuntimeError)?
154        .to_bytes()
155        .to_vec();
156
157    Ok(Response {
158        status,
159        headers: Some(headers),
160        body: Some(body),
161    })
162}
163
164fn log_reqwest_error(err: reqwest::Error) -> HttpError {
165    let error_desc = if err.is_timeout() {
166        "timeout error"
167    } else if err.is_connect() {
168        "connection error"
169    } else if err.is_body() || err.is_decode() {
170        "message body error"
171    } else if err.is_request() {
172        "request error"
173    } else {
174        "error"
175    };
176    tracing::warn!(
177        "Outbound HTTP {}: URL {}, error detail {:?}",
178        error_desc,
179        err.url()
180            .map(|u| u.to_string())
181            .unwrap_or_else(|| "<unknown>".to_owned()),
182        err
183    );
184    HttpError::RuntimeError
185}
186
187async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
188    let status = res.status().as_u16();
189
190    let headers = headers_from_map(res.headers());
191
192    let body = res
193        .bytes()
194        .await
195        .map_err(|_| HttpError::RuntimeError)?
196        .to_vec();
197
198    Ok(Response {
199        status,
200        headers: Some(headers),
201        body: Some(body),
202    })
203}
204
205fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
206    map.iter()
207        .filter_map(|(key, val)| {
208            Some((
209                key.to_string(),
210                val.to_str()
211                    .ok()
212                    .or_else(|| {
213                        tracing::warn!("Non-ascii response header value for {key}");
214                        None
215                    })?
216                    .to_string(),
217            ))
218        })
219        .collect()
220}