1use std::{
2 collections::HashMap,
3 future::Future,
4 io::{ErrorKind, IsTerminal},
5 net::SocketAddr,
6 sync::Arc,
7};
8
9use anyhow::{bail, Context};
10use http::{
11 uri::{Authority, Scheme},
12 Request, Response, StatusCode, Uri,
13};
14use http_body_util::BodyExt;
15use hyper::{
16 body::{Bytes, Incoming},
17 service::service_fn,
18};
19use hyper_util::{
20 rt::{TokioExecutor, TokioIo},
21 server::conn::auto::Builder,
22};
23use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
24use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
25use spin_factors::RuntimeFactors;
26use spin_http::{
27 app_info::AppInfo,
28 body,
29 config::{HttpExecutorType, HttpTriggerConfig},
30 routes::{RouteMatch, Router},
31 trigger::HandlerType,
32};
33use tokio::{
34 io::{AsyncRead, AsyncWrite},
35 net::TcpListener,
36 task,
37};
38use tracing::Instrument;
39use wasmtime_wasi::p2::bindings::CommandIndices;
40use wasmtime_wasi_http::body::HyperOutgoingBody;
41
42use crate::{
43 headers::strip_forbidden_headers,
44 instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
45 outbound_http::OutboundHttpInterceptor,
46 spin::SpinHttpExecutor,
47 wagi::WagiHttpExecutor,
48 wasi::WasiHttpExecutor,
49 wasip3::Wasip3HttpExecutor,
50 Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
51};
52
/// Upper bound on the number of bind attempts made while searching for a
/// free port.
pub const MAX_RETRIES: u16 = 10;
54
55pub struct HttpServer<F: RuntimeFactors> {
57 listen_addr: SocketAddr,
59 tls_config: Option<TlsConfig>,
61 http1_max_buf_size: Option<usize>,
63 find_free_port: bool,
65 router: Router,
67 trigger_app: TriggerApp<F>,
69 component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
71 component_handler_types: HashMap<String, HandlerType>,
73}
74
75impl<F: RuntimeFactors> HttpServer<F> {
76 pub fn new(
78 listen_addr: SocketAddr,
79 tls_config: Option<TlsConfig>,
80 find_free_port: bool,
81 trigger_app: TriggerApp<F>,
82 http1_max_buf_size: Option<usize>,
83 ) -> anyhow::Result<Self> {
84 let component_trigger_configs = trigger_app
86 .app()
87 .trigger_configs::<HttpTriggerConfig>("http")?
88 .into_iter()
89 .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
90 .collect::<Result<Vec<_>, _>>()?;
91
92 let component_routes = component_trigger_configs
94 .iter()
95 .map(|(key, config)| (key, &config.route));
96 let mut duplicate_routes = Vec::new();
97 let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
98 if !duplicate_routes.is_empty() {
99 tracing::error!(
100 "The following component routes are duplicates and will never be used:"
101 );
102 for dup in &duplicate_routes {
103 tracing::error!(
104 " {}: {} (duplicate of {})",
105 dup.replaced_id,
106 dup.route(),
107 dup.effective_id,
108 );
109 }
110 }
111 if router.contains_reserved_route() {
112 tracing::error!(
113 "Routes under {} are handled by the Spin runtime and will never be reached",
114 spin_http::WELL_KNOWN_PREFIX
115 );
116 }
117 tracing::trace!(
118 "Constructed router: {:?}",
119 router.routes().collect::<Vec<_>>()
120 );
121
122 let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
124
125 let component_handler_types = component_trigger_configs
126 .iter()
127 .filter_map(|(key, trigger_config)| match key {
128 spin_http::routes::TriggerLookupKey::Component(component) => Some(
129 Self::handler_type_for_component(
130 &trigger_app,
131 component,
132 &trigger_config.executor,
133 )
134 .map(|ht| (component.clone(), ht)),
135 ),
136 spin_http::routes::TriggerLookupKey::Trigger(_) => None,
137 })
138 .collect::<anyhow::Result<_>>()?;
139 Ok(Self {
140 listen_addr,
141 tls_config,
142 find_free_port,
143 router,
144 trigger_app,
145 http1_max_buf_size,
146 component_trigger_configs,
147 component_handler_types,
148 })
149 }
150
151 fn handler_type_for_component(
152 trigger_app: &TriggerApp<F>,
153 component_id: &str,
154 executor: &Option<HttpExecutorType>,
155 ) -> anyhow::Result<HandlerType> {
156 let pre = trigger_app.get_instance_pre(component_id)?;
157 let handler_type = match executor {
158 None | Some(HttpExecutorType::Http) | Some(HttpExecutorType::Wasip3Unstable) => {
159 let handler_type = HandlerType::from_instance_pre(pre)?;
160 handler_type.validate_executor(executor)?;
161 handler_type
162 }
163 Some(HttpExecutorType::Wagi(wagi_config)) => {
164 anyhow::ensure!(
165 wagi_config.entrypoint == "_start",
166 "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
167 );
168 HandlerType::Wagi(
169 CommandIndices::new(pre)
170 .context("failed to find wasi command interface for wagi executor")?,
171 )
172 }
173 };
174 Ok(handler_type)
175 }
176
177 pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
179 let listener: TcpListener = if self.find_free_port {
180 self.search_for_free_port().await?
181 } else {
182 TcpListener::bind(self.listen_addr).await.map_err(|err| {
183 if err.kind() == ErrorKind::AddrInUse {
184 anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
185 } else {
186 anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
187 }
188 })?
189 };
190
191 if let Some(tls_config) = self.tls_config.clone() {
192 self.serve_https(listener, tls_config).await?;
193 } else {
194 self.serve_http(listener).await?;
195 }
196 Ok(())
197 }
198
199 async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
200 let mut found_listener = None;
201 let mut addr = self.listen_addr;
202
203 for _ in 1..=MAX_RETRIES {
204 if addr.port() == u16::MAX {
205 anyhow::bail!(
206 "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
207 );
208 }
209
210 match TcpListener::bind(addr).await {
211 Ok(listener) => {
212 found_listener = Some(listener);
213 break;
214 }
215 Err(err) if err.kind() == ErrorKind::AddrInUse => {
216 addr.set_port(addr.port() + 1);
217 continue;
218 }
219 Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
220 }
221 }
222
223 found_listener.ok_or_else(|| anyhow::anyhow!(
224 "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
225 self.listen_addr.port(),
226 self.listen_addr.port() + MAX_RETRIES
227 ))
228 }
229
230 async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
231 self.print_startup_msgs("http", &listener)?;
232 loop {
233 let (stream, client_addr) = listener.accept().await?;
234 self.clone()
235 .serve_connection(stream, Scheme::HTTP, client_addr);
236 }
237 }
238
239 async fn serve_https(
240 self: Arc<Self>,
241 listener: TcpListener,
242 tls_config: TlsConfig,
243 ) -> anyhow::Result<()> {
244 self.print_startup_msgs("https", &listener)?;
245 let acceptor = tls_config.server_config()?;
246 loop {
247 let (stream, client_addr) = listener.accept().await?;
248 match acceptor.accept(stream).await {
249 Ok(stream) => self
250 .clone()
251 .serve_connection(stream, Scheme::HTTPS, client_addr),
252 Err(err) => tracing::error!(?err, "Failed to start TLS session"),
253 }
254 }
255 }
256
257 pub async fn handle(
262 self: &Arc<Self>,
263 mut req: Request<Body>,
264 server_scheme: Scheme,
265 client_addr: SocketAddr,
266 ) -> anyhow::Result<Response<Body>> {
267 strip_forbidden_headers(&mut req);
268
269 spin_telemetry::extract_trace_context(&req);
270
271 let path = req.uri().path().to_string();
272
273 tracing::info!("Processing request on path '{path}'");
274
275 if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
277 return match well_known {
278 "health" => Ok(MatchedRoute::with_response_extension(
279 Response::new(body::full(Bytes::from_static(b"OK"))),
280 path,
281 )),
282 "info" => self.app_info(path),
283 _ => Self::not_found(NotFoundRouteKind::WellKnown),
284 };
285 }
286
287 match self.router.route(&path) {
288 Ok(route_match) => {
289 self.handle_trigger_route(req, route_match, server_scheme, client_addr)
290 .await
291 }
292 Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
293 }
294 }
295
296 pub async fn handle_trigger_route(
298 self: &Arc<Self>,
299 mut req: Request<Body>,
300 route_match: RouteMatch<'_, '_>,
301 server_scheme: Scheme,
302 client_addr: SocketAddr,
303 ) -> anyhow::Result<Response<Body>> {
304 set_req_uri(&mut req, server_scheme.clone())?;
305 let app_id = self
306 .trigger_app
307 .app()
308 .get_metadata(APP_NAME_KEY)?
309 .unwrap_or_else(|| "<unnamed>".into());
310
311 let lookup_key = route_match.lookup_key();
312
313 spin_telemetry::metrics::monotonic_counter!(
314 spin.request_count = 1,
315 trigger_type = "http",
316 app_id = app_id,
317 component_id = lookup_key.to_string()
318 );
319
320 let trigger_config = self
321 .component_trigger_configs
322 .get(lookup_key)
323 .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
324
325 match (&trigger_config.component, &trigger_config.static_response) {
326 (Some(component), None) => {
327 self.respond_wasm_component(
328 req,
329 route_match,
330 server_scheme,
331 client_addr,
332 component,
333 &trigger_config.executor,
334 )
335 .await
336 }
337 (None, Some(static_response)) => {
338 Self::respond_static_response(static_response)
339 },
340 (None, None) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - neither is specified for {}", route_match.raw_route())),
342 (Some(_), Some(_)) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - both are specified for {}", route_match.raw_route())),
343 }
344 }
345
346 async fn respond_wasm_component(
347 self: &Arc<Self>,
348 req: Request<Body>,
349 route_match: RouteMatch<'_, '_>,
350 server_scheme: Scheme,
351 client_addr: SocketAddr,
352 component_id: &str,
353 executor: &Option<HttpExecutorType>,
354 ) -> anyhow::Result<Response<Body>> {
355 let mut instance_builder = self.trigger_app.prepare(component_id)?;
356
357 let outbound_http = instance_builder
362 .factor_builder::<OutboundHttpFactor>()
363 .context(
364 "The wasi HTTP trigger was configured without the required wasi outbound http support",
365 )?;
366 let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
367 outbound_http.set_self_request_origin(origin);
368 outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
369
370 let handler_type = self
372 .component_handler_types
373 .get(component_id)
374 .with_context(|| format!("unknown component ID {component_id:?}"))?;
375 let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
376
377 let res = match executor {
378 HttpExecutorType::Http | HttpExecutorType::Wasip3Unstable => match handler_type {
379 HandlerType::Spin => {
380 SpinHttpExecutor
381 .execute(instance_builder, &route_match, req, client_addr)
382 .await
383 }
384 HandlerType::Wasi0_3(indices) => {
385 Wasip3HttpExecutor(indices)
386 .execute(instance_builder, &route_match, req, client_addr)
387 .await
388 }
389 HandlerType::Wasi0_2(_)
390 | HandlerType::Wasi2023_11_10(_)
391 | HandlerType::Wasi2023_10_18(_) => {
392 WasiHttpExecutor { handler_type }
393 .execute(instance_builder, &route_match, req, client_addr)
394 .await
395 }
396 HandlerType::Wagi(_) => unreachable!(),
397 },
398 HttpExecutorType::Wagi(wagi_config) => {
399 let indices = match handler_type {
400 HandlerType::Wagi(indices) => indices,
401 _ => unreachable!(),
402 };
403 let executor = WagiHttpExecutor {
404 wagi_config,
405 indices,
406 };
407 executor
408 .execute(instance_builder, &route_match, req, client_addr)
409 .await
410 }
411 };
412 match res {
413 Ok(res) => Ok(MatchedRoute::with_response_extension(
414 res,
415 route_match.raw_route(),
416 )),
417 Err(err) => {
418 tracing::error!("Error processing request: {err:?}");
419 instrument_error(&err);
420 Self::internal_error(None, route_match.raw_route())
421 }
422 }
423 }
424
425 fn respond_static_response(
426 sr: &spin_http::config::StaticResponse,
427 ) -> anyhow::Result<Response<Body>> {
428 let mut response = Response::builder();
429
430 response = response.status(sr.status());
431 for (header_name, header_value) in sr.headers() {
432 response = response.header(header_name, header_value);
433 }
434
435 let body = match sr.body() {
436 Some(b) => body::full(b.clone().into()),
437 None => body::empty(),
438 };
439
440 Ok(response.body(body)?)
441 }
442
443 fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
445 let info = AppInfo::new(self.trigger_app.app());
446 let body = serde_json::to_vec_pretty(&info)?;
447 Ok(MatchedRoute::with_response_extension(
448 Response::builder()
449 .header("content-type", "application/json")
450 .body(body::full(body.into()))?,
451 route,
452 ))
453 }
454
455 fn internal_error(
457 body: Option<&str>,
458 route: impl Into<String>,
459 ) -> anyhow::Result<Response<Body>> {
460 let body = match body {
461 Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
462 None => body::empty(),
463 };
464
465 Ok(MatchedRoute::with_response_extension(
466 Response::builder()
467 .status(StatusCode::INTERNAL_SERVER_ERROR)
468 .body(body)?,
469 route,
470 ))
471 }
472
473 fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
475 use std::sync::atomic::{AtomicBool, Ordering};
476 static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
477 if let NotFoundRouteKind::Normal(route) = kind {
478 if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
479 && std::io::stderr().is_terminal()
480 {
481 terminal::warn!(
482 "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
483 );
484 }
485 }
486 Ok(Response::builder()
487 .status(StatusCode::NOT_FOUND)
488 .body(body::empty())?)
489 }
490
491 fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
492 self: Arc<Self>,
493 stream: S,
494 server_scheme: Scheme,
495 client_addr: SocketAddr,
496 ) {
497 task::spawn(async move {
498 let mut server_builder = Builder::new(TokioExecutor::new());
499
500 if let Some(http1_max_buf_size) = self.http1_max_buf_size {
501 server_builder.http1().max_buf_size(http1_max_buf_size);
502 }
503
504 if let Err(err) = server_builder
505 .serve_connection(
506 TokioIo::new(stream),
507 service_fn(move |request| {
508 self.clone().instrumented_service_fn(
509 server_scheme.clone(),
510 client_addr,
511 request,
512 )
513 }),
514 )
515 .await
516 {
517 tracing::warn!("Error serving HTTP connection: {err:?}");
518 }
519 });
520 }
521
522 async fn instrumented_service_fn(
523 self: Arc<Self>,
524 server_scheme: Scheme,
525 client_addr: SocketAddr,
526 request: Request<Incoming>,
527 ) -> anyhow::Result<Response<HyperOutgoingBody>> {
528 let span = http_span!(request, client_addr);
529 let method = request.method().to_string();
530 async {
531 let result = self
532 .handle(
533 request.map(|body: Incoming| {
534 body.map_err(wasmtime_wasi_http::hyper_response_error)
535 .boxed()
536 }),
537 server_scheme,
538 client_addr,
539 )
540 .await;
541 finalize_http_span(result, method)
542 }
543 .instrument(span)
544 .await
545 }
546
547 fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
548 let local_addr = listener.local_addr()?;
549 let base_url = format!("{scheme}://{local_addr:?}");
550 terminal::step!("\nServing", "{base_url}");
551 tracing::info!("Serving {base_url}");
552
553 println!("Available Routes:");
554 for (route, key) in self.router.routes() {
555 println!(" {key}: {base_url}{route}");
556 if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
557 if let Some(component) = self.trigger_app.app().get_component(component_id) {
558 if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
559 println!(" {description}");
560 }
561 }
562 }
563 }
564 Ok(())
565 }
566}
567
568fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
574 let uri = req.uri().clone();
575 let mut parts = uri.into_parts();
576 let headers = req.headers();
577 let header_authority = headers
578 .get(http::header::HOST)
579 .map(|h| -> anyhow::Result<Authority> {
580 let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
581 host_header
582 .parse()
583 .context("'Host' header contains an invalid authority")
584 })
585 .transpose()?;
586 let uri_authority = parts.authority;
587
588 let authority = match (header_authority, uri_authority) {
590 (None, None) => bail!("no 'Host' header present in request"),
591 (None, Some(a)) => a,
592 (Some(a), None) => a,
593 (Some(a1), Some(a2)) => {
594 if a1 != a2 {
597 return Err(anyhow::anyhow!(
598 "authority in 'Host' header does not match authority in URI"
599 ));
600 }
601 a1
602 }
603 };
604 parts.scheme = Some(scheme);
605 parts.authority = Some(authority);
606 *req.uri_mut() = Uri::from_parts(parts).unwrap();
607 Ok(())
608}
609
610pub(crate) trait HttpExecutor {
612 fn execute<F: RuntimeFactors>(
613 &self,
614 instance_builder: TriggerInstanceBuilder<F>,
615 route_match: &RouteMatch<'_, '_>,
616 req: Request<Body>,
617 client_addr: SocketAddr,
618 ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
619}