spin_trigger_http/
server.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    io::{ErrorKind, IsTerminal},
5    net::SocketAddr,
6    sync::Arc,
7};
8
9use anyhow::{bail, Context};
10use http::{
11    uri::{Authority, Scheme},
12    Request, Response, StatusCode, Uri,
13};
14use http_body_util::BodyExt;
15use hyper::{
16    body::{Bytes, Incoming},
17    service::service_fn,
18};
19use hyper_util::{
20    rt::{TokioExecutor, TokioIo},
21    server::conn::auto::Builder,
22};
23use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
24use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
25use spin_factors::RuntimeFactors;
26use spin_http::{
27    app_info::AppInfo,
28    body,
29    config::{HttpExecutorType, HttpTriggerConfig},
30    routes::{RouteMatch, Router},
31    trigger::HandlerType,
32};
33use tokio::{
34    io::{AsyncRead, AsyncWrite},
35    net::TcpListener,
36    task,
37};
38use tracing::Instrument;
39use wasmtime_wasi::p2::bindings::CommandIndices;
40use wasmtime_wasi_http::body::HyperOutgoingBody;
41
42use crate::{
43    headers::strip_forbidden_headers,
44    instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
45    outbound_http::OutboundHttpInterceptor,
46    spin::SpinHttpExecutor,
47    wagi::WagiHttpExecutor,
48    wasi::WasiHttpExecutor,
49    wasip3::Wasip3HttpExecutor,
50    Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
51};
52
/// Maximum number of bind attempts [`HttpServer`] makes when it searches for
/// a free port, starting at the configured port and moving up by one after
/// each "address in use" failure.
pub const MAX_RETRIES: u16 = 10;
54
/// An HTTP server which runs Spin apps.
pub struct HttpServer<F: RuntimeFactors> {
    /// The address the server is configured to listen on.
    listen_addr: SocketAddr,
    /// The TLS configuration for the server; when `None` connections are
    /// served as plain HTTP.
    tls_config: Option<TlsConfig>,
    /// The maximum buffer size for an HTTP1 connection; when `None` the
    /// connection builder is left unconfigured.
    http1_max_buf_size: Option<usize>,
    /// Whether to find a free port if the specified port is already in use.
    find_free_port: bool,
    /// Request router.
    router: Router,
    /// The app being triggered.
    trigger_app: TriggerApp<F>,
    /// Routing lookup key -> trigger config for that destination.
    component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
    /// Component ID -> handler type, resolved once when the server is built.
    component_handler_types: HashMap<String, HandlerType>,
}
74
75impl<F: RuntimeFactors> HttpServer<F> {
76    /// Create a new [`HttpServer`].
77    pub fn new(
78        listen_addr: SocketAddr,
79        tls_config: Option<TlsConfig>,
80        find_free_port: bool,
81        trigger_app: TriggerApp<F>,
82        http1_max_buf_size: Option<usize>,
83    ) -> anyhow::Result<Self> {
84        // This needs to be a vec before building the router to handle duplicate routes
85        let component_trigger_configs = trigger_app
86            .app()
87            .trigger_configs::<HttpTriggerConfig>("http")?
88            .into_iter()
89            .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
90            .collect::<Result<Vec<_>, _>>()?;
91
92        // Build router
93        let component_routes = component_trigger_configs
94            .iter()
95            .map(|(key, config)| (key, &config.route));
96        let mut duplicate_routes = Vec::new();
97        let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
98        if !duplicate_routes.is_empty() {
99            tracing::error!(
100                "The following component routes are duplicates and will never be used:"
101            );
102            for dup in &duplicate_routes {
103                tracing::error!(
104                    "  {}: {} (duplicate of {})",
105                    dup.replaced_id,
106                    dup.route(),
107                    dup.effective_id,
108                );
109            }
110        }
111        if router.contains_reserved_route() {
112            tracing::error!(
113                "Routes under {} are handled by the Spin runtime and will never be reached",
114                spin_http::WELL_KNOWN_PREFIX
115            );
116        }
117        tracing::trace!(
118            "Constructed router: {:?}",
119            router.routes().collect::<Vec<_>>()
120        );
121
122        // Now that router is built we can merge duplicate routes by component
123        let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
124
125        let component_handler_types = component_trigger_configs
126            .iter()
127            .filter_map(|(key, trigger_config)| match key {
128                spin_http::routes::TriggerLookupKey::Component(component) => Some(
129                    Self::handler_type_for_component(
130                        &trigger_app,
131                        component,
132                        &trigger_config.executor,
133                    )
134                    .map(|ht| (component.clone(), ht)),
135                ),
136                spin_http::routes::TriggerLookupKey::Trigger(_) => None,
137            })
138            .collect::<anyhow::Result<_>>()?;
139        Ok(Self {
140            listen_addr,
141            tls_config,
142            find_free_port,
143            router,
144            trigger_app,
145            http1_max_buf_size,
146            component_trigger_configs,
147            component_handler_types,
148        })
149    }
150
151    fn handler_type_for_component(
152        trigger_app: &TriggerApp<F>,
153        component_id: &str,
154        executor: &Option<HttpExecutorType>,
155    ) -> anyhow::Result<HandlerType> {
156        let pre = trigger_app.get_instance_pre(component_id)?;
157        let handler_type = match executor {
158            None | Some(HttpExecutorType::Http) | Some(HttpExecutorType::Wasip3Unstable) => {
159                let handler_type = HandlerType::from_instance_pre(pre)?;
160                handler_type.validate_executor(executor)?;
161                handler_type
162            }
163            Some(HttpExecutorType::Wagi(wagi_config)) => {
164                anyhow::ensure!(
165                    wagi_config.entrypoint == "_start",
166                    "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
167                );
168                HandlerType::Wagi(
169                    CommandIndices::new(pre)
170                        .context("failed to find wasi command interface for wagi executor")?,
171                )
172            }
173        };
174        Ok(handler_type)
175    }
176
177    /// Serve incoming requests over the provided [`TcpListener`].
178    pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
179        let listener: TcpListener = if self.find_free_port {
180            self.search_for_free_port().await?
181        } else {
182            TcpListener::bind(self.listen_addr).await.map_err(|err| {
183                if err.kind() == ErrorKind::AddrInUse {
184                    anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
185                } else {
186                    anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
187                }
188            })?
189        };
190
191        if let Some(tls_config) = self.tls_config.clone() {
192            self.serve_https(listener, tls_config).await?;
193        } else {
194            self.serve_http(listener).await?;
195        }
196        Ok(())
197    }
198
199    async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
200        let mut found_listener = None;
201        let mut addr = self.listen_addr;
202
203        for _ in 1..=MAX_RETRIES {
204            if addr.port() == u16::MAX {
205                anyhow::bail!(
206                    "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
207                );
208            }
209
210            match TcpListener::bind(addr).await {
211                Ok(listener) => {
212                    found_listener = Some(listener);
213                    break;
214                }
215                Err(err) if err.kind() == ErrorKind::AddrInUse => {
216                    addr.set_port(addr.port() + 1);
217                    continue;
218                }
219                Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
220            }
221        }
222
223        found_listener.ok_or_else(|| anyhow::anyhow!(
224            "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
225            self.listen_addr.port(),
226            self.listen_addr.port() + MAX_RETRIES
227        ))
228    }
229
230    async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
231        self.print_startup_msgs("http", &listener)?;
232        loop {
233            let (stream, client_addr) = listener.accept().await?;
234            self.clone()
235                .serve_connection(stream, Scheme::HTTP, client_addr);
236        }
237    }
238
239    async fn serve_https(
240        self: Arc<Self>,
241        listener: TcpListener,
242        tls_config: TlsConfig,
243    ) -> anyhow::Result<()> {
244        self.print_startup_msgs("https", &listener)?;
245        let acceptor = tls_config.server_config()?;
246        loop {
247            let (stream, client_addr) = listener.accept().await?;
248            match acceptor.accept(stream).await {
249                Ok(stream) => self
250                    .clone()
251                    .serve_connection(stream, Scheme::HTTPS, client_addr),
252                Err(err) => tracing::error!(?err, "Failed to start TLS session"),
253            }
254        }
255    }
256
257    /// Handles incoming requests using an HTTP executor.
258    ///
259    /// This method handles well known paths and routes requests to the handler when the router
260    /// matches the requests path.
261    pub async fn handle(
262        self: &Arc<Self>,
263        mut req: Request<Body>,
264        server_scheme: Scheme,
265        client_addr: SocketAddr,
266    ) -> anyhow::Result<Response<Body>> {
267        strip_forbidden_headers(&mut req);
268
269        spin_telemetry::extract_trace_context(&req);
270
271        let path = req.uri().path().to_string();
272
273        tracing::info!("Processing request on path '{path}'");
274
275        // Handle well-known spin paths
276        if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
277            return match well_known {
278                "health" => Ok(MatchedRoute::with_response_extension(
279                    Response::new(body::full(Bytes::from_static(b"OK"))),
280                    path,
281                )),
282                "info" => self.app_info(path),
283                _ => Self::not_found(NotFoundRouteKind::WellKnown),
284            };
285        }
286
287        match self.router.route(&path) {
288            Ok(route_match) => {
289                self.handle_trigger_route(req, route_match, server_scheme, client_addr)
290                    .await
291            }
292            Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
293        }
294    }
295
296    /// Handles a successful route match.
297    pub async fn handle_trigger_route(
298        self: &Arc<Self>,
299        mut req: Request<Body>,
300        route_match: RouteMatch<'_, '_>,
301        server_scheme: Scheme,
302        client_addr: SocketAddr,
303    ) -> anyhow::Result<Response<Body>> {
304        set_req_uri(&mut req, server_scheme.clone())?;
305        let app_id = self
306            .trigger_app
307            .app()
308            .get_metadata(APP_NAME_KEY)?
309            .unwrap_or_else(|| "<unnamed>".into());
310
311        let lookup_key = route_match.lookup_key();
312
313        spin_telemetry::metrics::monotonic_counter!(
314            spin.request_count = 1,
315            trigger_type = "http",
316            app_id = app_id,
317            component_id = lookup_key.to_string()
318        );
319
320        let trigger_config = self
321            .component_trigger_configs
322            .get(lookup_key)
323            .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
324
325        match (&trigger_config.component, &trigger_config.static_response) {
326            (Some(component), None) => {
327                self.respond_wasm_component(
328                    req,
329                    route_match,
330                    server_scheme,
331                    client_addr,
332                    component,
333                    &trigger_config.executor,
334                )
335                .await
336            }
337            (None, Some(static_response)) => {
338                Self::respond_static_response(static_response)
339            },
340            // These error cases should have been ruled out by this point but belt and braces
341            (None, None) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - neither is specified for {}", route_match.raw_route())),
342            (Some(_), Some(_)) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - both are specified for {}", route_match.raw_route())),
343        }
344    }
345
346    async fn respond_wasm_component(
347        self: &Arc<Self>,
348        req: Request<Body>,
349        route_match: RouteMatch<'_, '_>,
350        server_scheme: Scheme,
351        client_addr: SocketAddr,
352        component_id: &str,
353        executor: &Option<HttpExecutorType>,
354    ) -> anyhow::Result<Response<Body>> {
355        let mut instance_builder = self.trigger_app.prepare(component_id)?;
356
357        // Set up outbound HTTP request origin and service chaining
358        // The outbound HTTP factor is required since both inbound and outbound wasi HTTP
359        // implementations assume they use the same underlying wasmtime resource storage.
360        // Eventually, we may be able to factor this out to a separate factor.
361        let outbound_http = instance_builder
362            .factor_builder::<OutboundHttpFactor>()
363            .context(
364            "The wasi HTTP trigger was configured without the required wasi outbound http support",
365        )?;
366        let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
367        outbound_http.set_self_request_origin(origin);
368        outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
369
370        // Prepare HTTP executor
371        let handler_type = self
372            .component_handler_types
373            .get(component_id)
374            .with_context(|| format!("unknown component ID {component_id:?}"))?;
375        let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
376
377        let res = match executor {
378            HttpExecutorType::Http | HttpExecutorType::Wasip3Unstable => match handler_type {
379                HandlerType::Spin => {
380                    SpinHttpExecutor
381                        .execute(instance_builder, &route_match, req, client_addr)
382                        .await
383                }
384                HandlerType::Wasi0_3(indices) => {
385                    Wasip3HttpExecutor(indices)
386                        .execute(instance_builder, &route_match, req, client_addr)
387                        .await
388                }
389                HandlerType::Wasi0_2(_)
390                | HandlerType::Wasi2023_11_10(_)
391                | HandlerType::Wasi2023_10_18(_) => {
392                    WasiHttpExecutor { handler_type }
393                        .execute(instance_builder, &route_match, req, client_addr)
394                        .await
395                }
396                HandlerType::Wagi(_) => unreachable!(),
397            },
398            HttpExecutorType::Wagi(wagi_config) => {
399                let indices = match handler_type {
400                    HandlerType::Wagi(indices) => indices,
401                    _ => unreachable!(),
402                };
403                let executor = WagiHttpExecutor {
404                    wagi_config,
405                    indices,
406                };
407                executor
408                    .execute(instance_builder, &route_match, req, client_addr)
409                    .await
410            }
411        };
412        match res {
413            Ok(res) => Ok(MatchedRoute::with_response_extension(
414                res,
415                route_match.raw_route(),
416            )),
417            Err(err) => {
418                tracing::error!("Error processing request: {err:?}");
419                instrument_error(&err);
420                Self::internal_error(None, route_match.raw_route())
421            }
422        }
423    }
424
425    fn respond_static_response(
426        sr: &spin_http::config::StaticResponse,
427    ) -> anyhow::Result<Response<Body>> {
428        let mut response = Response::builder();
429
430        response = response.status(sr.status());
431        for (header_name, header_value) in sr.headers() {
432            response = response.header(header_name, header_value);
433        }
434
435        let body = match sr.body() {
436            Some(b) => body::full(b.clone().into()),
437            None => body::empty(),
438        };
439
440        Ok(response.body(body)?)
441    }
442
443    /// Returns spin status information.
444    fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
445        let info = AppInfo::new(self.trigger_app.app());
446        let body = serde_json::to_vec_pretty(&info)?;
447        Ok(MatchedRoute::with_response_extension(
448            Response::builder()
449                .header("content-type", "application/json")
450                .body(body::full(body.into()))?,
451            route,
452        ))
453    }
454
455    /// Creates an HTTP 500 response.
456    fn internal_error(
457        body: Option<&str>,
458        route: impl Into<String>,
459    ) -> anyhow::Result<Response<Body>> {
460        let body = match body {
461            Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
462            None => body::empty(),
463        };
464
465        Ok(MatchedRoute::with_response_extension(
466            Response::builder()
467                .status(StatusCode::INTERNAL_SERVER_ERROR)
468                .body(body)?,
469            route,
470        ))
471    }
472
473    /// Creates an HTTP 404 response.
474    fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
475        use std::sync::atomic::{AtomicBool, Ordering};
476        static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
477        if let NotFoundRouteKind::Normal(route) = kind {
478            if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
479                && std::io::stderr().is_terminal()
480            {
481                terminal::warn!(
482                    "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
483                );
484            }
485        }
486        Ok(Response::builder()
487            .status(StatusCode::NOT_FOUND)
488            .body(body::empty())?)
489    }
490
491    fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
492        self: Arc<Self>,
493        stream: S,
494        server_scheme: Scheme,
495        client_addr: SocketAddr,
496    ) {
497        task::spawn(async move {
498            let mut server_builder = Builder::new(TokioExecutor::new());
499
500            if let Some(http1_max_buf_size) = self.http1_max_buf_size {
501                server_builder.http1().max_buf_size(http1_max_buf_size);
502            }
503
504            if let Err(err) = server_builder
505                .serve_connection(
506                    TokioIo::new(stream),
507                    service_fn(move |request| {
508                        self.clone().instrumented_service_fn(
509                            server_scheme.clone(),
510                            client_addr,
511                            request,
512                        )
513                    }),
514                )
515                .await
516            {
517                tracing::warn!("Error serving HTTP connection: {err:?}");
518            }
519        });
520    }
521
522    async fn instrumented_service_fn(
523        self: Arc<Self>,
524        server_scheme: Scheme,
525        client_addr: SocketAddr,
526        request: Request<Incoming>,
527    ) -> anyhow::Result<Response<HyperOutgoingBody>> {
528        let span = http_span!(request, client_addr);
529        let method = request.method().to_string();
530        async {
531            let result = self
532                .handle(
533                    request.map(|body: Incoming| {
534                        body.map_err(wasmtime_wasi_http::hyper_response_error)
535                            .boxed()
536                    }),
537                    server_scheme,
538                    client_addr,
539                )
540                .await;
541            finalize_http_span(result, method)
542        }
543        .instrument(span)
544        .await
545    }
546
547    fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
548        let local_addr = listener.local_addr()?;
549        let base_url = format!("{scheme}://{local_addr:?}");
550        terminal::step!("\nServing", "{base_url}");
551        tracing::info!("Serving {base_url}");
552
553        println!("Available Routes:");
554        for (route, key) in self.router.routes() {
555            println!("  {key}: {base_url}{route}");
556            if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
557                if let Some(component) = self.trigger_app.app().get_component(component_id) {
558                    if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
559                        println!("    {description}");
560                    }
561                }
562            }
563        }
564        Ok(())
565    }
566}
567
568/// The incoming request's scheme and authority
569///
570/// The incoming request's URI is relative to the server, so we need to set the scheme and authority.
571/// Either the `Host` header or the request's URI's authority is used as the source of truth for the authority.
572/// This function will error if the authority cannot be unambiguously determined.
573fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
574    let uri = req.uri().clone();
575    let mut parts = uri.into_parts();
576    let headers = req.headers();
577    let header_authority = headers
578        .get(http::header::HOST)
579        .map(|h| -> anyhow::Result<Authority> {
580            let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
581            host_header
582                .parse()
583                .context("'Host' header contains an invalid authority")
584        })
585        .transpose()?;
586    let uri_authority = parts.authority;
587
588    // Get authority either from request URI or from 'Host' header
589    let authority = match (header_authority, uri_authority) {
590        (None, None) => bail!("no 'Host' header present in request"),
591        (None, Some(a)) => a,
592        (Some(a), None) => a,
593        (Some(a1), Some(a2)) => {
594            // Ensure that if `req.authority` is set, it matches what was in the `Host` header
595            // https://github.com/hyperium/hyper/issues/1612
596            if a1 != a2 {
597                return Err(anyhow::anyhow!(
598                    "authority in 'Host' header does not match authority in URI"
599                ));
600            }
601            a1
602        }
603    };
604    parts.scheme = Some(scheme);
605    parts.authority = Some(authority);
606    *req.uri_mut() = Uri::from_parts(parts).unwrap();
607    Ok(())
608}
609
/// An HTTP executor.
///
/// An executor turns one routed HTTP request into a response.
pub(crate) trait HttpExecutor {
    /// Executes `req` and resolves to the response, or to an error if
    /// execution fails.
    ///
    /// `instance_builder` is consumed by the call, `route_match` describes
    /// the route the request matched, and `client_addr` is the address of the
    /// client that sent the request.
    fn execute<F: RuntimeFactors>(
        &self,
        instance_builder: TriggerInstanceBuilder<F>,
        route_match: &RouteMatch<'_, '_>,
        req: Request<Body>,
        client_addr: SocketAddr,
    ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
}