spin_trigger_http/
server.rs

1use std::{
2    collections::HashMap,
3    future::Future,
4    io::{ErrorKind, IsTerminal},
5    net::SocketAddr,
6    sync::Arc,
7};
8
9use anyhow::{bail, Context};
10use http::{
11    uri::{Authority, Scheme},
12    Request, Response, StatusCode, Uri,
13};
14use http_body_util::BodyExt;
15use hyper::{
16    body::{Bytes, Incoming},
17    service::service_fn,
18};
19use hyper_util::{
20    rt::{TokioExecutor, TokioIo},
21    server::conn::auto::Builder,
22};
23use spin_app::{APP_DESCRIPTION_KEY, APP_NAME_KEY};
24use spin_factor_outbound_http::{OutboundHttpFactor, SelfRequestOrigin};
25use spin_factors::RuntimeFactors;
26use spin_http::{
27    app_info::AppInfo,
28    body,
29    config::{HttpExecutorType, HttpTriggerConfig},
30    routes::{RouteMatch, Router},
31    trigger::HandlerType,
32};
33use tokio::{
34    io::{AsyncRead, AsyncWrite},
35    net::TcpListener,
36    task,
37};
38use tracing::Instrument;
39use wasmtime_wasi::p2::bindings::CommandIndices;
40use wasmtime_wasi_http::body::HyperOutgoingBody;
41
42use crate::{
43    headers::strip_forbidden_headers,
44    instrument::{finalize_http_span, http_span, instrument_error, MatchedRoute},
45    outbound_http::OutboundHttpInterceptor,
46    spin::SpinHttpExecutor,
47    wagi::WagiHttpExecutor,
48    wasi::WasiHttpExecutor,
49    wasip3::Wasip3HttpExecutor,
50    Body, NotFoundRouteKind, TlsConfig, TriggerApp, TriggerInstanceBuilder,
51};
52
/// Maximum number of bind attempts made when searching for a free port, starting at the
/// configured listen port and moving up one port per attempt.
pub const MAX_RETRIES: u16 = 10;
54
/// An HTTP server which runs Spin apps.
pub struct HttpServer<F: RuntimeFactors> {
    /// The address the server was asked to listen on. When `find_free_port` is set, the
    /// port that actually gets bound may be higher than this one.
    listen_addr: SocketAddr,
    /// The TLS configuration for the server. `None` means plain HTTP is served.
    tls_config: Option<TlsConfig>,
    /// Whether to find a free port if the specified port is already in use.
    find_free_port: bool,
    /// Request router.
    router: Router,
    /// The app being triggered.
    trigger_app: TriggerApp<F>,
    /// Routing lookup key (component or trigger) -> HTTP trigger config.
    component_trigger_configs: HashMap<spin_http::routes::TriggerLookupKey, HttpTriggerConfig>,
    /// Component ID -> handler type. Only populated for lookup keys that name a component.
    component_handler_types: HashMap<String, HandlerType>,
}
72
73impl<F: RuntimeFactors> HttpServer<F> {
74    /// Create a new [`HttpServer`].
75    pub fn new(
76        listen_addr: SocketAddr,
77        tls_config: Option<TlsConfig>,
78        find_free_port: bool,
79        trigger_app: TriggerApp<F>,
80    ) -> anyhow::Result<Self> {
81        // This needs to be a vec before building the router to handle duplicate routes
82        let component_trigger_configs = trigger_app
83            .app()
84            .trigger_configs::<HttpTriggerConfig>("http")?
85            .into_iter()
86            .map(|(trigger_id, config)| config.lookup_key(trigger_id).map(|k| (k, config)))
87            .collect::<Result<Vec<_>, _>>()?;
88
89        // Build router
90        let component_routes = component_trigger_configs
91            .iter()
92            .map(|(key, config)| (key, &config.route));
93        let mut duplicate_routes = Vec::new();
94        let router = Router::build("/", component_routes, Some(&mut duplicate_routes))?;
95        if !duplicate_routes.is_empty() {
96            tracing::error!(
97                "The following component routes are duplicates and will never be used:"
98            );
99            for dup in &duplicate_routes {
100                tracing::error!(
101                    "  {}: {} (duplicate of {})",
102                    dup.replaced_id,
103                    dup.route(),
104                    dup.effective_id,
105                );
106            }
107        }
108        if router.contains_reserved_route() {
109            tracing::error!(
110                "Routes under {} are handled by the Spin runtime and will never be reached",
111                spin_http::WELL_KNOWN_PREFIX
112            );
113        }
114        tracing::trace!(
115            "Constructed router: {:?}",
116            router.routes().collect::<Vec<_>>()
117        );
118
119        // Now that router is built we can merge duplicate routes by component
120        let component_trigger_configs = HashMap::from_iter(component_trigger_configs);
121
122        let component_handler_types = component_trigger_configs
123            .iter()
124            .filter_map(|(key, trigger_config)| match key {
125                spin_http::routes::TriggerLookupKey::Component(component) => Some(
126                    Self::handler_type_for_component(
127                        &trigger_app,
128                        component,
129                        &trigger_config.executor,
130                    )
131                    .map(|ht| (component.clone(), ht)),
132                ),
133                spin_http::routes::TriggerLookupKey::Trigger(_) => None,
134            })
135            .collect::<anyhow::Result<_>>()?;
136        Ok(Self {
137            listen_addr,
138            tls_config,
139            find_free_port,
140            router,
141            trigger_app,
142            component_trigger_configs,
143            component_handler_types,
144        })
145    }
146
147    fn handler_type_for_component(
148        trigger_app: &TriggerApp<F>,
149        component_id: &str,
150        executor: &Option<HttpExecutorType>,
151    ) -> anyhow::Result<HandlerType> {
152        let pre = trigger_app.get_instance_pre(component_id)?;
153        let handler_type = match executor {
154            None | Some(HttpExecutorType::Http) | Some(HttpExecutorType::Wasip3Unstable) => {
155                let handler_type = HandlerType::from_instance_pre(pre)?;
156                handler_type.validate_executor(executor)?;
157                handler_type
158            }
159            Some(HttpExecutorType::Wagi(wagi_config)) => {
160                anyhow::ensure!(
161                    wagi_config.entrypoint == "_start",
162                    "Wagi component '{component_id}' cannot use deprecated 'entrypoint' field"
163                );
164                HandlerType::Wagi(
165                    CommandIndices::new(pre)
166                        .context("failed to find wasi command interface for wagi executor")?,
167                )
168            }
169        };
170        Ok(handler_type)
171    }
172
173    /// Serve incoming requests over the provided [`TcpListener`].
174    pub async fn serve(self: Arc<Self>) -> anyhow::Result<()> {
175        let listener: TcpListener = if self.find_free_port {
176            self.search_for_free_port().await?
177        } else {
178            TcpListener::bind(self.listen_addr).await.map_err(|err| {
179                if err.kind() == ErrorKind::AddrInUse {
180                    anyhow::anyhow!("{} is already in use. To have Spin search for a free port, use the --find-free-port option.", self.listen_addr)
181                } else {
182                    anyhow::anyhow!("Unable to listen on {}: {err:?}", self.listen_addr)
183                }
184            })?
185        };
186
187        if let Some(tls_config) = self.tls_config.clone() {
188            self.serve_https(listener, tls_config).await?;
189        } else {
190            self.serve_http(listener).await?;
191        }
192        Ok(())
193    }
194
195    async fn search_for_free_port(&self) -> anyhow::Result<TcpListener> {
196        let mut found_listener = None;
197        let mut addr = self.listen_addr;
198
199        for _ in 1..=MAX_RETRIES {
200            if addr.port() == u16::MAX {
201                anyhow::bail!(
202                    "Couldn't find a free port as we've reached the maximum port number. Consider retrying with a lower base port."
203                );
204            }
205
206            match TcpListener::bind(addr).await {
207                Ok(listener) => {
208                    found_listener = Some(listener);
209                    break;
210                }
211                Err(err) if err.kind() == ErrorKind::AddrInUse => {
212                    addr.set_port(addr.port() + 1);
213                    continue;
214                }
215                Err(err) => anyhow::bail!("Unable to listen on {addr}: {err:?}",),
216            }
217        }
218
219        found_listener.ok_or_else(|| anyhow::anyhow!(
220            "Couldn't find a free port in the range {}-{}. Consider retrying with a different base port.",
221            self.listen_addr.port(),
222            self.listen_addr.port() + MAX_RETRIES
223        ))
224    }
225
226    async fn serve_http(self: Arc<Self>, listener: TcpListener) -> anyhow::Result<()> {
227        self.print_startup_msgs("http", &listener)?;
228        loop {
229            let (stream, client_addr) = listener.accept().await?;
230            self.clone()
231                .serve_connection(stream, Scheme::HTTP, client_addr);
232        }
233    }
234
235    async fn serve_https(
236        self: Arc<Self>,
237        listener: TcpListener,
238        tls_config: TlsConfig,
239    ) -> anyhow::Result<()> {
240        self.print_startup_msgs("https", &listener)?;
241        let acceptor = tls_config.server_config()?;
242        loop {
243            let (stream, client_addr) = listener.accept().await?;
244            match acceptor.accept(stream).await {
245                Ok(stream) => self
246                    .clone()
247                    .serve_connection(stream, Scheme::HTTPS, client_addr),
248                Err(err) => tracing::error!(?err, "Failed to start TLS session"),
249            }
250        }
251    }
252
253    /// Handles incoming requests using an HTTP executor.
254    ///
255    /// This method handles well known paths and routes requests to the handler when the router
256    /// matches the requests path.
257    pub async fn handle(
258        self: &Arc<Self>,
259        mut req: Request<Body>,
260        server_scheme: Scheme,
261        client_addr: SocketAddr,
262    ) -> anyhow::Result<Response<Body>> {
263        strip_forbidden_headers(&mut req);
264
265        spin_telemetry::extract_trace_context(&req);
266
267        let path = req.uri().path().to_string();
268
269        tracing::info!("Processing request on path '{path}'");
270
271        // Handle well-known spin paths
272        if let Some(well_known) = path.strip_prefix(spin_http::WELL_KNOWN_PREFIX) {
273            return match well_known {
274                "health" => Ok(MatchedRoute::with_response_extension(
275                    Response::new(body::full(Bytes::from_static(b"OK"))),
276                    path,
277                )),
278                "info" => self.app_info(path),
279                _ => Self::not_found(NotFoundRouteKind::WellKnown),
280            };
281        }
282
283        match self.router.route(&path) {
284            Ok(route_match) => {
285                self.handle_trigger_route(req, route_match, server_scheme, client_addr)
286                    .await
287            }
288            Err(_) => Self::not_found(NotFoundRouteKind::Normal(path.to_string())),
289        }
290    }
291
292    /// Handles a successful route match.
293    pub async fn handle_trigger_route(
294        self: &Arc<Self>,
295        mut req: Request<Body>,
296        route_match: RouteMatch<'_, '_>,
297        server_scheme: Scheme,
298        client_addr: SocketAddr,
299    ) -> anyhow::Result<Response<Body>> {
300        set_req_uri(&mut req, server_scheme.clone())?;
301        let app_id = self
302            .trigger_app
303            .app()
304            .get_metadata(APP_NAME_KEY)?
305            .unwrap_or_else(|| "<unnamed>".into());
306
307        let lookup_key = route_match.lookup_key();
308
309        spin_telemetry::metrics::monotonic_counter!(
310            spin.request_count = 1,
311            trigger_type = "http",
312            app_id = app_id,
313            component_id = lookup_key.to_string()
314        );
315
316        let trigger_config = self
317            .component_trigger_configs
318            .get(lookup_key)
319            .with_context(|| format!("unknown routing destination '{lookup_key}'"))?;
320
321        match (&trigger_config.component, &trigger_config.static_response) {
322            (Some(component), None) => {
323                self.respond_wasm_component(
324                    req,
325                    route_match,
326                    server_scheme,
327                    client_addr,
328                    component,
329                    &trigger_config.executor,
330                )
331                .await
332            }
333            (None, Some(static_response)) => {
334                Self::respond_static_response(static_response)
335            },
336            // These error cases should have been ruled out by this point but belt and braces
337            (None, None) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - neither is specified for {}", route_match.raw_route())),
338            (Some(_), Some(_)) => Err(anyhow::anyhow!("Triggers must specify either component or static_response - both are specified for {}", route_match.raw_route())),
339        }
340    }
341
342    async fn respond_wasm_component(
343        self: &Arc<Self>,
344        req: Request<Body>,
345        route_match: RouteMatch<'_, '_>,
346        server_scheme: Scheme,
347        client_addr: SocketAddr,
348        component_id: &str,
349        executor: &Option<HttpExecutorType>,
350    ) -> anyhow::Result<Response<Body>> {
351        let mut instance_builder = self.trigger_app.prepare(component_id)?;
352
353        // Set up outbound HTTP request origin and service chaining
354        // The outbound HTTP factor is required since both inbound and outbound wasi HTTP
355        // implementations assume they use the same underlying wasmtime resource storage.
356        // Eventually, we may be able to factor this out to a separate factor.
357        let outbound_http = instance_builder
358            .factor_builder::<OutboundHttpFactor>()
359            .context(
360            "The wasi HTTP trigger was configured without the required wasi outbound http support",
361        )?;
362        let origin = SelfRequestOrigin::create(server_scheme, &self.listen_addr.to_string())?;
363        outbound_http.set_self_request_origin(origin);
364        outbound_http.set_request_interceptor(OutboundHttpInterceptor::new(self.clone()))?;
365
366        // Prepare HTTP executor
367        let handler_type = self
368            .component_handler_types
369            .get(component_id)
370            .with_context(|| format!("unknown component ID {component_id:?}"))?;
371        let executor = executor.as_ref().unwrap_or(&HttpExecutorType::Http);
372
373        let res = match executor {
374            HttpExecutorType::Http | HttpExecutorType::Wasip3Unstable => match handler_type {
375                HandlerType::Spin => {
376                    SpinHttpExecutor
377                        .execute(instance_builder, &route_match, req, client_addr)
378                        .await
379                }
380                HandlerType::Wasi0_3(indices) => {
381                    Wasip3HttpExecutor(indices)
382                        .execute(instance_builder, &route_match, req, client_addr)
383                        .await
384                }
385                HandlerType::Wasi0_2(_)
386                | HandlerType::Wasi2023_11_10(_)
387                | HandlerType::Wasi2023_10_18(_) => {
388                    WasiHttpExecutor { handler_type }
389                        .execute(instance_builder, &route_match, req, client_addr)
390                        .await
391                }
392                HandlerType::Wagi(_) => unreachable!(),
393            },
394            HttpExecutorType::Wagi(wagi_config) => {
395                let indices = match handler_type {
396                    HandlerType::Wagi(indices) => indices,
397                    _ => unreachable!(),
398                };
399                let executor = WagiHttpExecutor {
400                    wagi_config,
401                    indices,
402                };
403                executor
404                    .execute(instance_builder, &route_match, req, client_addr)
405                    .await
406            }
407        };
408        match res {
409            Ok(res) => Ok(MatchedRoute::with_response_extension(
410                res,
411                route_match.raw_route(),
412            )),
413            Err(err) => {
414                tracing::error!("Error processing request: {err:?}");
415                instrument_error(&err);
416                Self::internal_error(None, route_match.raw_route())
417            }
418        }
419    }
420
421    fn respond_static_response(
422        sr: &spin_http::config::StaticResponse,
423    ) -> anyhow::Result<Response<Body>> {
424        let mut response = Response::builder();
425
426        response = response.status(sr.status());
427        for (header_name, header_value) in sr.headers() {
428            response = response.header(header_name, header_value);
429        }
430
431        let body = match sr.body() {
432            Some(b) => body::full(b.clone().into()),
433            None => body::empty(),
434        };
435
436        Ok(response.body(body)?)
437    }
438
439    /// Returns spin status information.
440    fn app_info(&self, route: String) -> anyhow::Result<Response<Body>> {
441        let info = AppInfo::new(self.trigger_app.app());
442        let body = serde_json::to_vec_pretty(&info)?;
443        Ok(MatchedRoute::with_response_extension(
444            Response::builder()
445                .header("content-type", "application/json")
446                .body(body::full(body.into()))?,
447            route,
448        ))
449    }
450
451    /// Creates an HTTP 500 response.
452    fn internal_error(
453        body: Option<&str>,
454        route: impl Into<String>,
455    ) -> anyhow::Result<Response<Body>> {
456        let body = match body {
457            Some(body) => body::full(Bytes::copy_from_slice(body.as_bytes())),
458            None => body::empty(),
459        };
460
461        Ok(MatchedRoute::with_response_extension(
462            Response::builder()
463                .status(StatusCode::INTERNAL_SERVER_ERROR)
464                .body(body)?,
465            route,
466        ))
467    }
468
469    /// Creates an HTTP 404 response.
470    fn not_found(kind: NotFoundRouteKind) -> anyhow::Result<Response<Body>> {
471        use std::sync::atomic::{AtomicBool, Ordering};
472        static SHOWN_GENERIC_404_WARNING: AtomicBool = AtomicBool::new(false);
473        if let NotFoundRouteKind::Normal(route) = kind {
474            if !SHOWN_GENERIC_404_WARNING.fetch_or(true, Ordering::Relaxed)
475                && std::io::stderr().is_terminal()
476            {
477                terminal::warn!(
478                    "Request to {route} matched no pattern, and received a generic 404 response. To serve a more informative 404 page, add a catch-all (/...) route."
479                );
480            }
481        }
482        Ok(Response::builder()
483            .status(StatusCode::NOT_FOUND)
484            .body(body::empty())?)
485    }
486
487    fn serve_connection<S: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
488        self: Arc<Self>,
489        stream: S,
490        server_scheme: Scheme,
491        client_addr: SocketAddr,
492    ) {
493        task::spawn(async move {
494            if let Err(err) = Builder::new(TokioExecutor::new())
495                .serve_connection(
496                    TokioIo::new(stream),
497                    service_fn(move |request| {
498                        self.clone().instrumented_service_fn(
499                            server_scheme.clone(),
500                            client_addr,
501                            request,
502                        )
503                    }),
504                )
505                .await
506            {
507                tracing::warn!("Error serving HTTP connection: {err:?}");
508            }
509        });
510    }
511
512    async fn instrumented_service_fn(
513        self: Arc<Self>,
514        server_scheme: Scheme,
515        client_addr: SocketAddr,
516        request: Request<Incoming>,
517    ) -> anyhow::Result<Response<HyperOutgoingBody>> {
518        let span = http_span!(request, client_addr);
519        let method = request.method().to_string();
520        async {
521            let result = self
522                .handle(
523                    request.map(|body: Incoming| {
524                        body.map_err(wasmtime_wasi_http::hyper_response_error)
525                            .boxed()
526                    }),
527                    server_scheme,
528                    client_addr,
529                )
530                .await;
531            finalize_http_span(result, method)
532        }
533        .instrument(span)
534        .await
535    }
536
537    fn print_startup_msgs(&self, scheme: &str, listener: &TcpListener) -> anyhow::Result<()> {
538        let local_addr = listener.local_addr()?;
539        let base_url = format!("{scheme}://{local_addr:?}");
540        terminal::step!("\nServing", "{base_url}");
541        tracing::info!("Serving {base_url}");
542
543        println!("Available Routes:");
544        for (route, key) in self.router.routes() {
545            println!("  {key}: {base_url}{route}");
546            if let spin_http::routes::TriggerLookupKey::Component(component_id) = &key {
547                if let Some(component) = self.trigger_app.app().get_component(component_id) {
548                    if let Some(description) = component.get_metadata(APP_DESCRIPTION_KEY)? {
549                        println!("    {description}");
550                    }
551                }
552            }
553        }
554        Ok(())
555    }
556}
557
558/// The incoming request's scheme and authority
559///
560/// The incoming request's URI is relative to the server, so we need to set the scheme and authority.
561/// Either the `Host` header or the request's URI's authority is used as the source of truth for the authority.
562/// This function will error if the authority cannot be unambiguously determined.
563fn set_req_uri(req: &mut Request<Body>, scheme: Scheme) -> anyhow::Result<()> {
564    let uri = req.uri().clone();
565    let mut parts = uri.into_parts();
566    let headers = req.headers();
567    let header_authority = headers
568        .get(http::header::HOST)
569        .map(|h| -> anyhow::Result<Authority> {
570            let host_header = h.to_str().context("'Host' header is not valid UTF-8")?;
571            host_header
572                .parse()
573                .context("'Host' header contains an invalid authority")
574        })
575        .transpose()?;
576    let uri_authority = parts.authority;
577
578    // Get authority either from request URI or from 'Host' header
579    let authority = match (header_authority, uri_authority) {
580        (None, None) => bail!("no 'Host' header present in request"),
581        (None, Some(a)) => a,
582        (Some(a), None) => a,
583        (Some(a1), Some(a2)) => {
584            // Ensure that if `req.authority` is set, it matches what was in the `Host` header
585            // https://github.com/hyperium/hyper/issues/1612
586            if a1 != a2 {
587                return Err(anyhow::anyhow!(
588                    "authority in 'Host' header does not match authority in URI"
589                ));
590            }
591            a1
592        }
593    };
594    parts.scheme = Some(scheme);
595    parts.authority = Some(authority);
596    *req.uri_mut() = Uri::from_parts(parts).unwrap();
597    Ok(())
598}
599
/// An HTTP executor.
///
/// An implementation turns one request into one response, using the instance builder it
/// is handed.
pub(crate) trait HttpExecutor {
    /// Executes `req` and resolves to the response, or to an error if execution fails.
    ///
    /// - `instance_builder`: the builder for the instance that will handle the request;
    ///   it is consumed by the call.
    /// - `route_match`: the route that `req` matched.
    /// - `client_addr`: the address of the client that sent the request.
    fn execute<F: RuntimeFactors>(
        &self,
        instance_builder: TriggerInstanceBuilder<F>,
        route_match: &RouteMatch<'_, '_>,
        req: Request<Body>,
        client_addr: SocketAddr,
    ) -> impl Future<Output = anyhow::Result<Response<Body>>>;
}