spin_factor_outbound_http/
spin.rs1use http_body_util::BodyExt;
2use spin_world::v1::{
3 http as spin_http,
4 http_types::{self, HttpError, Method, Request, Response},
5};
6use tracing::{field::Empty, instrument, Span};
7
8use crate::intercept::InterceptOutcome;
9
10impl spin_http::Host for crate::InstanceState {
11 #[instrument(name = "spin_outbound_http.send_request", skip_all,
12 fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
13 http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
14 async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
15 let span = Span::current();
16 record_request_fields(&span, &req);
17
18 let uri = req.uri;
19 tracing::trace!("Sending outbound HTTP to {uri:?}");
20
21 if !req.params.is_empty() {
22 tracing::warn!("HTTP params field is deprecated");
23 }
24 let req_url = if !uri.starts_with('/') {
25 let is_allowed = self
27 .allowed_hosts
28 .check_url(&uri, "https")
29 .await
30 .unwrap_or(false);
31 if !is_allowed {
32 return Err(HttpError::DestinationNotAllowed);
33 }
34 uri.parse().map_err(|_| HttpError::InvalidUrl)?
35 } else {
36 let is_allowed = self
38 .allowed_hosts
39 .check_relative_url(&["http", "https"])
40 .await
41 .unwrap_or(false);
42 if !is_allowed {
43 return Err(HttpError::DestinationNotAllowed);
44 }
45
46 let Some(origin) = &self.self_request_origin else {
47 tracing::error!(
48 "Couldn't handle outbound HTTP request to relative URI; no origin set"
49 );
50 return Err(HttpError::InvalidUrl);
51 };
52 let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
53 origin.clone().into_uri(Some(path_and_query))
54 };
55
56 let mut req = {
58 let mut builder = http::Request::builder()
59 .method(hyper_method(req.method))
60 .uri(&req_url);
61 for (key, val) in req.headers {
62 builder = builder.header(key, val);
63 }
64 builder.body(req.body.unwrap_or_default())
65 }
66 .map_err(|err| {
67 tracing::error!("Error building outbound request: {err}");
68 HttpError::RuntimeError
69 })?;
70
71 spin_telemetry::inject_trace_context(req.headers_mut());
72
73 if let Some(interceptor) = &self.request_interceptor {
74 let intercepted_request = std::mem::take(&mut req).into();
75 match interceptor.intercept(intercepted_request).await {
76 Ok(InterceptOutcome::Continue(intercepted_request)) => {
77 req = intercepted_request.into_vec_request().unwrap();
78 }
79 Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
80 Err(err) => {
81 tracing::error!("Error in outbound HTTP interceptor: {err}");
82 return Err(HttpError::RuntimeError);
83 }
84 }
85 }
86
87 let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
89
90 let client = self.spin_http_client.get_or_insert_with(|| {
93 let mut builder = reqwest::Client::builder();
94 if !self.connection_pooling_enabled {
95 builder = builder.pool_max_idle_per_host(0);
96 }
97 builder.build().unwrap()
98 });
99
100 let permit = match &self.concurrent_outbound_connections_semaphore {
104 Some(s) => s.acquire().await.ok(),
105 None => None,
106 };
107 let resp = client.execute(req).await.map_err(log_reqwest_error)?;
108 drop(permit);
109
110 tracing::trace!("Returning response from outbound request to {req_url}");
111 span.record("http.response.status_code", resp.status().as_u16());
112 response_from_reqwest(resp).await
113 }
114}
115
116impl http_types::Host for crate::InstanceState {
117 fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
118 Ok(err)
119 }
120}
121
122fn record_request_fields(span: &Span, req: &Request) {
123 let method = match req.method {
124 Method::Get => "GET",
125 Method::Post => "POST",
126 Method::Put => "PUT",
127 Method::Delete => "DELETE",
128 Method::Patch => "PATCH",
129 Method::Head => "HEAD",
130 Method::Options => "OPTIONS",
131 };
132 span.record("otel.name", method)
135 .record("http.request.method", method)
136 .record("url.full", req.uri.clone());
137 if let Ok(uri) = req.uri.parse::<http::Uri>() {
138 if let Some(authority) = uri.authority() {
139 span.record("server.address", authority.host());
140 if let Some(port) = authority.port() {
141 span.record("server.port", port.as_u16());
142 }
143 }
144 }
145}
146
147fn hyper_method(m: Method) -> http::Method {
148 match m {
149 Method::Get => http::Method::GET,
150 Method::Post => http::Method::POST,
151 Method::Put => http::Method::PUT,
152 Method::Delete => http::Method::DELETE,
153 Method::Patch => http::Method::PATCH,
154 Method::Head => http::Method::HEAD,
155 Method::Options => http::Method::OPTIONS,
156 }
157}
158
159async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
160 let status = resp.status().as_u16();
161
162 let headers = headers_from_map(resp.headers());
163
164 let body = resp
165 .body_mut()
166 .collect()
167 .await
168 .map_err(|_| HttpError::RuntimeError)?
169 .to_bytes()
170 .to_vec();
171
172 Ok(Response {
173 status,
174 headers: Some(headers),
175 body: Some(body),
176 })
177}
178
179fn log_reqwest_error(err: reqwest::Error) -> HttpError {
180 let error_desc = if err.is_timeout() {
181 "timeout error"
182 } else if err.is_connect() {
183 "connection error"
184 } else if err.is_body() || err.is_decode() {
185 "message body error"
186 } else if err.is_request() {
187 "request error"
188 } else {
189 "error"
190 };
191 tracing::warn!(
192 "Outbound HTTP {}: URL {}, error detail {:?}",
193 error_desc,
194 err.url()
195 .map(|u| u.to_string())
196 .unwrap_or_else(|| "<unknown>".to_owned()),
197 err
198 );
199 HttpError::RuntimeError
200}
201
202async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
203 let status = res.status().as_u16();
204
205 let headers = headers_from_map(res.headers());
206
207 let body = res
208 .bytes()
209 .await
210 .map_err(|_| HttpError::RuntimeError)?
211 .to_vec();
212
213 Ok(Response {
214 status,
215 headers: Some(headers),
216 body: Some(body),
217 })
218}
219
220fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
221 map.iter()
222 .filter_map(|(key, val)| {
223 Some((
224 key.to_string(),
225 val.to_str()
226 .ok()
227 .or_else(|| {
228 tracing::warn!("Non-ascii response header value for {key}");
229 None
230 })?
231 .to_string(),
232 ))
233 })
234 .collect()
235}