spin_factor_outbound_http/
spin.rs1use std::sync::Arc;
2
3use http_body_util::BodyExt;
4use spin_factor_outbound_networking::config::blocked_networks::BlockedNetworks;
5use spin_world::v1::{
6 http as spin_http,
7 http_types::{self, HttpError, Method, Request, Response},
8};
9use tracing::{field::Empty, instrument, Span};
10
11use crate::intercept::InterceptOutcome;
12
13impl spin_http::Host for crate::InstanceState {
14 #[instrument(name = "spin_outbound_http.send_request", skip_all,
15 fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
16 http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
17 async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
18 let span = Span::current();
19 record_request_fields(&span, &req);
20
21 let uri = req.uri;
22 tracing::trace!("Sending outbound HTTP to {uri:?}");
23
24 if !req.params.is_empty() {
25 tracing::warn!("HTTP params field is deprecated");
26 }
27 let req_url = if !uri.starts_with('/') {
28 let is_allowed = self
30 .allowed_hosts
31 .check_url(&uri, "https")
32 .await
33 .unwrap_or(false);
34 if !is_allowed {
35 return Err(HttpError::DestinationNotAllowed);
36 }
37 uri.parse().map_err(|_| HttpError::InvalidUrl)?
38 } else {
39 let is_allowed = self
41 .allowed_hosts
42 .check_relative_url(&["http", "https"])
43 .await
44 .unwrap_or(false);
45 if !is_allowed {
46 return Err(HttpError::DestinationNotAllowed);
47 }
48
49 let Some(origin) = &self.self_request_origin else {
50 tracing::error!(
51 "Couldn't handle outbound HTTP request to relative URI; no origin set"
52 );
53 return Err(HttpError::InvalidUrl);
54 };
55 let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
56 origin.clone().into_uri(Some(path_and_query))
57 };
58
59 let mut req = {
61 let mut builder = http::Request::builder()
62 .method(hyper_method(req.method))
63 .uri(&req_url);
64 for (key, val) in req.headers {
65 builder = builder.header(key, val);
66 }
67 builder.body(req.body.unwrap_or_default())
68 }
69 .map_err(|err| {
70 tracing::error!("Error building outbound request: {err}");
71 HttpError::RuntimeError
72 })?;
73
74 spin_telemetry::inject_trace_context(req.headers_mut());
75
76 if let Some(interceptor) = &self.request_interceptor {
77 let intercepted_request = std::mem::take(&mut req).into();
78 match interceptor.intercept(intercepted_request).await {
79 Ok(InterceptOutcome::Continue(intercepted_request)) => {
80 req = intercepted_request.into_vec_request().unwrap();
81 }
82 Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
83 Err(err) => {
84 tracing::error!("Error in outbound HTTP interceptor: {err}");
85 return Err(HttpError::RuntimeError);
86 }
87 }
88 }
89
90 let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
92
93 let client = self.spin_http_client.get_or_insert_with(|| {
96 let mut builder = reqwest::Client::builder()
97 .dns_resolver(Arc::new(SpinDnsResolver(self.blocked_networks.clone())));
98 if !self.connection_pooling_enabled {
99 builder = builder.pool_max_idle_per_host(0);
100 }
101 builder.build().unwrap()
102 });
103
104 let permit = match &self.concurrent_outbound_connections_semaphore {
108 Some(s) => s.acquire().await.ok(),
109 None => None,
110 };
111 let resp = client.execute(req).await.map_err(log_reqwest_error)?;
112 drop(permit);
113
114 tracing::trace!("Returning response from outbound request to {req_url}");
115 span.record("http.response.status_code", resp.status().as_u16());
116 response_from_reqwest(resp).await
117 }
118}
119
120struct SpinDnsResolver(BlockedNetworks);
122
123impl reqwest::dns::Resolve for SpinDnsResolver {
124 fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving {
125 let blocked_networks = self.0.clone();
126 Box::pin(async move {
127 let mut addrs = tokio::net::lookup_host(name.as_str())
128 .await
129 .map_err(Box::new)?
130 .collect::<Vec<_>>();
131 crate::remove_blocked_addrs(&blocked_networks, &mut addrs).map_err(Box::new)?;
133 Ok(Box::new(addrs.into_iter()) as reqwest::dns::Addrs)
134 })
135 }
136}
137
138impl http_types::Host for crate::InstanceState {
139 fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
140 Ok(err)
141 }
142}
143
144fn record_request_fields(span: &Span, req: &Request) {
145 let method = match req.method {
146 Method::Get => "GET",
147 Method::Post => "POST",
148 Method::Put => "PUT",
149 Method::Delete => "DELETE",
150 Method::Patch => "PATCH",
151 Method::Head => "HEAD",
152 Method::Options => "OPTIONS",
153 };
154 span.record("otel.name", method)
157 .record("http.request.method", method)
158 .record("url.full", req.uri.clone());
159 if let Ok(uri) = req.uri.parse::<http::Uri>() {
160 if let Some(authority) = uri.authority() {
161 span.record("server.address", authority.host());
162 if let Some(port) = authority.port() {
163 span.record("server.port", port.as_u16());
164 }
165 }
166 }
167}
168
169fn hyper_method(m: Method) -> http::Method {
170 match m {
171 Method::Get => http::Method::GET,
172 Method::Post => http::Method::POST,
173 Method::Put => http::Method::PUT,
174 Method::Delete => http::Method::DELETE,
175 Method::Patch => http::Method::PATCH,
176 Method::Head => http::Method::HEAD,
177 Method::Options => http::Method::OPTIONS,
178 }
179}
180
181async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
182 let status = resp.status().as_u16();
183
184 let headers = headers_from_map(resp.headers());
185
186 let body = resp
187 .body_mut()
188 .collect()
189 .await
190 .map_err(|_| HttpError::RuntimeError)?
191 .to_bytes()
192 .to_vec();
193
194 Ok(Response {
195 status,
196 headers: Some(headers),
197 body: Some(body),
198 })
199}
200
201fn log_reqwest_error(err: reqwest::Error) -> HttpError {
202 let error_desc = if err.is_timeout() {
203 "timeout error"
204 } else if err.is_connect() {
205 "connection error"
206 } else if err.is_body() || err.is_decode() {
207 "message body error"
208 } else if err.is_request() {
209 "request error"
210 } else {
211 "error"
212 };
213 tracing::warn!(
214 "Outbound HTTP {}: URL {}, error detail {:?}",
215 error_desc,
216 err.url()
217 .map(|u| u.to_string())
218 .unwrap_or_else(|| "<unknown>".to_owned()),
219 err
220 );
221 HttpError::RuntimeError
222}
223
224async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
225 let status = res.status().as_u16();
226
227 let headers = headers_from_map(res.headers());
228
229 let body = res
230 .bytes()
231 .await
232 .map_err(|_| HttpError::RuntimeError)?
233 .to_vec();
234
235 Ok(Response {
236 status,
237 headers: Some(headers),
238 body: Some(body),
239 })
240}
241
242fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
243 map.iter()
244 .filter_map(|(key, val)| {
245 Some((
246 key.to_string(),
247 val.to_str()
248 .ok()
249 .or_else(|| {
250 tracing::warn!("Non-ascii response header value for {key}");
251 None
252 })?
253 .to_string(),
254 ))
255 })
256 .collect()
257}