spin_trigger_http/
server.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    io::{ErrorKind, IsTerminal},
5    net::SocketAddr,
6    sync::Arc,
7};
8
9use anyhow::{bail, Context};
10use http::{
11    uri::{Authority, Scheme},
12    Request, Response, StatusCode, Uri,
13};
14use http_body_util::BodyExt;
15use hyper::{
16    body::{Bytes, Incoming},
17    service::service_fn,
18};
19use hyper_util::{
20    rt::{TokioExecutor, TokioIo},
21    server::conn::auto::Builder,
22};
23use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
24use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
25use spin_factors::RuntimeFactors;
26use spin_http::{
27    app_info::AppInfo,
28    body,
29    config::{HttpExecutorType, HttpTriggerConfig},
30    routes::{RouteMatch, Router},
31    trigger::HandlerType,
32};
33use tokio::{
34    io::{AsyncRead, AsyncWrite},
35    net::TcpListener,
36    task,
37};
38use tracing::Instrument;
39use wasmtime_wasi::p2::bindings::CommandIndices;
40use wasmtime_wasi_http::body::HyperOutgoingBody;
41
42use crate::{
43    headers::strip_forbidden_headers,
44    instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
45    outbound_http::OutboundHttpInterceptor,
46    spin::SpinHttpExecutor,
47    wagi::WagiHttpExecutor,
48    wasi::WasiHttpExecutor,
49    Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
50};
51
/// Maximum number of bind attempts made when searching for a free port.
pub const MAX_RETRIES: u16 = 10;
53
/// An HTTP server which runs Spin apps.
pub struct HttpServer<F: RuntimeFactors> {
    /// The address the server was asked to listen on. When `find_free_port` is set,
    /// the port that actually gets bound may be a higher one.
    listen_addr: SocketAddr,
    /// The TLS configuration for the server; when `None`, plain HTTP is served.
    tls_config: Option<TlsConfig>,
    /// Whether to find a free port if the specified port is already in use.
    find_free_port: bool,
    /// Request router.
    router: Router,
    /// The app being triggered.
    trigger_app: TriggerApp<F>,
    /// Trigger lookup key (what a matched route resolves to) -> trigger config.
    component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
    /// Component ID -> handler type. Only filled in for lookup keys of the
    /// `TriggerLookupKey::Component` kind.
    component_handler_types: HashMap<String, HandlerType>,
}
71
72impl<F: RuntimeFactors> HttpServer<F> {
73    /// Create a new [`HttpServer`].
74    pub fn new(
75        listen_addr: SocketAddr,
76        tls_config: Option<TlsConfig>,
77        find_free_port: bool,
78        trigger_app: TriggerApp<F>,
79    ) -> anyhow::Result<Self> {
80        // This needs to be a vec before building the router to handle duplicate routes
81        let component_trigger_configs = trigger_app
82            .app()
83            .trigger_configs::<HttpTriggerConfig>("http")?
84            .into_iter()
85            .map(|(trigger_id, config)| (config.lookup_key(trigger_id).map(|k| (k, config))))
86            .collect::<Result<Vec<_>, _>>()?;
87
88        // Build router
89        let component_routes = component_trigger_configs
90            .iter()
91            .map(|(key, config)| (key, &config.route));
92        let mut duplicate_routes = Vec::new();
93        let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
94        if !duplicate_routes.is_empty() {
95            tracing::error!(
96                "The following component routes are duplicates and will never be used:"
97            );
98            for dup in &duplicate_routes {
99                tracing::error!(
100                    "  {}: {} (duplicate of {})",
101                    dup.replaced_id,
102                    dup.route(),
103                    dup.effective_id,
104                );
105            }
106        }
107        if router.contains_reserved_route() {
108            tracing::error!(
109                "Routes under {} are handled by the Spin runtime and will never be reached",
110                spin_http::WELL_KNOWN_PREFIX
111            );
112        }
113        tracing::trace!(
114            "Constructed router: {:?}",
115            router.routes().collect::<Vec<_>>()
116        );
117
118        // Now that router is built we can merge duplicate routes by component
119        let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
120
121        let component_handler_types = component_trigger_configs
122            .iter()
123            .filter_map(|(key, trigger_config)| match key {
124                spin_http::routes::TriggerLookupKey::Component(component) => Some(
125                    Self::handler_type_for_component(
126                        &trigger_app,
127                        component,
128                        &trigger_config.executor,
129                    )
130                    .map(|ht| (component.clone(), ht)),
131                ),
132                spin_http::routes::TriggerLookupKey::Trigger(_) => None,
133            })
134            .collect::<anyhow::Result<_>>()?;
135        Ok(Self {
136            listen_addr,
137            tls_config,
138            find_free_port,
139            router,
140            trigger_app,
141            component_trigger_configs,
142            component_handler_types,
143        })
144    }
145
146    fn handler_type_for_component(
147        trigger_app: &TriggerApp<F>,
148        component_id: &str,
149        executor: &Option<HttpExecutorType>,
150    ) -> anyhow::Result<HandlerType> {
151        let pre = trigger_app.get_instance_pre(component_id)?;
152        let handler_type = match executor {
153            None | Some(HttpExecutorType::Http) => HandlerType::from_instance_pre(pre)?,
154            Some(HttpExecutorType::Wagi(wagi_config)) => {
155                anyhow::ensure!(
156                    wagi_config.entrypoint == "_start",
157                    "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
158                );
159                HandlerType::Wagi(
160                    CommandIndices::new(pre)
161                        .context("failed to find wasi command interface for wagi executor")?,
162                )
163            }
164        };
165        Ok(handler_type)
166    }
167
168    /// Serve incoming requests over the provided [`TcpListener`].
169    pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
170        let listener: TcpListener = if self.find_free_port {
171            self.search_for_free_port().await?
172        } else {
173            TcpListener::bind(self.listen_addr).await.map_err(|err| {
174                if err.kind() == ErrorKind::AddrInUse {
175                    anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
176                } else {
177                    anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
178                }
179            })?
180        };
181
182        if let Some(tls_config) = self.tls_config.clone() {
183            self.serve_https(listener, tls_config).await?;
184        } else {
185            self.serve_http(listener).await?;
186        }
187        Ok(())
188    }
189
190    async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
191        let mut found_listener = None;
192        let mut addr = self.listen_addr;
193
194        for _ in 1..=MAX_RETRIES {
195            if addr.port() == u16::MAX {
196                anyhow::bail!(
197                    "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
198                );
199            }
200
201            match TcpListener::bind(addr).await {
202                Ok(listener) => {
203                    found_listener = Some(listener);
204                    break;
205                }
206                Err(err) if err.kind() == ErrorKind::AddrInUse => {
207                    addr.set_port(addr.port() + 1);
208                    continue;
209                }
210                Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
211            }
212        }
213
214        found_listener.ok_or_else(|| anyhow::anyhow!(
215            "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
216            self.listen_addr.port(),
217            self.listen_addr.port() + MAX_RETRIES
218        ))
219    }
220
221    async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
222        self.print_startup_msgs("http", &listener)?;
223        loop {
224            let (stream, client_addr) = listener.accept().await?;
225            self.clone()
226                .serve_connection(stream, Scheme::HTTP, client_addr);
227        }
228    }
229
230    async fn serve_https(
231        self: Arc<Self>,
232        listener: TcpListener,
233        tls_config: TlsConfig,
234    ) -> anyhow::Result<()> {
235        self.print_startup_msgs("https", &listener)?;
236        let acceptor = tls_config.server_config()?;
237        loop {
238            let (stream, client_addr) = listener.accept().await?;
239            match acceptor.accept(stream).await {
240                Ok(stream) => self
241                    .clone()
242                    .serve_connection(stream, Scheme::HTTPS, client_addr),
243                Err(err) => tracing::error!(?err, "Failed to start TLS session"),
244            }
245        }
246    }
247
248    /// Handles incoming requests using an HTTP executor.
249    ///
250    /// This method handles well known paths and routes requests to the handler when the router
251    /// matches the requests path.
252    pub async fn handle(
253        self: &Arc<Self>,
254        mut req: Request<Body>,
255        server_scheme: Scheme,
256        client_addr: SocketAddr,
257    ) -> anyhow::Result<Response<Body>> {
258        strip_forbidden_headers(&mut req);
259
260        spin_telemetry::extract_trace_context(&req);
261
262        let path = req.uri().path().to_string();
263
264        tracing::info!("Processing request on path '{path}'");
265
266        // Handle well-known spin paths
267        if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
268            return match well_known {
269                "health" => Ok(MatchedRoute::with_response_extension(
270                    Response::new(body::full(Bytes::from_static(b"OK"))),
271                    path,
272                )),
273                "info" => self.app_info(path),
274                _ => Self::not_found(NotFoundRouteKind::WellKnown),
275            };
276        }
277
278        match self.router.route(&path) {
279            Ok(route_match) => {
280                self.handle_trigger_route(req, route_match, server_scheme, client_addr)
281                    .await
282            }
283            Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
284        }
285    }
286
287    /// Handles a successful route match.
288    pub async fn handle_trigger_route(
289        self: &Arc<Self>,
290        mut req: Request<Body>,
291        route_match: RouteMatch<'_, '_>,
292        server_scheme: Scheme,
293        client_addr: SocketAddr,
294    ) -> anyhow::Result<Response<Body>> {
295        set_req_uri(&mut req, server_scheme.clone())?;
296        let app_id = self
297            .trigger_app
298            .app()
299            .get_metadata(APP_NAME_KEY)?
300            .unwrap_or_else(|| "<unnamed>".into());
301
302        let lookup_key = route_match.lookup_key();
303
304        spin_telemetry::metrics::monotonic_counter!(
305            spin.request_count = 1,
306            trigger_type = "http",
307            app_id = app_id,
308            component_id = lookup_key.to_string()
309        );
310
311        let trigger_config = self
312            .component_trigger_configs
313            .get(lookup_key)
314            .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
315
316        match (&trigger_config.component, &trigger_config.static_response) {
317            (Some(component), None) => {
318                self.respond_wasm_component(
319                    req,
320                    route_match,
321                    server_scheme,
322                    client_addr,
323                    component,
324                    &trigger_config.executor,
325                )
326                .await
327            }
328            (None, Some(static_response)) => {
329                Self::respond_static_response(static_response)
330            },
331            // These error cases should have been ruled out by this point but belt and braces
332            (None, None) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - neither is specified for {}", route_match.raw_route())),
333            (Some(_), Some(_)) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - both are specified for {}", route_match.raw_route())),
334        }
335    }
336
337    async fn respond_wasm_component(
338        self: &Arc<Self>,
339        req: Request<Body>,
340        route_match: RouteMatch<'_, '_>,
341        server_scheme: Scheme,
342        client_addr: SocketAddr,
343        component_id: &str,
344        executor: &Option<HttpExecutorType>,
345    ) -> anyhow::Result<Response<Body>> {
346        let mut instance_builder = self.trigger_app.prepare(component_id)?;
347
348        // Set up outbound HTTP request origin and service chaining
349        // The outbound HTTP factor is required since both inbound and outbound wasi HTTP
350        // implementations assume they use the same underlying wasmtime resource storage.
351        // Eventually, we may be able to factor this out to a separate factor.
352        let outbound_http = instance_builder
353            .factor_builder::<OutboundHttpFactor>()
354            .context(
355            "The wasi HTTP trigger was configured without the required wasi outbound http support",
356        )?;
357        let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
358        outbound_http.set_self_request_origin(origin);
359        outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
360
361        // Prepare HTTP executor
362        let handler_type = self
363            .component_handler_types
364            .get(component_id)
365            .with_context(|| format!("unknown component ID {component_id:?}"))?;
366        let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
367
368        let res = match executor {
369            HttpExecutorType::Http => match handler_type {
370                HandlerType::Spin => {
371                    SpinHttpExecutor
372                        .execute(instance_builder, &route_match, req, client_addr)
373                        .await
374                }
375                HandlerType::Wasi0_2(_)
376                | HandlerType::Wasi2023_11_10(_)
377                | HandlerType::Wasi2023_10_18(_) => {
378                    WasiHttpExecutor { handler_type }
379                        .execute(instance_builder, &route_match, req, client_addr)
380                        .await
381                }
382                HandlerType::Wagi(_) => unreachable!(),
383            },
384            HttpExecutorType::Wagi(wagi_config) => {
385                let indices = match handler_type {
386                    HandlerType::Wagi(indices) => indices,
387                    _ => unreachable!(),
388                };
389                let executor = WagiHttpExecutor {
390                    wagi_config,
391                    indices,
392                };
393                executor
394                    .execute(instance_builder, &route_match, req, client_addr)
395                    .await
396            }
397        };
398        match res {
399            Ok(res) => Ok(MatchedRoute::with_response_extension(
400                res,
401                route_match.raw_route(),
402            )),
403            Err(err) => {
404                tracing::error!("Error processing request: {err:?}");
405                instrument_error(&err);
406                Self::internal_error(None, route_match.raw_route())
407            }
408        }
409    }
410
411    fn respond_static_response(
412        sr: &spin_http::config::StaticResponse,
413    ) -> anyhow::Result<Response<Body>> {
414        let mut response = Response::builder();
415
416        response = response.status(sr.status());
417        for (header_name, header_value) in sr.headers() {
418            response = response.header(header_name, header_value);
419        }
420
421        let body = match sr.body() {
422            Some(b) => body::full(b.clone().into()),
423            None => body::empty(),
424        };
425
426        Ok(response.body(body)?)
427    }
428
429    /// Returns spin status information.
430    fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
431        let info = AppInfo::new(self.trigger_app.app());
432        let body = serde_json::to_vec_pretty(&info)?;
433        Ok(MatchedRoute::with_response_extension(
434            Response::builder()
435                .header("content-type", "application/json")
436                .body(body::full(body.into()))?,
437            route,
438        ))
439    }
440
441    /// Creates an HTTP 500 response.
442    fn internal_error(
443        body: Option<&str>,
444        route: impl Into<String>,
445    ) -> anyhow::Result<Response<Body>> {
446        let body = match body {
447            Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
448            None => body::empty(),
449        };
450
451        Ok(MatchedRoute::with_response_extension(
452            Response::builder()
453                .status(StatusCode::INTERNAL_SERVER_ERROR)
454                .body(body)?,
455            route,
456        ))
457    }
458
459    /// Creates an HTTP 404 response.
460    fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
461        use std::sync::atomic::{AtomicBool, Ordering};
462        static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
463        if let NotFoundRouteKind::Normal(route) = kind {
464            if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
465                && std::io::stderr().is_terminal()
466            {
467                terminal::warn!("Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route.");
468            }
469        }
470        Ok(Response::builder()
471            .status(StatusCode::NOT_FOUND)
472            .body(body::empty())?)
473    }
474
475    fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
476        self: Arc<Self>,
477        stream: S,
478        server_scheme: Scheme,
479        client_addr: SocketAddr,
480    ) {
481        task::spawn(async move {
482            if let Err(err) = Builder::new(TokioExecutor::new())
483                .serve_connection(
484                    TokioIo::new(stream),
485                    service_fn(move |request| {
486                        self.clone().instrumented_service_fn(
487                            server_scheme.clone(),
488                            client_addr,
489                            request,
490                        )
491                    }),
492                )
493                .await
494            {
495                tracing::warn!("Error serving HTTP connection: {err:?}");
496            }
497        });
498    }
499
500    async fn instrumented_service_fn(
501        self: Arc<Self>,
502        server_scheme: Scheme,
503        client_addr: SocketAddr,
504        request: Request<Incoming>,
505    ) -> anyhow::Result<Response<HyperOutgoingBody>> {
506        let span = http_span!(request, client_addr);
507        let method = request.method().to_string();
508        async {
509            let result = self
510                .handle(
511                    request.map(|body: Incoming| {
512                        body.map_err(wasmtime_wasi_http::hyper_response_error)
513                            .boxed()
514                    }),
515                    server_scheme,
516                    client_addr,
517                )
518                .await;
519            finalize_http_span(result, method)
520        }
521        .instrument(span)
522        .await
523    }
524
525    fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
526        let local_addr = listener.local_addr()?;
527        let base_url = format!("{scheme}://{local_addr:?}");
528        terminal::step!("\nServing", "{base_url}");
529        tracing::info!("Serving {base_url}");
530
531        println!("Available Routes:");
532        for (route, key) in self.router.routes() {
533            println!("  {key}: {base_url}{route}");
534            if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
535                if let Some(component) = self.trigger_app.app().get_component(component_id) {
536                    if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
537                        println!("    {description}");
538                    }
539                }
540            }
541        }
542        Ok(())
543    }
544}
545
546/// The incoming request's scheme and authority
547///
548/// The incoming request's URI is relative to the server, so we need to set the scheme and authority.
549/// Either the `Host` header or the request's URI's authority is used as the source of truth for the authority.
550/// This function will error if the authority cannot be unambiguously determined.
551fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
552    let uri = req.uri().clone();
553    let mut parts = uri.into_parts();
554    let headers = req.headers();
555    let header_authority = headers
556        .get(http::header::HOST)
557        .map(|h| -> anyhow::Result<Authority> {
558            let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
559            host_header
560                .parse()
561                .context("'Host' header contains an invalid authority")
562        })
563        .transpose()?;
564    let uri_authority = parts.authority;
565
566    // Get authority either from request URI or from 'Host' header
567    let authority = match (header_authority, uri_authority) {
568        (None, None) => bail!("no 'Host' header present in request"),
569        (None, Some(a)) => a,
570        (Some(a), None) => a,
571        (Some(a1), Some(a2)) => {
572            // Ensure that if `req.authority` is set, it matches what was in the `Host` header
573            // https://github.com/hyperium/hyper/issues/1612
574            if a1 != a2 {
575                return Err(anyhow::anyhow!(
576                    "authority in 'Host' header does not match authority in URI"
577                ));
578            }
579            a1
580        }
581    };
582    parts.scheme = Some(scheme);
583    parts.authority = Some(authority);
584    *req.uri_mut() = Uri::from_parts(parts).unwrap();
585    Ok(())
586}
587
/// An HTTP executor.
///
/// Turns an incoming request for a matched route into a response using a freshly
/// prepared component instance.
// NOTE(review): presumably implemented by `SpinHttpExecutor`, `WasiHttpExecutor` and
// `WagiHttpExecutor`, which this file calls `execute` on - confirm in their modules.
pub(crate) trait HttpExecutor {
    /// Executes the request.
    ///
    /// * `instance_builder` - builder for the component instance; taken by value.
    /// * `route_match` - the route that matched the request path.
    /// * `req` - the request to handle.
    /// * `client_addr` - socket address of the client that sent the request.
    ///
    /// Resolves to the response, or to an error if execution fails.
    fn execute<F: RuntimeFactors>(
        &self,
        instance_builder: TriggerInstanceBuilder<F>,
        route_match: &RouteMatch<'_, '_>,
        req: Request<Body>,
        client_addr: SocketAddr,
    ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
}