Skip to main content

spin_factor_key_value/
host.rs

1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{
4    async_trait,
5    wasmtime::component::{Accessor, FutureReader, Resource, StreamReader},
6};
7use spin_factor_otel::OtelFactorState;
8use spin_resource_table::Table;
9use spin_telemetry::traces::{self, Blame};
10use spin_world::MAX_HOST_BUFFERED_BYTES;
11use spin_world::spin::key_value::key_value as v3;
12use spin_world::v2::key_value;
13use spin_world::wasi::keyvalue as wasi_keyvalue;
14use std::{any::Any, collections::HashSet, sync::Arc};
15use tracing::instrument;
16
17const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
18
19pub use key_value::Error;
20
21#[async_trait]
22pub trait StoreManager: Sync + Send {
23    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
24    fn is_defined(&self, store_name: &str) -> bool;
25
26    /// A human-readable summary of the given store's configuration
27    ///
28    /// Example: "Redis at localhost:1234"
29    fn summary(&self, store_name: &str) -> Option<String> {
30        let _ = store_name;
31        None
32    }
33
34    /// Metadata about the store manager that can be accessed for data collection or debugging purposes. This is opaque and can be in any format the store manager chooses.
35    fn metadata(&self) -> Arc<dyn Any> {
36        Arc::new(())
37    }
38}
39
40#[async_trait]
41pub trait Store: Sync + Send {
42    async fn after_open(&self) -> Result<(), Error> {
43        Ok(())
44    }
45    async fn get(&self, key: &str, max_result_bytes: usize) -> Result<Option<Vec<u8>>, Error>;
46    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
47    async fn delete(&self, key: &str) -> Result<(), Error>;
48    async fn exists(&self, key: &str) -> Result<bool, Error>;
49    async fn get_keys(&self, max_result_bytes: usize) -> Result<Vec<String>, Error>;
50    async fn get_keys_async(
51        &self,
52        max_result_bytes: usize,
53    ) -> (
54        tokio::sync::mpsc::Receiver<String>,
55        tokio::sync::oneshot::Receiver<Result<(), v3::Error>>,
56    );
57    async fn get_many(
58        &self,
59        keys: Vec<String>,
60        max_result_bytes: usize,
61    ) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
62    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
63    async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
64    async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
65    async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
66    -> Result<Arc<dyn Cas>, Error>;
67}
68
69pub struct KeyValueDispatch {
70    allowed_stores: HashSet<String>,
71    manager: Arc<dyn StoreManager>,
72    stores: Table<Arc<dyn Store>>,
73    compare_and_swaps: Table<Arc<dyn Cas>>,
74    otel: OtelFactorState,
75}
76
77impl KeyValueDispatch {
78    pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
79        Self::new_with_capacity(
80            allowed_stores,
81            manager,
82            DEFAULT_STORE_TABLE_CAPACITY,
83            Default::default(),
84        )
85    }
86
87    pub fn new_with_capacity(
88        allowed_stores: HashSet<String>,
89        manager: Arc<dyn StoreManager>,
90        capacity: u32,
91        otel: OtelFactorState,
92    ) -> Self {
93        Self {
94            allowed_stores,
95            manager,
96            stores: Table::new(capacity),
97            compare_and_swaps: Table::new(capacity),
98            otel,
99        }
100    }
101
102    pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
103        let res = self.stores.get(store.rep()).context("invalid store");
104        if let Err(err) = &res {
105            traces::mark_as_error(err, Some(Blame::Host));
106        }
107        res
108    }
109
110    pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
111        self.compare_and_swaps
112            .get(cas.rep())
113            .context("invalid compare and swap")
114    }
115
116    pub fn allowed_stores(&self) -> &HashSet<String> {
117        &self.allowed_stores
118    }
119
120    pub fn get_store_wasi<T: 'static>(
121        &self,
122        store: Resource<T>,
123    ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
124        self.stores
125            .get(store.rep())
126            .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
127    }
128
129    pub fn get_cas_wasi<T: 'static>(
130        &self,
131        cas: Resource<T>,
132    ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
133        self.compare_and_swaps
134            .get(cas.rep())
135            .ok_or(wasi_keyvalue::atomics::Error::Other(
136                "compare and swap not found".to_string(),
137            ))
138    }
139
140    pub fn manager_metadata(&self) -> Arc<dyn Any> {
141        self.manager.metadata()
142    }
143}
144
145impl key_value::Host for KeyValueDispatch {}
146
147impl key_value::HostStore for KeyValueDispatch {
148    #[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
149    async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
150        self.otel.reparent_tracing_span();
151        Ok(async {
152            if self.allowed_stores.contains(&name) {
153                let store = self.manager.get(&name).await?;
154                store.after_open().await?;
155                let store_idx = self
156                    .stores
157                    .push(store)
158                    .map_err(|()| Error::StoreTableFull)?;
159                Ok(Resource::new_own(store_idx))
160            } else {
161                Err(Error::AccessDenied)
162            }
163        }
164        .await)
165    }
166
167    #[instrument(name = "spin_key_value.get", skip_all, fields(otel.kind = "client"))]
168    async fn get(
169        &mut self,
170        store: Resource<key_value::Store>,
171        key: String,
172    ) -> Result<Result<Option<Vec<u8>>, Error>> {
173        self.otel.reparent_tracing_span();
174        let store = self.get_store(store)?;
175        Ok(store
176            .get(&key, MAX_HOST_BUFFERED_BYTES)
177            .await
178            .map_err(track_error_on_span))
179    }
180
181    #[instrument(name = "spin_key_value.set", skip_all, fields(otel.kind = "client"))]
182    async fn set(
183        &mut self,
184        store: Resource<key_value::Store>,
185        key: String,
186        value: Vec<u8>,
187    ) -> Result<Result<(), Error>> {
188        self.otel.reparent_tracing_span();
189        let store = self.get_store(store)?;
190        Ok(store.set(&key, &value).await.map_err(track_error_on_span))
191    }
192
193    #[instrument(name = "spin_key_value.delete", skip_all, fields(otel.kind = "client"))]
194    async fn delete(
195        &mut self,
196        store: Resource<key_value::Store>,
197        key: String,
198    ) -> Result<Result<(), Error>> {
199        self.otel.reparent_tracing_span();
200        let store = self.get_store(store)?;
201        Ok(store.delete(&key).await.map_err(track_error_on_span))
202    }
203
204    #[instrument(name = "spin_key_value.exists", skip_all, fields(otel.kind = "client"))]
205    async fn exists(
206        &mut self,
207        store: Resource<key_value::Store>,
208        key: String,
209    ) -> Result<Result<bool, Error>> {
210        self.otel.reparent_tracing_span();
211        let store = self.get_store(store)?;
212        Ok(store.exists(&key).await.map_err(track_error_on_span))
213    }
214
215    #[instrument(name = "spin_key_value.get_keys", skip_all, fields(otel.kind = "client"))]
216    async fn get_keys(
217        &mut self,
218        store: Resource<key_value::Store>,
219    ) -> Result<Result<Vec<String>, Error>> {
220        self.otel.reparent_tracing_span();
221        let store = self.get_store(store)?;
222        Ok(store
223            .get_keys(MAX_HOST_BUFFERED_BYTES)
224            .await
225            .map_err(track_error_on_span))
226    }
227
228    async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
229        self.stores.remove(store.rep());
230        Ok(())
231    }
232}
233
234impl spin_core::wasmtime::component::HasData for KeyValueDispatch {
235    type Data<'a> = &'a mut KeyValueDispatch;
236}
237
238impl v3::Host for KeyValueDispatch {
239    fn convert_error(&mut self, err: v3::Error) -> anyhow::Result<v3::Error> {
240        Ok(err)
241    }
242}
243
244impl v3::HostStore for KeyValueDispatch {
245    async fn drop(&mut self, store: Resource<v3::Store>) -> Result<()> {
246        self.stores.remove(store.rep());
247        Ok(())
248    }
249}
250
251impl v3::HostStoreWithStore for crate::KeyValueFactorData {
252    async fn open<T>(
253        accessor: &Accessor<T, Self>,
254        label: String,
255    ) -> Result<Resource<v3::Store>, v3::Error> {
256        let (allowed, manager) = accessor.with(|mut access| {
257            let host = access.get();
258            host.otel.reparent_tracing_span();
259            (host.allowed_stores.contains(&label), host.manager.clone())
260        });
261
262        if !allowed {
263            return Err(v3::Error::AccessDenied);
264        }
265
266        let store = manager.get(&label).await.map_err(to_v3_err)?;
267        store.after_open().await.map_err(to_v3_err)?;
268
269        accessor.with(|mut access| {
270            let host = access.get();
271            host.stores
272                .push(store)
273                .map(Resource::new_own)
274                .map_err(|()| v3::Error::StoreTableFull)
275        })
276    }
277
278    async fn get<T>(
279        accessor: &Accessor<T, Self>,
280        store: Resource<v3::Store>,
281        key: String,
282    ) -> Result<Option<Vec<u8>>, v3::Error> {
283        let store = accessor
284            .with(|mut access| {
285                let host = access.get();
286                host.otel.reparent_tracing_span();
287                host.get_store(store).cloned()
288            })
289            .map_err(|_| v3::Error::NoSuchStore)?;
290        store
291            .get(&key, MAX_HOST_BUFFERED_BYTES)
292            .await
293            .map_err(to_v3_err)
294            .map_err(track_error_on_span_v3)
295    }
296
297    async fn set<T>(
298        accessor: &Accessor<T, Self>,
299        store: Resource<v3::Store>,
300        key: String,
301        value: Vec<u8>,
302    ) -> Result<(), v3::Error> {
303        let store = accessor
304            .with(|mut access| {
305                let host = access.get();
306                host.otel.reparent_tracing_span();
307                host.get_store(store).cloned()
308            })
309            .map_err(|_| v3::Error::NoSuchStore)?;
310        store
311            .set(&key, &value)
312            .await
313            .map_err(to_v3_err)
314            .map_err(track_error_on_span_v3)
315    }
316
317    async fn delete<T>(
318        accessor: &Accessor<T, Self>,
319        store: Resource<v3::Store>,
320        key: String,
321    ) -> Result<(), v3::Error> {
322        let store = accessor
323            .with(|mut access| {
324                let host = access.get();
325                host.otel.reparent_tracing_span();
326                host.get_store(store).cloned()
327            })
328            .map_err(|_| v3::Error::NoSuchStore)?;
329        store
330            .delete(&key)
331            .await
332            .map_err(to_v3_err)
333            .map_err(track_error_on_span_v3)
334    }
335
336    async fn exists<T>(
337        accessor: &Accessor<T, Self>,
338        store: Resource<v3::Store>,
339        key: String,
340    ) -> Result<bool, v3::Error> {
341        let store = accessor
342            .with(|mut access| {
343                let host = access.get();
344                host.otel.reparent_tracing_span();
345                host.get_store(store).cloned()
346            })
347            .map_err(|_| v3::Error::NoSuchStore)?;
348        store
349            .exists(&key)
350            .await
351            .map_err(to_v3_err)
352            .map_err(track_error_on_span_v3)
353    }
354
355    async fn get_keys<T>(
356        accessor: &Accessor<T, Self>,
357        store: Resource<v3::Store>,
358    ) -> Result<(StreamReader<String>, FutureReader<Result<(), v3::Error>>)> {
359        let store = accessor
360            .with(|mut access| {
361                let host = access.get();
362                host.otel.reparent_tracing_span();
363                host.get_store(store).cloned()
364            })
365            .map_err(|_| v3::Error::NoSuchStore)?;
366
367        let (keys_rx, err_rx) = store.get_keys_async(MAX_HOST_BUFFERED_BYTES).await;
368
369        let producer = spin_wasi_async::stream::producer(keys_rx);
370        let (ksr, efr) = accessor.with(|mut access| {
371            let ksr = StreamReader::new(&mut access, producer)?;
372            let efr = FutureReader::new(&mut access, err_rx)?;
373            anyhow::Ok((ksr, efr))
374        })?;
375
376        Ok((ksr, efr))
377    }
378}
379
380/// Make sure that infrastructure related errors are tracked in the current span.
381fn track_error_on_span(err: Error) -> Error {
382    let blame = match err {
383        Error::NoSuchStore | Error::AccessDenied => Blame::Guest,
384        Error::StoreTableFull | Error::Other(_) => Blame::Host,
385    };
386    traces::mark_as_error(&err, Some(blame));
387    err
388}
389
390/// Make sure that infrastructure related errors are tracked in the current span.
391fn track_error_on_span_v3(err: v3::Error) -> v3::Error {
392    let blame = match err {
393        v3::Error::NoSuchStore | v3::Error::AccessDenied => Blame::Guest,
394        v3::Error::StoreTableFull | v3::Error::Other(_) => Blame::Host,
395    };
396    traces::mark_as_error(&err, Some(blame));
397    err
398}
399
400fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
401    match track_error_on_span(e) {
402        Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
403        Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
404        Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
405        Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
406    }
407}
408
409pub fn to_v3_err(e: Error) -> v3::Error {
410    match track_error_on_span(e) {
411        Error::AccessDenied => v3::Error::AccessDenied,
412        Error::NoSuchStore => v3::Error::NoSuchStore,
413        Error::StoreTableFull => v3::Error::StoreTableFull,
414        Error::Other(msg) => v3::Error::Other(msg),
415    }
416}
417
418impl wasi_keyvalue::store::Host for KeyValueDispatch {
419    #[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
420    async fn open(
421        &mut self,
422        identifier: String,
423    ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
424        if self.allowed_stores.contains(&identifier) {
425            let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
426            store.after_open().await.map_err(to_wasi_err)?;
427            let store_idx = self
428                .stores
429                .push(store)
430                .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
431            Ok(Resource::new_own(store_idx))
432        } else {
433            Err(wasi_keyvalue::store::Error::AccessDenied)
434        }
435    }
436
437    fn convert_error(
438        &mut self,
439        error: spin_world::wasi::keyvalue::store::Error,
440    ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
441        Ok(error)
442    }
443}
444
445use wasi_keyvalue::store::Bucket;
446impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
447    #[instrument(name = "wasi_key_value.get", skip_all, fields(otel.kind = "client"))]
448    async fn get(
449        &mut self,
450        self_: Resource<Bucket>,
451        key: String,
452    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
453        let store = self.get_store_wasi(self_)?;
454        store
455            .get(&key, MAX_HOST_BUFFERED_BYTES)
456            .await
457            .map_err(to_wasi_err)
458    }
459
460    #[instrument(name = "wasi_key_value.set", skip_all, fields(otel.kind = "client"))]
461    async fn set(
462        &mut self,
463        self_: Resource<Bucket>,
464        key: String,
465        value: Vec<u8>,
466    ) -> Result<(), wasi_keyvalue::store::Error> {
467        let store = self.get_store_wasi(self_)?;
468        store.set(&key, &value).await.map_err(to_wasi_err)
469    }
470
471    #[instrument(name = "wasi_key_value.delete", skip_all, fields(otel.kind = "client"))]
472    async fn delete(
473        &mut self,
474        self_: Resource<Bucket>,
475        key: String,
476    ) -> Result<(), wasi_keyvalue::store::Error> {
477        let store = self.get_store_wasi(self_)?;
478        store.delete(&key).await.map_err(to_wasi_err)
479    }
480
481    #[instrument(name = "wasi_key_value.exists", skip_all, fields(otel.kind = "client"))]
482    async fn exists(
483        &mut self,
484        self_: Resource<Bucket>,
485        key: String,
486    ) -> Result<bool, wasi_keyvalue::store::Error> {
487        let store = self.get_store_wasi(self_)?;
488        store.exists(&key).await.map_err(to_wasi_err)
489    }
490
491    #[instrument(name = "wasi_key_value.list_keys", skip_all, fields(otel.kind = "client"))]
492    async fn list_keys(
493        &mut self,
494        self_: Resource<Bucket>,
495        cursor: Option<String>,
496    ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
497        match cursor {
498            Some(_) => Err(wasi_keyvalue::store::Error::Other(
499                "list_keys: cursor not supported".to_owned(),
500            )),
501            None => {
502                let store = self.get_store_wasi(self_)?;
503                let keys = store
504                    .get_keys(MAX_HOST_BUFFERED_BYTES)
505                    .await
506                    .map_err(to_wasi_err)?;
507                Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
508            }
509        }
510    }
511
512    async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
513        self.stores.remove(rep.rep());
514        Ok(())
515    }
516}
517
518impl wasi_keyvalue::batch::Host for KeyValueDispatch {
519    #[instrument(name = "spin_key_value.get_many", skip_all, fields(otel.kind = "client"))]
520    #[allow(clippy::type_complexity)]
521    async fn get_many(
522        &mut self,
523        bucket: Resource<wasi_keyvalue::batch::Bucket>,
524        keys: Vec<String>,
525    ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
526        let store = self.get_store_wasi(bucket)?;
527        if keys.is_empty() {
528            return Ok(vec![]);
529        }
530        store
531            .get_many(keys, MAX_HOST_BUFFERED_BYTES)
532            .await
533            .map_err(to_wasi_err)
534    }
535
536    #[instrument(name = "spin_key_value.set_many", skip_all, fields(otel.kind = "client"))]
537    async fn set_many(
538        &mut self,
539        bucket: Resource<wasi_keyvalue::batch::Bucket>,
540        key_values: Vec<(String, Vec<u8>)>,
541    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
542        let store = self.get_store_wasi(bucket)?;
543        if key_values.is_empty() {
544            return Ok(());
545        }
546        store.set_many(key_values).await.map_err(to_wasi_err)
547    }
548
549    #[instrument(name = "spin_key_value.delete_many", skip_all, fields(otel.kind = "client"))]
550    async fn delete_many(
551        &mut self,
552        bucket: Resource<wasi_keyvalue::batch::Bucket>,
553        keys: Vec<String>,
554    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
555        let store = self.get_store_wasi(bucket)?;
556        if keys.is_empty() {
557            return Ok(());
558        }
559        store.delete_many(keys).await.map_err(to_wasi_err)
560    }
561}
562
563impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
564    #[instrument(name = "wasi_key_value_cas.new", skip_all, fields(otel.kind = "client"))]
565    async fn new(
566        &mut self,
567        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
568        key: String,
569    ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
570        let bucket_rep = bucket.rep();
571        let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
572        let store = self.get_store_wasi(bucket)?;
573        let cas = store
574            .new_compare_and_swap(bucket_rep, &key)
575            .await
576            .map_err(to_wasi_err)?;
577        self.compare_and_swaps
578            .push(cas)
579            .map_err(|()| {
580                spin_world::wasi::keyvalue::store::Error::Other(
581                    "too many compare_and_swaps opened".to_string(),
582                )
583            })
584            .map(Resource::new_own)
585    }
586
587    #[instrument(name = "wasi_key_value_cas.current", skip_all, fields(otel.kind = "client"))]
588    async fn current(
589        &mut self,
590        cas: Resource<wasi_keyvalue::atomics::Cas>,
591    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
592        let cas = self
593            .get_cas(cas)
594            .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
595        cas.current(MAX_HOST_BUFFERED_BYTES)
596            .await
597            .map_err(to_wasi_err)
598    }
599
600    async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
601        self.compare_and_swaps.remove(rep.rep());
602        Ok(())
603    }
604}
605
606impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
607    fn convert_cas_error(
608        &mut self,
609        error: spin_world::wasi::keyvalue::atomics::CasError,
610    ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
611        Ok(error)
612    }
613
614    #[instrument(name = "spin_key_value.increment", skip_all, fields(otel.kind = "client"))]
615    async fn increment(
616        &mut self,
617        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
618        key: String,
619        delta: i64,
620    ) -> Result<i64, wasi_keyvalue::store::Error> {
621        let store = self.get_store_wasi(bucket)?;
622        store.increment(key, delta).await.map_err(to_wasi_err)
623    }
624
625    #[instrument(name = "spin_key_value.swap", skip_all, fields(otel.kind = "client"))]
626    async fn swap(
627        &mut self,
628        cas_res: Resource<atomics::Cas>,
629        value: Vec<u8>,
630    ) -> Result<(), CasError> {
631        let cas_rep = cas_res.rep();
632        let cas = self
633            .get_cas(Resource::<Bucket>::new_own(cas_rep))
634            .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
635
636        match cas.swap(value).await {
637            Ok(_) => Ok(()),
638            Err(err) => match err {
639                SwapError::CasFailed(_) => {
640                    let bucket = Resource::new_own(cas.bucket_rep().await);
641                    let new_cas = self
642                        .new(bucket, cas.key().await)
643                        .await
644                        .map_err(CasError::StoreError)?;
645                    let new_cas_rep = new_cas.rep();
646                    self.current(Resource::new_own(new_cas_rep))
647                        .await
648                        .map_err(CasError::StoreError)?;
649                    let res = Resource::new_own(new_cas_rep);
650                    Err(CasError::CasFailed(res))
651                }
652                SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
653            },
654        }
655    }
656}
657
658pub fn log_error(err: impl std::fmt::Debug) -> Error {
659    tracing::warn!("key-value error: {err:?}");
660    Error::Other(format!("{err:?}"))
661}
662
663pub fn log_error_v3(err: impl std::fmt::Debug) -> v3::Error {
664    tracing::warn!("key-value error: {err:?}");
665    v3::Error::Other(format!("{err:?}"))
666}
667
668pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
669    tracing::warn!("key-value error: {err:?}");
670    SwapError::Other(format!("{err:?}"))
671}
672
673use spin_world::v1::key_value::Error as LegacyError;
674use spin_world::wasi::keyvalue::atomics;
675use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
676
677fn to_legacy_error(err: Error) -> LegacyError {
678    match err {
679        Error::StoreTableFull => LegacyError::StoreTableFull,
680        Error::NoSuchStore => LegacyError::NoSuchStore,
681        Error::AccessDenied => LegacyError::AccessDenied,
682        Error::Other(s) => LegacyError::Io(s),
683    }
684}
685
686impl spin_world::v1::key_value::Host for KeyValueDispatch {
687    async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
688        let result = <Self as key_value::HostStore>::open(self, name).await?;
689        Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
690    }
691
692    async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
693        let this = Resource::new_borrow(store);
694        let result = <Self as key_value::HostStore>::get(self, this, key).await?;
695        Ok(result
696            .map_err(to_legacy_error)
697            .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
698    }
699
700    async fn set(
701        &mut self,
702        store: u32,
703        key: String,
704        value: Vec<u8>,
705    ) -> Result<Result<(), LegacyError>> {
706        let this = Resource::new_borrow(store);
707        let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
708        Ok(result.map_err(to_legacy_error))
709    }
710
711    async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
712        let this = Resource::new_borrow(store);
713        let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
714        Ok(result.map_err(to_legacy_error))
715    }
716
717    async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
718        let this = Resource::new_borrow(store);
719        let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
720        Ok(result.map_err(to_legacy_error))
721    }
722
723    async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
724        let this = Resource::new_borrow(store);
725        let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
726        Ok(result.map_err(to_legacy_error))
727    }
728
729    async fn close(&mut self, store: u32) -> Result<()> {
730        let this = Resource::new_borrow(store);
731        <Self as key_value::HostStore>::drop(self, this).await
732    }
733}