1use std::{
2 collections::HashMap,
3 future::Future,
4 io::{ErrorKind, IsTerminal},
5 net::SocketAddr,
6 sync::Arc,
7 time::Duration,
8};
9
10use anyhow::{bail, Context};
11use http::{
12 uri::{Authority, Scheme},
13 Request, Response, StatusCode, Uri,
14};
15use http_body_util::BodyExt;
16use hyper::{
17 body::{Bytes, Incoming},
18 service::service_fn,
19};
20use hyper_util::{
21 rt::{TokioExecutor, TokioIo},
22 server::conn::auto::Builder,
23};
24use rand::Rng;
25use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
26use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
27use spin_factors::RuntimeFactors;
28use spin_factors_executor::InstanceState;
29use spin_http::{
30 app_info::AppInfo,
31 body,
32 config::{HttpExecutorType, HttpTriggerConfig},
33 routes::{RouteMatch, Router},
34 trigger::HandlerType,
35};
36use tokio::{
37 io::{AsyncRead, AsyncWrite},
38 net::TcpListener,
39 task,
40};
41use tracing::Instrument;
42use wasmtime_wasi::p2::bindings::CommandIndices;
43use wasmtime_wasi_http::body::HyperOutgoingBody;
44use wasmtime_wasi_http::handler::{HandlerState, StoreBundle};
45
46use crate::{
47 headers::strip_forbidden_headers,
48 instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
49 outbound_http::OutboundHttpInterceptor,
50 spin::SpinHttpExecutor,
51 wagi::WagiHttpExecutor,
52 wasi::WasiHttpExecutor,
53 wasip3::Wasip3HttpExecutor,
54 Body, InstanceReuseConfig, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
55};
56
57pub const MAX_RETRIES: u16 = 10;
58
59pub struct HttpServer<F: RuntimeFactors> {
61 listen_addr: SocketAddr,
63 tls_config: Option<TlsConfig>,
65 http1_max_buf_size: Option<usize>,
67 find_free_port: bool,
69 router: Router,
71 trigger_app: Arc<TriggerApp<F>>,
73 component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
75 component_handler_types: HashMap<String, HandlerType<HttpHandlerState<F>>>,
77}
78
79impl<F: RuntimeFactors> HttpServer<F> {
80 pub fn new(
82 listen_addr: SocketAddr,
83 tls_config: Option<TlsConfig>,
84 find_free_port: bool,
85 trigger_app: TriggerApp<F>,
86 http1_max_buf_size: Option<usize>,
87 reuse_config: InstanceReuseConfig,
88 ) -> anyhow::Result<Self> {
89 let component_trigger_configs = trigger_app
91 .app()
92 .trigger_configs::<HttpTriggerConfig>("http")?
93 .into_iter()
94 .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
95 .collect::<Result<Vec<_>, _>>()?;
96
97 let component_routes = component_trigger_configs
99 .iter()
100 .map(|(key, config)| (key, &config.route));
101 let mut duplicate_routes = Vec::new();
102 let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
103 if !duplicate_routes.is_empty() {
104 tracing::error!(
105 "The following component routes are duplicates and will never be used:"
106 );
107 for dup in &duplicate_routes {
108 tracing::error!(
109 " {}: {} (duplicate of {})",
110 dup.replaced_id,
111 dup.route(),
112 dup.effective_id,
113 );
114 }
115 }
116 if router.contains_reserved_route() {
117 tracing::error!(
118 "Routes under {} are handled by the Spin runtime and will never be reached",
119 spin_http::WELL_KNOWN_PREFIX
120 );
121 }
122 tracing::trace!(
123 "Constructed router: {:?}",
124 router.routes().collect::<Vec<_>>()
125 );
126
127 let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
129
130 let trigger_app = Arc::new(trigger_app);
131
132 let component_handler_types = component_trigger_configs
133 .iter()
134 .filter_map(|(key, trigger_config)| match key {
135 spin_http::routes::TriggerLookupKey::Component(component) => Some(
136 Self::handler_type_for_component(
137 &trigger_app,
138 component,
139 &trigger_config.executor,
140 reuse_config,
141 )
142 .map(|ht| (component.clone(), ht)),
143 ),
144 spin_http::routes::TriggerLookupKey::Trigger(_) => None,
145 })
146 .collect::<anyhow::Result<_>>()?;
147 Ok(Self {
148 listen_addr,
149 tls_config,
150 find_free_port,
151 router,
152 trigger_app,
153 http1_max_buf_size,
154 component_trigger_configs,
155 component_handler_types,
156 })
157 }
158
159 fn handler_type_for_component(
160 trigger_app: &Arc<TriggerApp<F>>,
161 component_id: &str,
162 executor: &Option<HttpExecutorType>,
163 reuse_config: InstanceReuseConfig,
164 ) -> anyhow::Result<HandlerType<HttpHandlerState<F>>> {
165 let pre = trigger_app.get_instance_pre(component_id)?;
166 let handler_type = match executor {
167 None | Some(HttpExecutorType::Http) | Some(HttpExecutorType::Wasip3Unstable) => {
168 let handler_type = HandlerType::from_instance_pre(
169 pre,
170 HttpHandlerState {
171 trigger_app: trigger_app.clone(),
172 component_id: component_id.into(),
173 reuse_config,
174 },
175 )?;
176 handler_type.validate_executor(executor)?;
177 handler_type
178 }
179 Some(HttpExecutorType::Wagi(wagi_config)) => {
180 anyhow::ensure!(
181 wagi_config.entrypoint == "_start",
182 "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
183 );
184 HandlerType::Wagi(
185 CommandIndices::new(pre)
186 .context("failed to find wasi command interface for wagi executor")?,
187 )
188 }
189 };
190 Ok(handler_type)
191 }
192
193 pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
195 let listener: TcpListener = if self.find_free_port {
196 self.search_for_free_port().await?
197 } else {
198 TcpListener::bind(self.listen_addr).await.map_err(|err| {
199 if err.kind() == ErrorKind::AddrInUse {
200 anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
201 } else {
202 anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
203 }
204 })?
205 };
206
207 if let Some(tls_config) = self.tls_config.clone() {
208 self.serve_https(listener, tls_config).await?;
209 } else {
210 self.serve_http(listener).await?;
211 }
212 Ok(())
213 }
214
215 async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
216 let mut found_listener = None;
217 let mut addr = self.listen_addr;
218
219 for _ in 1..=MAX_RETRIES {
220 if addr.port() == u16::MAX {
221 anyhow::bail!(
222 "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
223 );
224 }
225
226 match TcpListener::bind(addr).await {
227 Ok(listener) => {
228 found_listener = Some(listener);
229 break;
230 }
231 Err(err) if err.kind() == ErrorKind::AddrInUse => {
232 addr.set_port(addr.port() + 1);
233 continue;
234 }
235 Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
236 }
237 }
238
239 found_listener.ok_or_else(|| anyhow::anyhow!(
240 "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
241 self.listen_addr.port(),
242 self.listen_addr.port() + MAX_RETRIES
243 ))
244 }
245
246 async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
247 self.print_startup_msgs("http", &listener)?;
248 loop {
249 let (stream, client_addr) = listener.accept().await?;
250 self.clone()
251 .serve_connection(stream, Scheme::HTTP, client_addr);
252 }
253 }
254
255 async fn serve_https(
256 self: Arc<Self>,
257 listener: TcpListener,
258 tls_config: TlsConfig,
259 ) -> anyhow::Result<()> {
260 self.print_startup_msgs("https", &listener)?;
261 let acceptor = tls_config.server_config()?;
262 loop {
263 let (stream, client_addr) = listener.accept().await?;
264 match acceptor.accept(stream).await {
265 Ok(stream) => self
266 .clone()
267 .serve_connection(stream, Scheme::HTTPS, client_addr),
268 Err(err) => tracing::error!(?err, "Failed to start TLS session"),
269 }
270 }
271 }
272
273 pub async fn handle(
278 self: &Arc<Self>,
279 mut req: Request<Body>,
280 server_scheme: Scheme,
281 client_addr: SocketAddr,
282 ) -> anyhow::Result<Response<Body>> {
283 strip_forbidden_headers(&mut req);
284
285 spin_telemetry::extract_trace_context(&req);
286
287 let path = req.uri().path().to_string();
288
289 tracing::info!("Processing request on path '{path}'");
290
291 if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
293 return match well_known {
294 "health" => Ok(MatchedRoute::with_response_extension(
295 Response::new(body::full(Bytes::from_static(b"OK"))),
296 path,
297 )),
298 "info" => self.app_info(path),
299 _ => Self::not_found(NotFoundRouteKind::WellKnown),
300 };
301 }
302
303 match self.router.route(&path) {
304 Ok(route_match) => {
305 self.handle_trigger_route(req, route_match, server_scheme, client_addr)
306 .await
307 }
308 Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
309 }
310 }
311
312 pub async fn handle_trigger_route(
314 self: &Arc<Self>,
315 mut req: Request<Body>,
316 route_match: RouteMatch<'_, '_>,
317 server_scheme: Scheme,
318 client_addr: SocketAddr,
319 ) -> anyhow::Result<Response<Body>> {
320 set_req_uri(&mut req, server_scheme.clone())?;
321 let app_id = self
322 .trigger_app
323 .app()
324 .get_metadata(APP_NAME_KEY)?
325 .unwrap_or_else(|| "<unnamed>".into());
326
327 let lookup_key = route_match.lookup_key();
328
329 spin_telemetry::metrics::monotonic_counter!(
330 spin.request_count = 1,
331 trigger_type = "http",
332 app_id = app_id,
333 component_id = lookup_key.to_string()
334 );
335
336 let trigger_config = self
337 .component_trigger_configs
338 .get(lookup_key)
339 .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
340
341 match (&trigger_config.component, &trigger_config.static_response) {
342 (Some(component), None) => {
343 self.respond_wasm_component(
344 req,
345 route_match,
346 server_scheme,
347 client_addr,
348 component,
349 &trigger_config.executor,
350 )
351 .await
352 }
353 (None, Some(static_response)) => Self::respond_static_response(static_response),
354 (None, None) => Err(anyhow::anyhow!(
356 "Triggers must specify either component or static_response - neither is specified for {}",
357 route_match.raw_route()
358 )),
359 (Some(_), Some(_)) => Err(anyhow::anyhow!(
360 "Triggers must specify either component or static_response - both are specified for {}",
361 route_match.raw_route()
362 )),
363 }
364 }
365
366 async fn respond_wasm_component(
367 self: &Arc<Self>,
368 req: Request<Body>,
369 route_match: RouteMatch<'_, '_>,
370 server_scheme: Scheme,
371 client_addr: SocketAddr,
372 component_id: &str,
373 executor: &Option<HttpExecutorType>,
374 ) -> anyhow::Result<Response<Body>> {
375 let mut instance_builder = self.trigger_app.prepare(component_id)?;
376
377 let outbound_http = instance_builder
382 .factor_builder::<OutboundHttpFactor>()
383 .context(
384 "The wasi HTTP trigger was configured without the required wasi outbound http support",
385 )?;
386 let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
387 outbound_http.set_self_request_origin(origin);
388 outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
389
390 let handler_type = self
392 .component_handler_types
393 .get(component_id)
394 .with_context(|| format!("unknown component ID {component_id:?}"))?;
395 let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
396
397 let res = match executor {
398 HttpExecutorType::Http | HttpExecutorType::Wasip3Unstable => match handler_type {
399 HandlerType::Spin => {
400 SpinHttpExecutor
401 .execute(instance_builder, &route_match, req, client_addr)
402 .await
403 }
404 HandlerType::Wasi0_3(handler) => {
405 Wasip3HttpExecutor(handler)
406 .execute(&route_match, req, client_addr)
407 .await
408 }
409 HandlerType::Wasi0_2(_)
410 | HandlerType::Wasi2023_11_10(_)
411 | HandlerType::Wasi2023_10_18(_) => {
412 WasiHttpExecutor { handler_type }
413 .execute(instance_builder, &route_match, req, client_addr)
414 .await
415 }
416 HandlerType::Wagi(_) => unreachable!(),
417 },
418 HttpExecutorType::Wagi(wagi_config) => {
419 let indices = match handler_type {
420 HandlerType::Wagi(indices) => indices,
421 _ => unreachable!(),
422 };
423 let executor = WagiHttpExecutor {
424 wagi_config,
425 indices,
426 };
427 executor
428 .execute(instance_builder, &route_match, req, client_addr)
429 .await
430 }
431 };
432 match res {
433 Ok(res) => Ok(MatchedRoute::with_response_extension(
434 res,
435 route_match.raw_route(),
436 )),
437 Err(err) => {
438 tracing::error!("Error processing request: {err:?}");
439 instrument_error(&err);
440 Self::internal_error(None, route_match.raw_route())
441 }
442 }
443 }
444
445 fn respond_static_response(
446 sr: &spin_http::config::StaticResponse,
447 ) -> anyhow::Result<Response<Body>> {
448 let mut response = Response::builder();
449
450 response = response.status(sr.status());
451 for (header_name, header_value) in sr.headers() {
452 response = response.header(header_name, header_value);
453 }
454
455 let body = match sr.body() {
456 Some(b) => body::full(b.clone().into()),
457 None => body::empty(),
458 };
459
460 Ok(response.body(body)?)
461 }
462
463 fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
465 let info = AppInfo::new(self.trigger_app.app());
466 let body = serde_json::to_vec_pretty(&info)?;
467 Ok(MatchedRoute::with_response_extension(
468 Response::builder()
469 .header("content-type", "application/json")
470 .body(body::full(body.into()))?,
471 route,
472 ))
473 }
474
475 fn internal_error(
477 body: Option<&str>,
478 route: impl Into<String>,
479 ) -> anyhow::Result<Response<Body>> {
480 let body = match body {
481 Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
482 None => body::empty(),
483 };
484
485 Ok(MatchedRoute::with_response_extension(
486 Response::builder()
487 .status(StatusCode::INTERNAL_SERVER_ERROR)
488 .body(body)?,
489 route,
490 ))
491 }
492
493 fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
495 use std::sync::atomic::{AtomicBool, Ordering};
496 static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
497 if let NotFoundRouteKind::Normal(route) = kind {
498 if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
499 && std::io::stderr().is_terminal()
500 {
501 terminal::warn!(
502 "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
503 );
504 }
505 }
506 Ok(Response::builder()
507 .status(StatusCode::NOT_FOUND)
508 .body(body::empty())?)
509 }
510
511 fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
512 self: Arc<Self>,
513 stream: S,
514 server_scheme: Scheme,
515 client_addr: SocketAddr,
516 ) {
517 task::spawn(async move {
518 let mut server_builder = Builder::new(TokioExecutor::new());
519
520 if let Some(http1_max_buf_size) = self.http1_max_buf_size {
521 server_builder.http1().max_buf_size(http1_max_buf_size);
522 }
523
524 if let Err(err) = server_builder
525 .serve_connection(
526 TokioIo::new(stream),
527 service_fn(move |request| {
528 self.clone().instrumented_service_fn(
529 server_scheme.clone(),
530 client_addr,
531 request,
532 )
533 }),
534 )
535 .await
536 {
537 tracing::warn!("Error serving HTTP connection: {err:?}");
538 }
539 });
540 }
541
542 async fn instrumented_service_fn(
543 self: Arc<Self>,
544 server_scheme: Scheme,
545 client_addr: SocketAddr,
546 request: Request<Incoming>,
547 ) -> anyhow::Result<Response<HyperOutgoingBody>> {
548 let span = http_span!(request, client_addr);
549 let method = request.method().to_string();
550 async {
551 let result = self
552 .handle(
553 request.map(|body: Incoming| {
554 body.map_err(wasmtime_wasi_http::hyper_response_error)
555 .boxed()
556 }),
557 server_scheme,
558 client_addr,
559 )
560 .await;
561 finalize_http_span(result, method)
562 }
563 .instrument(span)
564 .await
565 }
566
567 fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
568 let local_addr = listener.local_addr()?;
569 let base_url = format!("{scheme}://{local_addr:?}");
570 terminal::step!("\nServing", "{base_url}");
571 tracing::info!("Serving {base_url}");
572
573 println!("Available Routes:");
574 for (route, key) in self.router.routes() {
575 println!(" {key}: {base_url}{route}");
576 if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
577 if let Some(component) = self.trigger_app.app().get_component(component_id) {
578 if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
579 println!(" {description}");
580 }
581 }
582 }
583 }
584 Ok(())
585 }
586}
587
588fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
594 let uri = req.uri().clone();
595 let mut parts = uri.into_parts();
596 let headers = req.headers();
597 let header_authority = headers
598 .get(http::header::HOST)
599 .map(|h| -> anyhow::Result<Authority> {
600 let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
601 host_header
602 .parse()
603 .context("'Host' header contains an invalid authority")
604 })
605 .transpose()?;
606 let uri_authority = parts.authority;
607
608 let authority = match (header_authority, uri_authority) {
610 (None, None) => bail!("no 'Host' header present in request"),
611 (None, Some(a)) => a,
612 (Some(a), None) => a,
613 (Some(a1), Some(a2)) => {
614 if a1 != a2 {
617 return Err(anyhow::anyhow!(
618 "authority in 'Host' header does not match authority in URI"
619 ));
620 }
621 a1
622 }
623 };
624 parts.scheme = Some(scheme);
625 parts.authority = Some(authority);
626 *req.uri_mut() = Uri::from_parts(parts).unwrap();
627 Ok(())
628}
629
630pub(crate) trait HttpExecutor {
632 fn execute<F: RuntimeFactors>(
633 &self,
634 instance_builder: TriggerInstanceBuilder<F>,
635 route_match: &RouteMatch<'_, '_>,
636 req: Request<Body>,
637 client_addr: SocketAddr,
638 ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
639}
640
641pub(crate) struct HttpHandlerState<F: RuntimeFactors> {
642 trigger_app: Arc<TriggerApp<F>>,
643 component_id: String,
644 reuse_config: InstanceReuseConfig,
645}
646
647impl<F: RuntimeFactors> HandlerState for HttpHandlerState<F> {
648 type StoreData = InstanceState<F::InstanceState, ()>;
649
650 fn new_store(&self, _req_id: Option<u64>) -> anyhow::Result<StoreBundle<Self::StoreData>> {
651 Ok(StoreBundle {
652 store: self
653 .trigger_app
654 .prepare(&self.component_id)?
655 .instantiate_store(())?
656 .into_inner(),
657 write_profile: Box::new(|_| ()),
658 })
659 }
660
661 fn request_timeout(&self) -> Duration {
662 self.reuse_config
663 .request_timeout
664 .map(|range| rand::rng().random_range(range))
665 .unwrap_or(Duration::MAX)
666 }
667
668 fn idle_instance_timeout(&self) -> Duration {
669 rand::rng().random_range(self.reuse_config.idle_instance_timeout)
670 }
671
672 fn max_instance_reuse_count(&self) -> usize {
673 rand::rng().random_range(self.reuse_config.max_instance_reuse_count)
674 }
675
676 fn max_instance_concurrent_reuse_count(&self) -> usize {
677 rand::rng().random_range(self.reuse_config.max_instance_concurrent_reuse_count)
678 }
679
680 fn handle_worker_error(&self, error: anyhow::Error) {
681 tracing::warn!("worker error: {error:?}")
682 }
683}