1use std::{collections::HashMap, future::Future, io::IsTerminal, net::SocketAddr, sync::Arc};
2
3use anyhow::{bail, Context};
4use http::{
5 uri::{Authority, Scheme},
6 Request, Response, StatusCode, Uri,
7};
8use http_body_util::BodyExt;
9use hyper::{
10 body::{Bytes, Incoming},
11 server::conn::http1,
12 service::service_fn,
13};
14use hyper_util::rt::TokioIo;
15use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
16use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
17use spin_factors::RuntimeFactors;
18use spin_http::{
19 app_info::AppInfo,
20 body,
21 config::{HttpExecutorType, HttpTriggerConfig},
22 routes::{RouteMatch, Router},
23 trigger::HandlerType,
24};
25use tokio::{
26 io::{AsyncRead, AsyncWrite},
27 net::TcpListener,
28 task,
29};
30use tracing::Instrument;
31use wasmtime_wasi::bindings::CommandIndices;
32use wasmtime_wasi_http::body::HyperOutgoingBody;
33
34use crate::{
35 headers::strip_forbidden_headers,
36 instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
37 outbound_http::OutboundHttpInterceptor,
38 spin::SpinHttpExecutor,
39 wagi::WagiHttpExecutor,
40 wasi::WasiHttpExecutor,
41 Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
42};
43
/// An HTTP server that routes incoming requests to the components of an app.
pub struct HttpServer<F: RuntimeFactors> {
    /// Address the server binds to in `serve`.
    listen_addr: SocketAddr,
    /// TLS settings; when `Some`, `serve` serves HTTPS instead of plain HTTP.
    tls_config: Option<TlsConfig>,
    /// Router used to map a request path to a component.
    router: Router,
    /// The app whose components handle the requests.
    trigger_app: TriggerApp<F>,
    /// Component ID -> that component's HTTP trigger config.
    component_trigger_configs: HashMap<String, HttpTriggerConfig>,
    /// Component ID -> handler type resolved once in `new`.
    component_handler_types: HashMap<String, HandlerType>,
}
59
60impl<F: RuntimeFactors> HttpServer<F> {
61 pub fn new(
63 listen_addr: SocketAddr,
64 tls_config: Option<TlsConfig>,
65 trigger_app: TriggerApp<F>,
66 ) -> anyhow::Result<Self> {
67 let component_trigger_configs = Vec::from_iter(
69 trigger_app
70 .app()
71 .trigger_configs::<HttpTriggerConfig>("http")?
72 .into_iter()
73 .map(|(_, config)| (config.component.clone(), config)),
74 );
75
76 let component_routes = component_trigger_configs
78 .iter()
79 .map(|(component_id, config)| (component_id.as_str(), &config.route));
80 let mut duplicate_routes = Vec::new();
81 let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
82 if !duplicate_routes.is_empty() {
83 tracing::error!(
84 "The following component routes are duplicates and will never be used:"
85 );
86 for dup in &duplicate_routes {
87 tracing::error!(
88 " {}: {} (duplicate of {})",
89 dup.replaced_id,
90 dup.route(),
91 dup.effective_id,
92 );
93 }
94 }
95 if router.contains_reserved_route() {
96 tracing::error!(
97 "Routes under {} are handled by the Spin runtime and will never be reached",
98 spin_http::WELL_KNOWN_PREFIX
99 );
100 }
101 tracing::trace!(
102 "Constructed router: {:?}",
103 router.routes().collect::<Vec<_>>()
104 );
105
106 let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
108
109 let component_handler_types = component_trigger_configs
110 .iter()
111 .map(|(component_id, trigger_config)| {
112 let component = trigger_app.get_component(component_id)?;
113 let handler_type = match &trigger_config.executor {
114 None | Some(HttpExecutorType::Http) => {
115 HandlerType::from_component(component)?
116 }
117 Some(HttpExecutorType::Wagi(wagi_config)) => {
118 anyhow::ensure!(
119 wagi_config.entrypoint == "_start",
120 "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
121 );
122 HandlerType::Wagi(CommandIndices::new(component)
123 .context("failed to find wasi command interface for wagi executor")?)
124 }
125 };
126 Ok((component_id.clone(), handler_type))
127 })
128 .collect::<anyhow::Result<_>>()?;
129 Ok(Self {
130 listen_addr,
131 tls_config,
132 router,
133 trigger_app,
134 component_trigger_configs,
135 component_handler_types,
136 })
137 }
138
139 pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
141 let listener = TcpListener::bind(self.listen_addr).await.with_context(|| {
142 format!(
143 "Unable to listen on {listen_addr}",
144 listen_addr = self.listen_addr
145 )
146 })?;
147 if let Some(tls_config) = self.tls_config.clone() {
148 self.serve_https(listener, tls_config).await?;
149 } else {
150 self.serve_http(listener).await?;
151 }
152 Ok(())
153 }
154
155 async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
156 self.print_startup_msgs("http", &listener)?;
157 loop {
158 let (stream, client_addr) = listener.accept().await?;
159 self.clone()
160 .serve_connection(stream, Scheme::HTTP, client_addr);
161 }
162 }
163
164 async fn serve_https(
165 self: Arc<Self>,
166 listener: TcpListener,
167 tls_config: TlsConfig,
168 ) -> anyhow::Result<()> {
169 self.print_startup_msgs("https", &listener)?;
170 let acceptor = tls_config.server_config()?;
171 loop {
172 let (stream, client_addr) = listener.accept().await?;
173 match acceptor.accept(stream).await {
174 Ok(stream) => self
175 .clone()
176 .serve_connection(stream, Scheme::HTTPS, client_addr),
177 Err(err) => tracing::error!(?err, "Failed to start TLS session"),
178 }
179 }
180 }
181
182 pub async fn handle(
187 self: &Arc<Self>,
188 mut req: Request<Body>,
189 server_scheme: Scheme,
190 client_addr: SocketAddr,
191 ) -> anyhow::Result<Response<Body>> {
192 strip_forbidden_headers(&mut req);
193
194 spin_telemetry::extract_trace_context(&req);
195
196 let path = req.uri().path().to_string();
197
198 tracing::info!("Processing request on path '{path}'");
199
200 if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
202 return match well_known {
203 "health" => Ok(MatchedRoute::with_response_extension(
204 Response::new(body::full(Bytes::from_static(b"OK"))),
205 path,
206 )),
207 "info" => self.app_info(path),
208 _ => Self::not_found(NotFoundRouteKind::WellKnown),
209 };
210 }
211
212 match self.router.route(&path) {
213 Ok(route_match) => {
214 self.handle_trigger_route(req, route_match, server_scheme, client_addr)
215 .await
216 }
217 Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
218 }
219 }
220
221 pub async fn handle_trigger_route(
223 self: &Arc<Self>,
224 mut req: Request<Body>,
225 route_match: RouteMatch<'_, '_>,
226 server_scheme: Scheme,
227 client_addr: SocketAddr,
228 ) -> anyhow::Result<Response<Body>> {
229 set_req_uri(&mut req, server_scheme.clone())?;
230 let app_id = self
231 .trigger_app
232 .app()
233 .get_metadata(APP_NAME_KEY)?
234 .unwrap_or_else(|| "<unnamed>".into());
235
236 let component_id = route_match.component_id();
237
238 spin_telemetry::metrics::monotonic_counter!(
239 spin.request_count = 1,
240 trigger_type = "http",
241 app_id = app_id,
242 component_id = component_id
243 );
244
245 let mut instance_builder = self.trigger_app.prepare(component_id)?;
246
247 let outbound_http = instance_builder
252 .factor_builder::<OutboundHttpFactor>()
253 .context(
254 "The wasi HTTP trigger was configured without the required wasi outbound http support",
255 )?;
256 let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
257 outbound_http.set_self_request_origin(origin);
258 outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
259
260 let trigger_config = self.component_trigger_configs.get(component_id).unwrap();
262 let handler_type = self.component_handler_types.get(component_id).unwrap();
263 let executor = trigger_config
264 .executor
265 .as_ref()
266 .unwrap_or(&HttpExecutorType::Http);
267
268 let res = match executor {
269 HttpExecutorType::Http => match handler_type {
270 HandlerType::Spin => {
271 SpinHttpExecutor
272 .execute(instance_builder, &route_match, req, client_addr)
273 .await
274 }
275 HandlerType::Wasi0_2(_)
276 | HandlerType::Wasi2023_11_10(_)
277 | HandlerType::Wasi2023_10_18(_) => {
278 WasiHttpExecutor { handler_type }
279 .execute(instance_builder, &route_match, req, client_addr)
280 .await
281 }
282 HandlerType::Wagi(_) => unreachable!(),
283 },
284 HttpExecutorType::Wagi(wagi_config) => {
285 let indices = match handler_type {
286 HandlerType::Wagi(indices) => indices,
287 _ => unreachable!(),
288 };
289 let executor = WagiHttpExecutor {
290 wagi_config,
291 indices,
292 };
293 executor
294 .execute(instance_builder, &route_match, req, client_addr)
295 .await
296 }
297 };
298 match res {
299 Ok(res) => Ok(MatchedRoute::with_response_extension(
300 res,
301 route_match.raw_route(),
302 )),
303 Err(err) => {
304 tracing::error!("Error processing request: {err:?}");
305 instrument_error(&err);
306 Self::internal_error(None, route_match.raw_route())
307 }
308 }
309 }
310
311 fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
313 let info = AppInfo::new(self.trigger_app.app());
314 let body = serde_json::to_vec_pretty(&info)?;
315 Ok(MatchedRoute::with_response_extension(
316 Response::builder()
317 .header("content-type", "application/json")
318 .body(body::full(body.into()))?,
319 route,
320 ))
321 }
322
323 fn internal_error(
325 body: Option<&str>,
326 route: impl Into<String>,
327 ) -> anyhow::Result<Response<Body>> {
328 let body = match body {
329 Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
330 None => body::empty(),
331 };
332
333 Ok(MatchedRoute::with_response_extension(
334 Response::builder()
335 .status(StatusCode::INTERNAL_SERVER_ERROR)
336 .body(body)?,
337 route,
338 ))
339 }
340
341 fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
343 use std::sync::atomic::{AtomicBool, Ordering};
344 static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
345 if let NotFoundRouteKind::Normal(route) = kind {
346 if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
347 && std::io::stderr().is_terminal()
348 {
349 terminal::warn!("Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route.");
350 }
351 }
352 Ok(Response::builder()
353 .status(StatusCode::NOT_FOUND)
354 .body(body::empty())?)
355 }
356
357 fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
358 self: Arc<Self>,
359 stream: S,
360 server_scheme: Scheme,
361 client_addr: SocketAddr,
362 ) {
363 task::spawn(async move {
364 if let Err(err) = http1::Builder::new()
365 .keep_alive(true)
366 .serve_connection(
367 TokioIo::new(stream),
368 service_fn(move |request| {
369 self.clone().instrumented_service_fn(
370 server_scheme.clone(),
371 client_addr,
372 request,
373 )
374 }),
375 )
376 .await
377 {
378 tracing::warn!("Error serving HTTP connection: {err:?}");
379 }
380 });
381 }
382
383 async fn instrumented_service_fn(
384 self: Arc<Self>,
385 server_scheme: Scheme,
386 client_addr: SocketAddr,
387 request: Request<Incoming>,
388 ) -> anyhow::Result<Response<HyperOutgoingBody>> {
389 let span = http_span!(request, client_addr);
390 let method = request.method().to_string();
391 async {
392 let result = self
393 .handle(
394 request.map(|body: Incoming| {
395 body.map_err(wasmtime_wasi_http::hyper_response_error)
396 .boxed()
397 }),
398 server_scheme,
399 client_addr,
400 )
401 .await;
402 finalize_http_span(result, method)
403 }
404 .instrument(span)
405 .await
406 }
407
408 fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
409 let local_addr = listener.local_addr()?;
410 let base_url = format!("{scheme}://{local_addr:?}");
411 terminal::step!("\nServing", "{base_url}");
412 tracing::info!("Serving {base_url}");
413
414 println!("Available Routes:");
415 for (route, component_id) in self.router.routes() {
416 println!(" {}: {}{}", component_id, base_url, route);
417 if let Some(component) = self.trigger_app.app().get_component(component_id) {
418 if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
419 println!(" {}", description);
420 }
421 }
422 }
423 Ok(())
424 }
425}
426
427fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
433 let uri = req.uri().clone();
434 let mut parts = uri.into_parts();
435 let headers = req.headers();
436 let header_authority = headers
437 .get(http::header::HOST)
438 .map(|h| -> anyhow::Result<Authority> {
439 let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
440 host_header
441 .parse()
442 .context("'Host' header contains an invalid authority")
443 })
444 .transpose()?;
445 let uri_authority = parts.authority;
446
447 let authority = match (header_authority, uri_authority) {
449 (None, None) => bail!("no 'Host' header present in request"),
450 (None, Some(a)) => a,
451 (Some(a), None) => a,
452 (Some(a1), Some(a2)) => {
453 if a1 != a2 {
456 return Err(anyhow::anyhow!(
457 "authority in 'Host' header does not match authority in URI"
458 ));
459 }
460 a1
461 }
462 };
463 parts.scheme = Some(scheme);
464 parts.authority = Some(authority);
465 *req.uri_mut() = Uri::from_parts(parts).unwrap();
466 Ok(())
467}
468
/// An executor that runs a routed HTTP request against a component instance.
pub(crate) trait HttpExecutor {
    /// Executes `req` and resolves to the response, or an error.
    ///
    /// * `instance_builder` - builder for the component instance that will
    ///   handle the request; taken by value.
    /// * `route_match` - the route that matched the request.
    /// * `req` - the request to handle.
    /// * `client_addr` - socket address of the client that sent the request.
    fn execute<F: RuntimeFactors>(
        &self,
        instance_builder: TriggerInstanceBuilder<F>,
        route_match: &RouteMatch<'_, '_>,
        req: Request<Body>,
        client_addr: SocketAddr,
    ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
}