spin_factor_outbound_http/
spin.rs1use std::sync::Arc;
2
3use http_body_util::BodyExt;
4use spin_factor_outbound_networking::config::blocked_networks::BlockedNetworks;
5use spin_world::v1::{
6 http as spin_http,
7 http_types::{self, HttpError, Method, Request, Response},
8};
9use tracing::{field::Empty, instrument, Span};
10
11use crate::intercept::InterceptOutcome;
12
13impl spin_http::Host for crate::InstanceState {
14 #[instrument(name = "spin_outbound_http.send_request", skip_all,
15 fields(otel.kind = "client", url.full = Empty, http.request.method = Empty,
16 http.response.status_code = Empty, otel.name = Empty, server.address = Empty, server.port = Empty))]
17 async fn send_request(&mut self, req: Request) -> Result<Response, HttpError> {
18 let span = Span::current();
19 record_request_fields(&span, &req);
20
21 let uri = req.uri;
22 tracing::trace!("Sending outbound HTTP to {uri:?}");
23
24 if !req.params.is_empty() {
25 tracing::warn!("HTTP params field is deprecated");
26 }
27 let req_url = if !uri.starts_with('/') {
28 let is_allowed = self
30 .allowed_hosts
31 .check_url(&uri, "https")
32 .await
33 .unwrap_or(false);
34 if !is_allowed {
35 return Err(HttpError::DestinationNotAllowed);
36 }
37 uri.parse().map_err(|_| HttpError::InvalidUrl)?
38 } else {
39 let is_allowed = self
41 .allowed_hosts
42 .check_relative_url(&["http", "https"])
43 .await
44 .unwrap_or(false);
45 if !is_allowed {
46 return Err(HttpError::DestinationNotAllowed);
47 }
48
49 let Some(origin) = &self.self_request_origin else {
50 tracing::error!(
51 "Couldn't handle outbound HTTP request to relative URI; no origin set"
52 );
53 return Err(HttpError::InvalidUrl);
54 };
55 let path_and_query = uri.parse().map_err(|_| HttpError::InvalidUrl)?;
56 origin.clone().into_uri(Some(path_and_query))
57 };
58
59 let mut req = {
61 let mut builder = http::Request::builder()
62 .method(hyper_method(req.method))
63 .uri(&req_url);
64 for (key, val) in req.headers {
65 builder = builder.header(key, val);
66 }
67 builder.body(req.body.unwrap_or_default())
68 }
69 .map_err(|err| {
70 tracing::error!("Error building outbound request: {err}");
71 HttpError::RuntimeError
72 })?;
73
74 spin_telemetry::inject_trace_context(req.headers_mut());
75
76 if let Some(interceptor) = &self.request_interceptor {
77 let intercepted_request = std::mem::take(&mut req).into();
78 match interceptor.intercept(intercepted_request).await {
79 Ok(InterceptOutcome::Continue(intercepted_request)) => {
80 req = intercepted_request.into_vec_request().unwrap();
81 }
82 Ok(InterceptOutcome::Complete(resp)) => return response_from_hyper(resp).await,
83 Err(err) => {
84 tracing::error!("Error in outbound HTTP interceptor: {err}");
85 return Err(HttpError::RuntimeError);
86 }
87 }
88 }
89
90 let req = reqwest::Request::try_from(req).map_err(|_| HttpError::InvalidUrl)?;
92
93 let client = self.spin_http_client.get_or_insert_with(|| {
96 let mut builder = reqwest::Client::builder()
97 .dns_resolver(Arc::new(SpinDnsResolver(self.blocked_networks.clone())));
98 if !self.connection_pooling_enabled {
99 builder = builder.pool_max_idle_per_host(0);
100 }
101 builder.build().unwrap()
102 });
103
104 let permit = crate::concurrent_outbound_connections::acquire_semaphore(
108 "spin",
109 &self.concurrent_outbound_connections_semaphore,
110 )
111 .await;
112 let resp = client.execute(req).await.map_err(log_reqwest_error)?;
113 drop(permit);
114
115 tracing::trace!("Returning response from outbound request to {req_url}");
116 span.record("http.response.status_code", resp.status().as_u16());
117 response_from_reqwest(resp).await
118 }
119}
120
121struct SpinDnsResolver(BlockedNetworks);
123
124impl reqwest::dns::Resolve for SpinDnsResolver {
125 fn resolve(&self, name: reqwest::dns::Name) -> reqwest::dns::Resolving {
126 let blocked_networks = self.0.clone();
127 Box::pin(async move {
128 let mut addrs = tokio::net::lookup_host(name.as_str())
129 .await
130 .map_err(Box::new)?
131 .collect::<Vec<_>>();
132 crate::remove_blocked_addrs(&blocked_networks, &mut addrs).map_err(Box::new)?;
134 Ok(Box::new(addrs.into_iter()) as reqwest::dns::Addrs)
135 })
136 }
137}
138
139impl http_types::Host for crate::InstanceState {
140 fn convert_http_error(&mut self, err: HttpError) -> anyhow::Result<HttpError> {
141 Ok(err)
142 }
143}
144
145fn record_request_fields(span: &Span, req: &Request) {
146 let method = match req.method {
147 Method::Get => "GET",
148 Method::Post => "POST",
149 Method::Put => "PUT",
150 Method::Delete => "DELETE",
151 Method::Patch => "PATCH",
152 Method::Head => "HEAD",
153 Method::Options => "OPTIONS",
154 };
155 span.record("otel.name", method)
158 .record("http.request.method", method)
159 .record("url.full", req.uri.clone());
160 if let Ok(uri) = req.uri.parse::<http::Uri>() {
161 if let Some(authority) = uri.authority() {
162 span.record("server.address", authority.host());
163 if let Some(port) = authority.port() {
164 span.record("server.port", port.as_u16());
165 }
166 }
167 }
168}
169
170fn hyper_method(m: Method) -> http::Method {
171 match m {
172 Method::Get => http::Method::GET,
173 Method::Post => http::Method::POST,
174 Method::Put => http::Method::PUT,
175 Method::Delete => http::Method::DELETE,
176 Method::Patch => http::Method::PATCH,
177 Method::Head => http::Method::HEAD,
178 Method::Options => http::Method::OPTIONS,
179 }
180}
181
182async fn response_from_hyper(mut resp: crate::Response) -> Result<Response, HttpError> {
183 let status = resp.status().as_u16();
184
185 let headers = headers_from_map(resp.headers());
186
187 let body = resp
188 .body_mut()
189 .collect()
190 .await
191 .map_err(|_| HttpError::RuntimeError)?
192 .to_bytes()
193 .to_vec();
194
195 Ok(Response {
196 status,
197 headers: Some(headers),
198 body: Some(body),
199 })
200}
201
202fn log_reqwest_error(err: reqwest::Error) -> HttpError {
203 let error_desc = if err.is_timeout() {
204 "timeout error"
205 } else if err.is_connect() {
206 "connection error"
207 } else if err.is_body() || err.is_decode() {
208 "message body error"
209 } else if err.is_request() {
210 "request error"
211 } else {
212 "error"
213 };
214 tracing::warn!(
215 "Outbound HTTP {}: URL {}, error detail {:?}",
216 error_desc,
217 err.url()
218 .map(|u| u.to_string())
219 .unwrap_or_else(|| "<unknown>".to_owned()),
220 err
221 );
222 HttpError::RuntimeError
223}
224
225async fn response_from_reqwest(res: reqwest::Response) -> Result<Response, HttpError> {
226 let status = res.status().as_u16();
227
228 let headers = headers_from_map(res.headers());
229
230 let body = res
231 .bytes()
232 .await
233 .map_err(|_| HttpError::RuntimeError)?
234 .to_vec();
235
236 Ok(Response {
237 status,
238 headers: Some(headers),
239 body: Some(body),
240 })
241}
242
243fn headers_from_map(map: &http::HeaderMap) -> Vec<(String, String)> {
244 map.iter()
245 .filter_map(|(key, val)| {
246 Some((
247 key.to_string(),
248 val.to_str()
249 .ok()
250 .or_else(|| {
251 tracing::warn!("Non-ascii response header value for {key}");
252 None
253 })?
254 .to_string(),
255 ))
256 })
257 .collect()
258}