1use std::{
2 collections::HashMap,
3 future::Future,
4 io::{ErrorKind, IsTerminal},
5 net::SocketAddr,
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::{Context, bail};
11use http::{
12 Request, Response, StatusCode, Uri,
13 uri::{Authority, Scheme},
14};
15use http_body_util::BodyExt;
16use hyper::{
17 body::{Bytes, Incoming},
18 service::service_fn,
19};
20use hyper_util::{
21 rt::{TokioExecutor, TokioIo},
22 server::conn::auto::Builder,
23};
24use rand::Rng;
25use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
26use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
27use spin_factors::RuntimeFactors;
28use spin_factors_executor::InstanceState;
29use spin_http::{
30 app_info::AppInfo,
31 body,
32 config::{HttpExecutorType, HttpTriggerConfig},
33 routes::{RouteInfo, RouteMatch, Router},
34 trigger::HandlerType,
35};
36use tokio::{
37 io::{AsyncRead, AsyncWrite},
38 net::TcpListener,
39 task,
40};
41use tracing::Instrument;
42use wasmtime::ToWasmtimeResult;
43use wasmtime_wasi::p2::bindings::CommandIndices;
44use wasmtime_wasi_http::handler::{HandlerState, StoreBundle};
45use wasmtime_wasi_http::p2::body::HyperOutgoingBody;
46
47use crate::{
48 Body, InstanceReuseConfig, NotFoundRouteKind, OutputFormat, TlsConfig, TriggerApp,
49 TriggerInstanceBuilder,
50 headers::strip_forbidden_headers,
51 instrument::{MatchedRoute, finalize_http_span, http_span, instrument_error},
52 outbound_http::OutboundHttpInterceptor,
53 spin::SpinHttpExecutor,
54 wagi::WagiHttpExecutor,
55 wasi::WasiHttpExecutor,
56 wasip3::Wasip3HttpExecutor,
57};
58
59pub const MAX_RETRIES: u16 = 10;
60
61pub struct HttpServer<F: RuntimeFactors> {
63 listen_addr: SocketAddr,
65 tls_config: Option<TlsConfig>,
67 http1_max_buf_size: Option<usize>,
69 find_free_port: bool,
71 output_format: OutputFormat,
73 router: Router,
75 trigger_app: Arc<TriggerApp<F>>,
77 component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
79 component_handler_types: HashMap<String, HandlerType<HttpHandlerState<F>>>,
81}
82
83impl<F: RuntimeFactors> HttpServer<F> {
84 pub fn new(
86 listen_addr: SocketAddr,
87 tls_config: Option<TlsConfig>,
88 find_free_port: bool,
89 trigger_app: TriggerApp<F>,
90 http1_max_buf_size: Option<usize>,
91 reuse_config: InstanceReuseConfig,
92 output_format: OutputFormat,
93 ) -> anyhow::Result<Self> {
94 let component_trigger_configs = trigger_app
96 .app()
97 .trigger_configs::<HttpTriggerConfig>("http")?
98 .into_iter()
99 .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
100 .collect::<Result<Vec<_>, _>>()?;
101
102 let component_routes = component_trigger_configs
104 .iter()
105 .map(|(key, config)| (key, &config.route));
106 let mut duplicate_routes = Vec::new();
107 let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
108 if !duplicate_routes.is_empty() {
109 tracing::error!(
110 "The following component routes are duplicates and will never be used:"
111 );
112 for dup in &duplicate_routes {
113 tracing::error!(
114 " {}: {} (duplicate of {})",
115 dup.replaced_id,
116 dup.route(),
117 dup.effective_id,
118 );
119 }
120 }
121 if router.contains_reserved_route() {
122 tracing::error!(
123 "Routes under {} are handled by the Spin runtime and will never be reached",
124 spin_http::WELL_KNOWN_PREFIX
125 );
126 }
127 tracing::trace!(
128 "Constructed router: {:?}",
129 router.routes().collect::<Vec<_>>()
130 );
131
132 let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
134
135 let trigger_app = Arc::new(trigger_app);
136
137 let component_handler_types = component_trigger_configs
138 .iter()
139 .filter_map(|(key, trigger_config)| match key {
140 spin_http::routes::TriggerLookupKey::Component(component) => Some(
141 Self::handler_type_for_component(
142 &trigger_app,
143 component,
144 &trigger_config.executor,
145 reuse_config,
146 )
147 .map(|ht| (component.clone(), ht)),
148 ),
149 spin_http::routes::TriggerLookupKey::Trigger(_) => None,
150 })
151 .collect::<anyhow::Result<_>>()?;
152 Ok(Self {
153 listen_addr,
154 tls_config,
155 find_free_port,
156 router,
157 trigger_app,
158 http1_max_buf_size,
159 component_trigger_configs,
160 component_handler_types,
161 output_format,
162 })
163 }
164
165 fn handler_type_for_component(
166 trigger_app: &Arc<TriggerApp<F>>,
167 component_id: &str,
168 executor: &Option<HttpExecutorType>,
169 reuse_config: InstanceReuseConfig,
170 ) -> anyhow::Result<HandlerType<HttpHandlerState<F>>> {
171 let pre = trigger_app.get_instance_pre(component_id)?;
172 let handler_type = match executor {
173 None | Some(HttpExecutorType::Http) => HandlerType::from_instance_pre(
174 pre,
175 HttpHandlerState {
176 trigger_app: trigger_app.clone(),
177 component_id: component_id.into(),
178 reuse_config,
179 },
180 )?,
181 Some(HttpExecutorType::Wagi(wagi_config)) => {
182 anyhow::ensure!(
183 wagi_config.entrypoint == "_start",
184 "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
185 );
186 HandlerType::Wagi(
187 CommandIndices::new(pre)
188 .map_err(anyhow::Error::from)
189 .context("failed to find wasi command interface for wagi executor")?,
190 )
191 }
192 };
193 Ok(handler_type)
194 }
195
196 pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
198 let listener: TcpListener = if self.find_free_port {
199 self.search_for_free_port().await?
200 } else {
201 TcpListener::bind(self.listen_addr).await.map_err(|err| {
202 if err.kind() == ErrorKind::AddrInUse {
203 anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
204 } else {
205 anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
206 }
207 })?
208 };
209
210 if let Some(tls_config) = self.tls_config.clone() {
211 self.serve_https(listener, tls_config).await?;
212 } else {
213 self.serve_http(listener).await?;
214 }
215 Ok(())
216 }
217
218 async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
219 let mut found_listener = None;
220 let mut addr = self.listen_addr;
221
222 for _ in 1..=MAX_RETRIES {
223 if addr.port() == u16::MAX {
224 anyhow::bail!(
225 "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
226 );
227 }
228
229 match TcpListener::bind(addr).await {
230 Ok(listener) => {
231 found_listener = Some(listener);
232 break;
233 }
234 Err(err) if err.kind() == ErrorKind::AddrInUse => {
235 addr.set_port(addr.port() + 1);
236 continue;
237 }
238 Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
239 }
240 }
241
242 found_listener.ok_or_else(|| anyhow::anyhow!(
243 "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
244 self.listen_addr.port(),
245 self.listen_addr.port() + MAX_RETRIES
246 ))
247 }
248
249 async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
250 self.print_startup_msgs("http", &listener)?;
251 loop {
252 let (stream, client_addr) = listener.accept().await?;
253 self.clone()
254 .serve_connection(stream, Scheme::HTTP, client_addr);
255 }
256 }
257
258 async fn serve_https(
259 self: Arc<Self>,
260 listener: TcpListener,
261 tls_config: TlsConfig,
262 ) -> anyhow::Result<()> {
263 self.print_startup_msgs("https", &listener)?;
264 let acceptor = tls_config.server_config()?;
265 loop {
266 let (stream, client_addr) = listener.accept().await?;
267 match acceptor.accept(stream).await {
268 Ok(stream) => self
269 .clone()
270 .serve_connection(stream, Scheme::HTTPS, client_addr),
271 Err(err) => tracing::error!(?err, "Failed to start TLS session"),
272 }
273 }
274 }
275
276 pub async fn handle(
281 self: &Arc<Self>,
282 mut req: Request<Body>,
283 server_scheme: Scheme,
284 client_addr: SocketAddr,
285 ) -> anyhow::Result<Response<Body>> {
286 strip_forbidden_headers(&mut req);
287
288 spin_telemetry::extract_trace_context(&req);
289
290 let path = req.uri().path().to_string();
291
292 tracing::info!("Processing request on path '{path}'");
293
294 if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
296 return match well_known {
297 "health" => Ok(MatchedRoute::with_response_extension(
298 Response::new(body::full(Bytes::from_static(b"OK"))),
299 path,
300 )),
301 "info" => self.app_info(path),
302 _ => Self::not_found(NotFoundRouteKind::WellKnown),
303 };
304 }
305
306 match self.router.route(&path) {
307 Ok(route_match) => {
308 self.handle_trigger_route(req, route_match, server_scheme, client_addr)
309 .await
310 }
311 Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
312 }
313 }
314
315 pub async fn handle_trigger_route(
317 self: &Arc<Self>,
318 mut req: Request<Body>,
319 route_match: RouteMatch<'_, '_>,
320 server_scheme: Scheme,
321 client_addr: SocketAddr,
322 ) -> anyhow::Result<Response<Body>> {
323 set_req_uri(&mut req, server_scheme.clone())?;
324 let app_id = self
325 .trigger_app
326 .app()
327 .get_metadata(APP_NAME_KEY)?
328 .unwrap_or_else(|| "<unnamed>".into());
329
330 let lookup_key = route_match.lookup_key();
331
332 spin_telemetry::metrics::monotonic_counter!(
333 spin.request_count = 1,
334 trigger_type = "http",
335 app_id = app_id,
336 component_id = lookup_key.to_string()
337 );
338
339 let trigger_config = self
340 .component_trigger_configs
341 .get(lookup_key)
342 .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
343
344 match (&trigger_config.component, &trigger_config.static_response) {
345 (Some(component), None) => {
346 self.respond_wasm_component(
347 req,
348 route_match,
349 server_scheme,
350 client_addr,
351 component,
352 &trigger_config.executor,
353 )
354 .await
355 }
356 (None, Some(static_response)) => Self::respond_static_response(static_response),
357 (None, None) => Err(anyhow::anyhow!(
359 "Triggers must specify either component or static_response - neither is specified for {}",
360 route_match.raw_route()
361 )),
362 (Some(_), Some(_)) => Err(anyhow::anyhow!(
363 "Triggers must specify either component or static_response - both are specified for {}",
364 route_match.raw_route()
365 )),
366 }
367 }
368
369 async fn respond_wasm_component(
370 self: &Arc<Self>,
371 req: Request<Body>,
372 route_match: RouteMatch<'_, '_>,
373 server_scheme: Scheme,
374 client_addr: SocketAddr,
375 component_id: &str,
376 executor: &Option<HttpExecutorType>,
377 ) -> anyhow::Result<Response<Body>> {
378 let mut instance_builder = self.trigger_app.prepare(component_id)?;
379
380 let outbound_http = instance_builder
385 .factor_builder::<OutboundHttpFactor>()
386 .context(
387 "The wasi HTTP trigger was configured without the required wasi outbound http support",
388 )?;
389 let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
390 outbound_http.set_self_request_origin(origin);
391 outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
392
393 let handler_type = self
395 .component_handler_types
396 .get(component_id)
397 .with_context(|| format!("unknown component ID {component_id:?}"))?;
398 let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
399
400 let res = match executor {
401 HttpExecutorType::Http => match handler_type {
402 HandlerType::Spin => {
403 SpinHttpExecutor
404 .execute(instance_builder, &route_match, req, client_addr)
405 .await
406 }
407 HandlerType::Wasi0_3(_, handler) => {
408 Wasip3HttpExecutor(handler)
409 .execute(&route_match, req, client_addr)
410 .await
411 }
412 HandlerType::Wasi0_2(_)
413 | HandlerType::Wasi2023_11_10(_)
414 | HandlerType::Wasi2023_10_18(_) => {
415 WasiHttpExecutor { handler_type }
416 .execute(instance_builder, &route_match, req, client_addr)
417 .await
418 }
419 HandlerType::Wagi(_) => unreachable!(),
420 },
421 HttpExecutorType::Wagi(wagi_config) => {
422 let indices = match handler_type {
423 HandlerType::Wagi(indices) => indices,
424 _ => unreachable!(),
425 };
426 let executor = WagiHttpExecutor {
427 wagi_config,
428 indices,
429 };
430 executor
431 .execute(instance_builder, &route_match, req, client_addr)
432 .await
433 }
434 };
435 match res {
436 Ok(res) => Ok(MatchedRoute::with_response_extension(
437 res,
438 route_match.raw_route(),
439 )),
440 Err(err) => {
441 tracing::error!("Error processing request: {err:?}");
442 instrument_error(&err);
443 Self::internal_error(None, route_match.raw_route())
444 }
445 }
446 }
447
448 fn respond_static_response(
449 sr: &spin_http::config::StaticResponse,
450 ) -> anyhow::Result<Response<Body>> {
451 let mut response = Response::builder();
452
453 response = response.status(sr.status());
454 for (header_name, header_value) in sr.headers() {
455 response = response.header(header_name, header_value);
456 }
457
458 let body = match sr.body() {
459 Some(b) => body::full(b.clone().into()),
460 None => body::empty(),
461 };
462
463 Ok(response.body(body)?)
464 }
465
466 fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
468 let info = AppInfo::new(self.trigger_app.app());
469 let body = serde_json::to_vec_pretty(&info)?;
470 Ok(MatchedRoute::with_response_extension(
471 Response::builder()
472 .header("content-type", "application/json")
473 .body(body::full(body.into()))?,
474 route,
475 ))
476 }
477
478 fn internal_error(
480 body: Option<&str>,
481 route: impl Into<String>,
482 ) -> anyhow::Result<Response<Body>> {
483 let body = match body {
484 Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
485 None => body::empty(),
486 };
487
488 Ok(MatchedRoute::with_response_extension(
489 Response::builder()
490 .status(StatusCode::INTERNAL_SERVER_ERROR)
491 .body(body)?,
492 route,
493 ))
494 }
495
496 fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
498 use std::sync::atomic::{AtomicBool, Ordering};
499 static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
500 if let NotFoundRouteKind::Normal(route) = kind
501 && !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
502 && std::io::stderr().is_terminal()
503 {
504 terminal::warn!(
505 "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
506 );
507 }
508 Ok(Response::builder()
509 .status(StatusCode::NOT_FOUND)
510 .body(body::empty())?)
511 }
512
513 fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
514 self: Arc<Self>,
515 stream: S,
516 server_scheme: Scheme,
517 client_addr: SocketAddr,
518 ) {
519 task::spawn(async move {
520 let mut server_builder = Builder::new(TokioExecutor::new());
521
522 if let Some(http1_max_buf_size) = self.http1_max_buf_size {
523 server_builder.http1().max_buf_size(http1_max_buf_size);
524 }
525
526 if let Err(err) = server_builder
527 .serve_connection(
528 TokioIo::new(stream),
529 service_fn(move |request| {
530 self.clone().instrumented_service_fn(
531 server_scheme.clone(),
532 client_addr,
533 request,
534 )
535 }),
536 )
537 .await
538 {
539 tracing::warn!("Error serving HTTP connection: {err:?}");
540 }
541 });
542 }
543
544 async fn instrumented_service_fn(
545 self: Arc<Self>,
546 server_scheme: Scheme,
547 client_addr: SocketAddr,
548 request: Request<Incoming>,
549 ) -> anyhow::Result<Response<HyperOutgoingBody>> {
550 let span = http_span!(request, client_addr);
551 let method = request.method().to_string();
552 async {
553 let result = self
554 .handle(
555 request.map(|body: Incoming| {
556 body.map_err(wasmtime_wasi_http::p2::hyper_response_error)
557 .boxed_unsync()
558 }),
559 server_scheme,
560 client_addr,
561 )
562 .await;
563 finalize_http_span(result, method)
564 }
565 .instrument(span)
566 .await
567 }
568
569 fn get_description_for_route(
570 &self,
571 key: &spin_http::routes::TriggerLookupKey,
572 ) -> anyhow::Result<Option<String>> {
573 if let spin_http::routes::TriggerLookupKey::Component(component_id) = key {
574 self.trigger_app
575 .app()
576 .get_component(component_id)
577 .and_then(|c| c.get_metadata(APP_DESCRIPTION_KEY).transpose())
578 .transpose()
579 .map_err(Into::into)
580 } else {
581 Ok(None)
582 }
583 }
584
585 fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
586 let local_addr = listener.local_addr()?;
587 let base_url = format!("{scheme}://{local_addr:?}");
588 tracing::info!("Serving {base_url}");
589
590 match self.output_format {
591 OutputFormat::Plain => {
592 terminal::step!("\nServing", "{base_url}");
593 println!("Available Routes:");
594 for (route, key) in self.router.routes() {
595 println!(" {key}: {base_url}{route}");
596 if let Some(description) = self.get_description_for_route(key)? {
597 println!(" {description}");
598 }
599 }
600 }
601 OutputFormat::Json => {
602 #[derive(serde::Serialize)]
603 struct RoutesOutput {
604 base_url: String,
605 routes: Vec<RouteEntry>,
606 }
607
608 #[derive(serde::Serialize)]
609 struct RouteEntry {
610 id: String,
611 route: String,
612 wildcard: bool,
613 #[serde(skip_serializing_if = "Option::is_none")]
614 description: Option<String>,
615 }
616 let mut routes = Vec::new();
617 for (route, key) in self.router.routes() {
618 routes.push(RouteEntry {
619 id: key.to_string(),
620 route: route.path().to_string(),
621 wildcard: route.is_wildcard(),
622 description: self.get_description_for_route(key)?,
623 });
624 }
625
626 let output = RoutesOutput { base_url, routes };
627 println!("{}", serde_json::to_string_pretty(&output)?);
628 }
629 }
630 Ok(())
631 }
632}
633
634fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
640 let uri = req.uri().clone();
641 let mut parts = uri.into_parts();
642 let headers = req.headers();
643 let header_authority = headers
644 .get(http::header::HOST)
645 .map(|h| -> anyhow::Result<Authority> {
646 let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
647 host_header
648 .parse()
649 .context("'Host' header contains an invalid authority")
650 })
651 .transpose()?;
652 let uri_authority = parts.authority;
653
654 let authority = match (header_authority, uri_authority) {
656 (None, None) => bail!("no 'Host' header present in request"),
657 (None, Some(a)) => a,
658 (Some(a), None) => a,
659 (Some(a1), Some(a2)) => {
660 if a1 != a2 {
663 return Err(anyhow::anyhow!(
664 "authority in 'Host' header does not match authority in URI"
665 ));
666 }
667 a1
668 }
669 };
670 parts.scheme = Some(scheme);
671 parts.authority = Some(authority);
672 *req.uri_mut() = Uri::from_parts(parts).unwrap();
673 Ok(())
674}
675
676pub(crate) trait HttpExecutor {
678 fn execute<F: RuntimeFactors>(
679 &self,
680 instance_builder: TriggerInstanceBuilder<F>,
681 route_match: &RouteMatch<'_, '_>,
682 req: Request<Body>,
683 client_addr: SocketAddr,
684 ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
685}
686
687pub(crate) struct HttpHandlerState<F: RuntimeFactors> {
688 trigger_app: Arc<TriggerApp<F>>,
689 component_id: String,
690 reuse_config: InstanceReuseConfig,
691}
692
693impl<F: RuntimeFactors> HandlerState for HttpHandlerState<F> {
694 type StoreData = InstanceState<F::InstanceState, ()>;
695
696 fn new_store(&self, _req_id: Option<u64>) -> wasmtime::Result<StoreBundle<Self::StoreData>> {
697 Ok(StoreBundle {
698 store: self
699 .trigger_app
700 .prepare(&self.component_id)
701 .to_wasmtime_result()?
702 .instantiate_store(())
703 .to_wasmtime_result()?
704 .into_inner(),
705 write_profile: Box::new(|_| ()),
706 })
707 }
708
709 fn request_timeout(&self) -> Duration {
710 self.reuse_config
711 .request_timeout
712 .map(|range| rand::rng().random_range(range))
713 .unwrap_or(Duration::MAX)
714 }
715
716 fn idle_instance_timeout(&self) -> Duration {
717 rand::rng().random_range(self.reuse_config.idle_instance_timeout)
718 }
719
720 fn max_instance_reuse_count(&self) -> usize {
721 rand::rng().random_range(self.reuse_config.max_instance_reuse_count)
722 }
723
724 fn max_instance_concurrent_reuse_count(&self) -> usize {
725 rand::rng().random_range(self.reuse_config.max_instance_concurrent_reuse_count)
726 }
727
728 fn handle_worker_error(&self, error: wasmtime::Error) {
729 tracing::warn!("worker error: {error:?}")
730 }
731}