spin_factor_outbound_http/
spin.rs1use http_body_util::BodyExt;
2use spin_world::v1::{
3 http as spin_http,
4 http_types::{self, HttpError, Method, Request, Response},
5};
6use tracing::{field::Empty, instrument, Level, Span};
7
8use crate::intercept::InterceptOutcome;
9
10impl spin_http::Host for crate::InstanceState {
11 #[instrument(name = "spin_outbound_http.send_request", skip_all, err(level = Level::INFO),
12 fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
13 http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
14 async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
15 let span = Span::current();
16 record_request_fields(&span, &req);
17
18 let uri = req.uri;
19 tracing::trace!("Sending outbound HTTP to {uri:?}");
20
21 if !req.params.is_empty() {
22 tracing::warn!("HTTP params field is deprecated");
23 }
24
25 let req_url = if !uri.starts_with('/') {
26 let is_allowed = self
28 .allowed_hosts
29 .check_url(&uri, "https")
30 .await
31 .unwrap_or(false);
32 if !is_allowed {
33 return Err(HttpError::DestinationNotAllowed);
34 }
35 uri.parse().map_err(|_| HttpError::InvalidUrl)?
36 } else {
37 let is_allowed = self
39 .allowed_hosts
40 .check_relative_url(&["http", "https"])
41 .await
42 .unwrap_or(false);
43 if !is_allowed {
44 return Err(HttpError::DestinationNotAllowed);
45 }
46
47 let Some(origin) = &self.self_request_origin else {
48 tracing::error!(
49 "Couldn't handle outbound HTTP request to relative URI; no origin set"
50 );
51 return Err(HttpError::InvalidUrl);
52 };
53 let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
54 origin.clone().into_uri(Some(path_and_query))
55 };
56
57 let mut req = {
59 let mut builder = http::Request::builder()
60 .method(hyper_method(req.method))
61 .uri(&req_url);
62 for (key, val) in req.headers {
63 builder = builder.header(key, val);
64 }
65 builder.body(req.body.unwrap_or_default())
66 }
67 .map_err(|err| {
68 tracing::error!("Error building outbound request: {err}");
69 HttpError::RuntimeError
70 })?;
71
72 spin_telemetry::inject_trace_context(req.headers_mut());
73
74 if let Some(interceptor) = &self.request_interceptor {
75 let intercepted_request = std::mem::take(&mut req).into();
76 match interceptor.intercept(intercepted_request).await {
77 Ok(InterceptOutcome::Continue(intercepted_request)) => {
78 req = intercepted_request.into_vec_request().unwrap();
79 }
80 Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
81 Err(err) => {
82 tracing::error!("Error in outbound HTTP interceptor: {err}");
83 return Err(HttpError::RuntimeError);
84 }
85 }
86 }
87
88 let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
90
91 let client = self.spin_http_client.get_or_insert_with(Default::default);
94
95 let resp = client.execute(req).await.map_err(log_reqwest_error)?;
96
97 tracing::trace!("Returning response from outbound request to {req_url}");
98 span.record("http.response.status_code", resp.status().as_u16());
99 response_from_reqwest(resp).await
100 }
101}
102
103impl http_types::Host for crate::InstanceState {
104 fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
105 Ok(err)
106 }
107}
108
109fn record_request_fields(span: &Span, req: &Request) {
110 let method = match req.method {
111 Method::Get => "GET",
112 Method::Post => "POST",
113 Method::Put => "PUT",
114 Method::Delete => "DELETE",
115 Method::Patch => "PATCH",
116 Method::Head => "HEAD",
117 Method::Options => "OPTIONS",
118 };
119 span.record("otel.name", method)
120 .record("http.request.method", method)
121 .record("url.full", req.uri.clone());
122 if let Ok(uri) = req.uri.parse::<http::Uri>() {
123 if let Some(authority) = uri.authority() {
124 span.record("server.address", authority.host());
125 if let Some(port) = authority.port() {
126 span.record("server.port", port.as_u16());
127 }
128 }
129 }
130}
131
132fn hyper_method(m: Method) -> http::Method {
133 match m {
134 Method::Get => http::Method::GET,
135 Method::Post => http::Method::POST,
136 Method::Put => http::Method::PUT,
137 Method::Delete => http::Method::DELETE,
138 Method::Patch => http::Method::PATCH,
139 Method::Head => http::Method::HEAD,
140 Method::Options => http::Method::OPTIONS,
141 }
142}
143
144async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
145 let status = resp.status().as_u16();
146
147 let headers = headers_from_map(resp.headers());
148
149 let body = resp
150 .body_mut()
151 .collect()
152 .await
153 .map_err(|_| HttpError::RuntimeError)?
154 .to_bytes()
155 .to_vec();
156
157 Ok(Response {
158 status,
159 headers: Some(headers),
160 body: Some(body),
161 })
162}
163
164fn log_reqwest_error(err: reqwest::Error) -> HttpError {
165 let error_desc = if err.is_timeout() {
166 "timeout error"
167 } else if err.is_connect() {
168 "connection error"
169 } else if err.is_body() || err.is_decode() {
170 "message body error"
171 } else if err.is_request() {
172 "request error"
173 } else {
174 "error"
175 };
176 tracing::warn!(
177 "Outbound HTTP {}: URL {}, error detail {:?}",
178 error_desc,
179 err.url()
180 .map(|u| u.to_string())
181 .unwrap_or_else(|| "<unknown>".to_owned()),
182 err
183 );
184 HttpError::RuntimeError
185}
186
187async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
188 let status = res.status().as_u16();
189
190 let headers = headers_from_map(res.headers());
191
192 let body = res
193 .bytes()
194 .await
195 .map_err(|_| HttpError::RuntimeError)?
196 .to_vec();
197
198 Ok(Response {
199 status,
200 headers: Some(headers),
201 body: Some(body),
202 })
203}
204
205fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
206 map.iter()
207 .filter_map(|(key, val)| {
208 Some((
209 key.to_string(),
210 val.to_str()
211 .ok()
212 .or_else(|| {
213 tracing::warn!("Non-ascii response header value for {key}");
214 None
215 })?
216 .to_string(),
217 ))
218 })
219 .collect()
220}