spin_trigger_http/
server.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    io::{ErrorKind, IsTerminal},
5    net::SocketAddr,
6    sync::Arc,
7    time::Duration,
8};
9
10use anyhow::{bail, Context};
11use http::{
12    uri::{Authority, Scheme},
13    Request, Response, StatusCode, Uri,
14};
15use http_body_util::BodyExt;
16use hyper::{
17    body::{Bytes, Incoming},
18    service::service_fn,
19};
20use hyper_util::{
21    rt::{TokioExecutor, TokioIo},
22    server::conn::auto::Builder,
23};
24use rand::Rng;
25use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
26use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
27use spin_factors::RuntimeFactors;
28use spin_factors_executor::InstanceState;
29use spin_http::{
30    app_info::AppInfo,
31    body,
32    config::{HttpExecutorType, HttpTriggerConfig},
33    routes::{RouteMatch, Router},
34    trigger::HandlerType,
35};
36use tokio::{
37    io::{AsyncRead, AsyncWrite},
38    net::TcpListener,
39    task,
40};
41use tracing::Instrument;
42use wasmtime_wasi::p2::bindings::CommandIndices;
43use wasmtime_wasi_http::body::HyperOutgoingBody;
44use wasmtime_wasi_http::handler::{HandlerState, StoreBundle};
45
46use crate::{
47    headers::strip_forbidden_headers,
48    instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
49    outbound_http::OutboundHttpInterceptor,
50    spin::SpinHttpExecutor,
51    wagi::WagiHttpExecutor,
52    wasi::WasiHttpExecutor,
53    wasip3::Wasip3HttpExecutor,
54    Body, InstanceReuseConfig, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
55};
56
57pub const MAX_RETRIES: u16 = 10;
58
59/// An HTTP server which runs Spin apps.
60pub struct HttpServer<F: RuntimeFactors> {
61    /// The address the server is listening on.
62    listen_addr: SocketAddr,
63    /// The TLS configuration for the server.
64    tls_config: Option<TlsConfig>,
65    /// The maximum buffer size for an HTTP1 connection.
66    http1_max_buf_size: Option<usize>,
67    /// Whether to find a free port if the specified port is already in use.
68    find_free_port: bool,
69    /// Request router.
70    router: Router,
71    /// The app being triggered.
72    trigger_app: Arc<TriggerApp<F>>,
73    // Component ID -> component trigger config
74    component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
75    // Component ID -> handler type
76    component_handler_types: HashMap<String, HandlerType<HttpHandlerState<F>>>,
77}
78
79impl<F: RuntimeFactors> HttpServer<F> {
80    /// Create a new [`HttpServer`].
81    pub fn new(
82        listen_addr: SocketAddr,
83        tls_config: Option<TlsConfig>,
84        find_free_port: bool,
85        trigger_app: TriggerApp<F>,
86        http1_max_buf_size: Option<usize>,
87        reuse_config: InstanceReuseConfig,
88    ) -> anyhow::Result<Self> {
89        // This needs to be a vec before building the router to handle duplicate routes
90        let component_trigger_configs = trigger_app
91            .app()
92            .trigger_configs::<HttpTriggerConfig>("http")?
93            .into_iter()
94            .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
95            .collect::<Result<Vec<_>, _>>()?;
96
97        // Build router
98        let component_routes = component_trigger_configs
99            .iter()
100            .map(|(key, config)| (key, &config.route));
101        let mut duplicate_routes = Vec::new();
102        let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
103        if !duplicate_routes.is_empty() {
104            tracing::error!(
105                "The following component routes are duplicates and will never be used:"
106            );
107            for dup in &duplicate_routes {
108                tracing::error!(
109                    "  {}: {} (duplicate of {})",
110                    dup.replaced_id,
111                    dup.route(),
112                    dup.effective_id,
113                );
114            }
115        }
116        if router.contains_reserved_route() {
117            tracing::error!(
118                "Routes under {} are handled by the Spin runtime and will never be reached",
119                spin_http::WELL_KNOWN_PREFIX
120            );
121        }
122        tracing::trace!(
123            "Constructed router: {:?}",
124            router.routes().collect::<Vec<_>>()
125        );
126
127        // Now that router is built we can merge duplicate routes by component
128        let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
129
130        let trigger_app = Arc::new(trigger_app);
131
132        let component_handler_types = component_trigger_configs
133            .iter()
134            .filter_map(|(key, trigger_config)| match key {
135                spin_http::routes::TriggerLookupKey::Component(component) => Some(
136                    Self::handler_type_for_component(
137                        &trigger_app,
138                        component,
139                        &trigger_config.executor,
140                        reuse_config,
141                    )
142                    .map(|ht| (component.clone(), ht)),
143                ),
144                spin_http::routes::TriggerLookupKey::Trigger(_) => None,
145            })
146            .collect::<anyhow::Result<_>>()?;
147        Ok(Self {
148            listen_addr,
149            tls_config,
150            find_free_port,
151            router,
152            trigger_app,
153            http1_max_buf_size,
154            component_trigger_configs,
155            component_handler_types,
156        })
157    }
158
159    fn handler_type_for_component(
160        trigger_app: &Arc<TriggerApp<F>>,
161        component_id: &str,
162        executor: &Option<HttpExecutorType>,
163        reuse_config: InstanceReuseConfig,
164    ) -> anyhow::Result<HandlerType<HttpHandlerState<F>>> {
165        let pre = trigger_app.get_instance_pre(component_id)?;
166        let handler_type = match executor {
167            None | Some(HttpExecutorType::Http) | Some(HttpExecutorType::Wasip3Unstable) => {
168                let handler_type = HandlerType::from_instance_pre(
169                    pre,
170                    HttpHandlerState {
171                        trigger_app: trigger_app.clone(),
172                        component_id: component_id.into(),
173                        reuse_config,
174                    },
175                )?;
176                handler_type.validate_executor(executor)?;
177                handler_type
178            }
179            Some(HttpExecutorType::Wagi(wagi_config)) => {
180                anyhow::ensure!(
181                    wagi_config.entrypoint == "_start",
182                    "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
183                );
184                HandlerType::Wagi(
185                    CommandIndices::new(pre)
186                        .context("failed to find wasi command interface for wagi executor")?,
187                )
188            }
189        };
190        Ok(handler_type)
191    }
192
193    /// Serve incoming requests over the provided [`TcpListener`].
194    pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
195        let listener: TcpListener = if self.find_free_port {
196            self.search_for_free_port().await?
197        } else {
198            TcpListener::bind(self.listen_addr).await.map_err(|err| {
199                if err.kind() == ErrorKind::AddrInUse {
200                    anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
201                } else {
202                    anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
203                }
204            })?
205        };
206
207        if let Some(tls_config) = self.tls_config.clone() {
208            self.serve_https(listener, tls_config).await?;
209        } else {
210            self.serve_http(listener).await?;
211        }
212        Ok(())
213    }
214
215    async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
216        let mut found_listener = None;
217        let mut addr = self.listen_addr;
218
219        for _ in 1..=MAX_RETRIES {
220            if addr.port() == u16::MAX {
221                anyhow::bail!(
222                    "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
223                );
224            }
225
226            match TcpListener::bind(addr).await {
227                Ok(listener) => {
228                    found_listener = Some(listener);
229                    break;
230                }
231                Err(err) if err.kind() == ErrorKind::AddrInUse => {
232                    addr.set_port(addr.port() + 1);
233                    continue;
234                }
235                Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
236            }
237        }
238
239        found_listener.ok_or_else(|| anyhow::anyhow!(
240            "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
241            self.listen_addr.port(),
242            self.listen_addr.port() + MAX_RETRIES
243        ))
244    }
245
246    async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
247        self.print_startup_msgs("http", &listener)?;
248        loop {
249            let (stream, client_addr) = listener.accept().await?;
250            self.clone()
251                .serve_connection(stream, Scheme::HTTP, client_addr);
252        }
253    }
254
255    async fn serve_https(
256        self: Arc<Self>,
257        listener: TcpListener,
258        tls_config: TlsConfig,
259    ) -> anyhow::Result<()> {
260        self.print_startup_msgs("https", &listener)?;
261        let acceptor = tls_config.server_config()?;
262        loop {
263            let (stream, client_addr) = listener.accept().await?;
264            match acceptor.accept(stream).await {
265                Ok(stream) => self
266                    .clone()
267                    .serve_connection(stream, Scheme::HTTPS, client_addr),
268                Err(err) => tracing::error!(?err, "Failed to start TLS session"),
269            }
270        }
271    }
272
273    /// Handles incoming requests using an HTTP executor.
274    ///
275    /// This method handles well known paths and routes requests to the handler when the router
276    /// matches the requests path.
277    pub async fn handle(
278        self: &Arc<Self>,
279        mut req: Request<Body>,
280        server_scheme: Scheme,
281        client_addr: SocketAddr,
282    ) -> anyhow::Result<Response<Body>> {
283        strip_forbidden_headers(&mut req);
284
285        spin_telemetry::extract_trace_context(&req);
286
287        let path = req.uri().path().to_string();
288
289        tracing::info!("Processing request on path '{path}'");
290
291        // Handle well-known spin paths
292        if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
293            return match well_known {
294                "health" => Ok(MatchedRoute::with_response_extension(
295                    Response::new(body::full(Bytes::from_static(b"OK"))),
296                    path,
297                )),
298                "info" => self.app_info(path),
299                _ => Self::not_found(NotFoundRouteKind::WellKnown),
300            };
301        }
302
303        match self.router.route(&path) {
304            Ok(route_match) => {
305                self.handle_trigger_route(req, route_match, server_scheme, client_addr)
306                    .await
307            }
308            Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
309        }
310    }
311
312    /// Handles a successful route match.
313    pub async fn handle_trigger_route(
314        self: &Arc<Self>,
315        mut req: Request<Body>,
316        route_match: RouteMatch<'_, '_>,
317        server_scheme: Scheme,
318        client_addr: SocketAddr,
319    ) -> anyhow::Result<Response<Body>> {
320        set_req_uri(&mut req, server_scheme.clone())?;
321        let app_id = self
322            .trigger_app
323            .app()
324            .get_metadata(APP_NAME_KEY)?
325            .unwrap_or_else(|| "<unnamed>".into());
326
327        let lookup_key = route_match.lookup_key();
328
329        spin_telemetry::metrics::monotonic_counter!(
330            spin.request_count = 1,
331            trigger_type = "http",
332            app_id = app_id,
333            component_id = lookup_key.to_string()
334        );
335
336        let trigger_config = self
337            .component_trigger_configs
338            .get(lookup_key)
339            .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
340
341        match (&trigger_config.component, &trigger_config.static_response) {
342            (Some(component), None) => {
343                self.respond_wasm_component(
344                    req,
345                    route_match,
346                    server_scheme,
347                    client_addr,
348                    component,
349                    &trigger_config.executor,
350                )
351                .await
352            }
353            (None, Some(static_response)) => Self::respond_static_response(static_response),
354            // These error cases should have been ruled out by this point but belt and braces
355            (None, None) => Err(anyhow::anyhow!(
356                "Triggers must specify either component or static_response - neither is specified for {}",
357                route_match.raw_route()
358            )),
359            (Some(_), Some(_)) => Err(anyhow::anyhow!(
360                "Triggers must specify either component or static_response - both are specified for {}",
361                route_match.raw_route()
362            )),
363        }
364    }
365
366    async fn respond_wasm_component(
367        self: &Arc<Self>,
368        req: Request<Body>,
369        route_match: RouteMatch<'_, '_>,
370        server_scheme: Scheme,
371        client_addr: SocketAddr,
372        component_id: &str,
373        executor: &Option<HttpExecutorType>,
374    ) -> anyhow::Result<Response<Body>> {
375        let mut instance_builder = self.trigger_app.prepare(component_id)?;
376
377        // Set up outbound HTTP request origin and service chaining
378        // The outbound HTTP factor is required since both inbound and outbound wasi HTTP
379        // implementations assume they use the same underlying wasmtime resource storage.
380        // Eventually, we may be able to factor this out to a separate factor.
381        let outbound_http = instance_builder
382            .factor_builder::<OutboundHttpFactor>()
383            .context(
384            "The wasi HTTP trigger was configured without the required wasi outbound http support",
385        )?;
386        let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
387        outbound_http.set_self_request_origin(origin);
388        outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
389
390        // Prepare HTTP executor
391        let handler_type = self
392            .component_handler_types
393            .get(component_id)
394            .with_context(|| format!("unknown component ID {component_id:?}"))?;
395        let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
396
397        let res = match executor {
398            HttpExecutorType::Http | HttpExecutorType::Wasip3Unstable => match handler_type {
399                HandlerType::Spin => {
400                    SpinHttpExecutor
401                        .execute(instance_builder, &route_match, req, client_addr)
402                        .await
403                }
404                HandlerType::Wasi0_3(handler) => {
405                    Wasip3HttpExecutor(handler)
406                        .execute(&route_match, req, client_addr)
407                        .await
408                }
409                HandlerType::Wasi0_2(_)
410                | HandlerType::Wasi2023_11_10(_)
411                | HandlerType::Wasi2023_10_18(_) => {
412                    WasiHttpExecutor { handler_type }
413                        .execute(instance_builder, &route_match, req, client_addr)
414                        .await
415                }
416                HandlerType::Wagi(_) => unreachable!(),
417            },
418            HttpExecutorType::Wagi(wagi_config) => {
419                let indices = match handler_type {
420                    HandlerType::Wagi(indices) => indices,
421                    _ => unreachable!(),
422                };
423                let executor = WagiHttpExecutor {
424                    wagi_config,
425                    indices,
426                };
427                executor
428                    .execute(instance_builder, &route_match, req, client_addr)
429                    .await
430            }
431        };
432        match res {
433            Ok(res) => Ok(MatchedRoute::with_response_extension(
434                res,
435                route_match.raw_route(),
436            )),
437            Err(err) => {
438                tracing::error!("Error processing request: {err:?}");
439                instrument_error(&err);
440                Self::internal_error(None, route_match.raw_route())
441            }
442        }
443    }
444
445    fn respond_static_response(
446        sr: &spin_http::config::StaticResponse,
447    ) -> anyhow::Result<Response<Body>> {
448        let mut response = Response::builder();
449
450        response = response.status(sr.status());
451        for (header_name, header_value) in sr.headers() {
452            response = response.header(header_name, header_value);
453        }
454
455        let body = match sr.body() {
456            Some(b) => body::full(b.clone().into()),
457            None => body::empty(),
458        };
459
460        Ok(response.body(body)?)
461    }
462
463    /// Returns spin status information.
464    fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
465        let info = AppInfo::new(self.trigger_app.app());
466        let body = serde_json::to_vec_pretty(&info)?;
467        Ok(MatchedRoute::with_response_extension(
468            Response::builder()
469                .header("content-type", "application/json")
470                .body(body::full(body.into()))?,
471            route,
472        ))
473    }
474
475    /// Creates an HTTP 500 response.
476    fn internal_error(
477        body: Option<&str>,
478        route: impl Into<String>,
479    ) -> anyhow::Result<Response<Body>> {
480        let body = match body {
481            Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
482            None => body::empty(),
483        };
484
485        Ok(MatchedRoute::with_response_extension(
486            Response::builder()
487                .status(StatusCode::INTERNAL_SERVER_ERROR)
488                .body(body)?,
489            route,
490        ))
491    }
492
493    /// Creates an HTTP 404 response.
494    fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
495        use std::sync::atomic::{AtomicBool, Ordering};
496        static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
497        if let NotFoundRouteKind::Normal(route) = kind {
498            if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
499                && std::io::stderr().is_terminal()
500            {
501                terminal::warn!(
502                    "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
503                );
504            }
505        }
506        Ok(Response::builder()
507            .status(StatusCode::NOT_FOUND)
508            .body(body::empty())?)
509    }
510
511    fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
512        self: Arc<Self>,
513        stream: S,
514        server_scheme: Scheme,
515        client_addr: SocketAddr,
516    ) {
517        task::spawn(async move {
518            let mut server_builder = Builder::new(TokioExecutor::new());
519
520            if let Some(http1_max_buf_size) = self.http1_max_buf_size {
521                server_builder.http1().max_buf_size(http1_max_buf_size);
522            }
523
524            if let Err(err) = server_builder
525                .serve_connection(
526                    TokioIo::new(stream),
527                    service_fn(move |request| {
528                        self.clone().instrumented_service_fn(
529                            server_scheme.clone(),
530                            client_addr,
531                            request,
532                        )
533                    }),
534                )
535                .await
536            {
537                tracing::warn!("Error serving HTTP connection: {err:?}");
538            }
539        });
540    }
541
542    async fn instrumented_service_fn(
543        self: Arc<Self>,
544        server_scheme: Scheme,
545        client_addr: SocketAddr,
546        request: Request<Incoming>,
547    ) -> anyhow::Result<Response<HyperOutgoingBody>> {
548        let span = http_span!(request, client_addr);
549        let method = request.method().to_string();
550        async {
551            let result = self
552                .handle(
553                    request.map(|body: Incoming| {
554                        body.map_err(wasmtime_wasi_http::hyper_response_error)
555                            .boxed()
556                    }),
557                    server_scheme,
558                    client_addr,
559                )
560                .await;
561            finalize_http_span(result, method)
562        }
563        .instrument(span)
564        .await
565    }
566
567    fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
568        let local_addr = listener.local_addr()?;
569        let base_url = format!("{scheme}://{local_addr:?}");
570        terminal::step!("\nServing", "{base_url}");
571        tracing::info!("Serving {base_url}");
572
573        println!("Available Routes:");
574        for (route, key) in self.router.routes() {
575            println!("  {key}: {base_url}{route}");
576            if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
577                if let Some(component) = self.trigger_app.app().get_component(component_id) {
578                    if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
579                        println!("    {description}");
580                    }
581                }
582            }
583        }
584        Ok(())
585    }
586}
587
588/// The incoming request's scheme and authority
589///
590/// The incoming request's URI is relative to the server, so we need to set the scheme and authority.
591/// Either the `Host` header or the request's URI's authority is used as the source of truth for the authority.
592/// This function will error if the authority cannot be unambiguously determined.
593fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
594    let uri = req.uri().clone();
595    let mut parts = uri.into_parts();
596    let headers = req.headers();
597    let header_authority = headers
598        .get(http::header::HOST)
599        .map(|h| -> anyhow::Result<Authority> {
600            let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
601            host_header
602                .parse()
603                .context("'Host' header contains an invalid authority")
604        })
605        .transpose()?;
606    let uri_authority = parts.authority;
607
608    // Get authority either from request URI or from 'Host' header
609    let authority = match (header_authority, uri_authority) {
610        (None, None) => bail!("no 'Host' header present in request"),
611        (None, Some(a)) => a,
612        (Some(a), None) => a,
613        (Some(a1), Some(a2)) => {
614            // Ensure that if `req.authority` is set, it matches what was in the `Host` header
615            // https://github.com/hyperium/hyper/issues/1612
616            if a1 != a2 {
617                return Err(anyhow::anyhow!(
618                    "authority in 'Host' header does not match authority in URI"
619                ));
620            }
621            a1
622        }
623    };
624    parts.scheme = Some(scheme);
625    parts.authority = Some(authority);
626    *req.uri_mut() = Uri::from_parts(parts).unwrap();
627    Ok(())
628}
629
630/// An HTTP executor.
631pub(crate) trait HttpExecutor {
632    fn execute<F: RuntimeFactors>(
633        &self,
634        instance_builder: TriggerInstanceBuilder<F>,
635        route_match: &RouteMatch<'_, '_>,
636        req: Request<Body>,
637        client_addr: SocketAddr,
638    ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
639}
640
641pub(crate) struct HttpHandlerState<F: RuntimeFactors> {
642    trigger_app: Arc<TriggerApp<F>>,
643    component_id: String,
644    reuse_config: InstanceReuseConfig,
645}
646
647impl<F: RuntimeFactors> HandlerState for HttpHandlerState<F> {
648    type StoreData = InstanceState<F::InstanceState, ()>;
649
650    fn new_store(&self, _req_id: Option<u64>) -> anyhow::Result<StoreBundle<Self::StoreData>> {
651        Ok(StoreBundle {
652            store: self
653                .trigger_app
654                .prepare(&self.component_id)?
655                .instantiate_store(())?
656                .into_inner(),
657            write_profile: Box::new(|_| ()),
658        })
659    }
660
661    fn request_timeout(&self) -> Duration {
662        self.reuse_config
663            .request_timeout
664            .map(|range| rand::rng().random_range(range))
665            .unwrap_or(Duration::MAX)
666    }
667
668    fn idle_instance_timeout(&self) -> Duration {
669        rand::rng().random_range(self.reuse_config.idle_instance_timeout)
670    }
671
672    fn max_instance_reuse_count(&self) -> usize {
673        rand::rng().random_range(self.reuse_config.max_instance_reuse_count)
674    }
675
676    fn max_instance_concurrent_reuse_count(&self) -> usize {
677        rand::rng().random_range(self.reuse_config.max_instance_concurrent_reuse_count)
678    }
679
680    fn handle_worker_error(&self, error: anyhow::Error) {
681        tracing::warn!("worker error: {error:?}")
682    }
683}