Skip to main content

spin_factor_key_value/
host.rs

1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{
4    async_trait,
5    wasmtime::component::{Accessor, FutureReader, Resource, StreamReader},
6};
7use spin_factor_otel::OtelFactorState;
8use spin_resource_table::Table;
9use spin_telemetry::traces::{self, Blame};
10use spin_world::spin::key_value::key_value as v3;
11use spin_world::v2::key_value;
12use spin_world::wasi::keyvalue as wasi_keyvalue;
13use spin_world::MAX_HOST_BUFFERED_BYTES;
14use std::{any::Any, collections::HashSet, sync::Arc};
15use tracing::instrument;
16
/// Capacity that `KeyValueDispatch::new` forwards to `new_with_capacity`, where it is used to
/// size both the store table and the compare-and-swap table.
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
18
19pub use key_value::Error;
20
/// Source of [`Store`] handles, looked up by store name.
#[async_trait]
pub trait StoreManager: Sync + Send {
    /// Returns the store registered under `name`, or an [`Error`] when it cannot be provided.
    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
    /// Reports whether this manager considers `store_name` to be defined.
    // NOTE(review): what exactly counts as "defined" is left to the implementor — confirm.
    fn is_defined(&self, store_name: &str) -> bool;

    /// A human-readable summary of the given store's configuration.
    ///
    /// The default implementation ignores `store_name` and returns `None`.
    ///
    /// Example: "Redis at localhost:1234"
    fn summary(&self, store_name: &str) -> Option<String> {
        let _ = store_name;
        None
    }

    /// Metadata about the store manager that can be accessed for data collection or debugging
    /// purposes. This is opaque and can be in any format the store manager chooses.
    ///
    /// The default implementation returns the unit value wrapped in an `Arc`.
    fn metadata(&self) -> Arc<dyn Any> {
        Arc::new(())
    }
}
39
/// Operations a key-value backend provides to the host dispatch code in this file.
///
/// Every `max_result_bytes` argument is passed as `MAX_HOST_BUFFERED_BYTES` by the callers in
/// this file. NOTE(review): presumably it is an upper bound on the size of the returned data —
/// confirm how implementors enforce it.
#[async_trait]
pub trait Store: Sync + Send {
    /// Hook awaited by the `open` host functions in this file after the manager has returned the
    /// store and before it is pushed into the resource table. Defaults to `Ok(())`.
    async fn after_open(&self) -> Result<(), Error> {
        Ok(())
    }
    /// Looks up `key`; the `Option` in the return type allows for a key with no value.
    async fn get(&self, key: &str, max_result_bytes: usize) -> Result<Option<Vec<u8>>, Error>;
    /// Stores `value` under `key`.
    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
    /// Deletes `key`.
    async fn delete(&self, key: &str) -> Result<(), Error>;
    /// Reports whether `key` exists.
    async fn exists(&self, key: &str) -> Result<bool, Error>;
    /// Returns the keys of the store as a single `Vec`.
    async fn get_keys(&self, max_result_bytes: usize) -> Result<Vec<String>, Error>;
    /// Returns an mpsc receiver of `String`s together with a oneshot receiver of a
    /// `Result<(), v3::Error>`. The v3 `get_keys` host function in this file forwards them to the
    /// guest as a stream and a future respectively.
    // NOTE(review): presumably the strings are keys and the oneshot reports how the listing
    // finished — confirm against implementors.
    async fn get_keys_async(
        &self,
        max_result_bytes: usize,
    ) -> (
        tokio::sync::mpsc::Receiver<String>,
        tokio::sync::oneshot::Receiver<Result<(), v3::Error>>,
    );
    /// Batch lookup: returns one `(key, optional value)` pair per entry of the result.
    async fn get_many(
        &self,
        keys: Vec<String>,
        max_result_bytes: usize,
    ) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
    /// Batch write of the given `(key, value)` pairs.
    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
    /// Batch delete of the given keys.
    async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
    /// Applies `delta` to the integer stored under `key`.
    // NOTE(review): the returned `i64` is presumably the value after the increment — confirm.
    async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
    /// Creates a compare-and-swap handle for `key`. The caller in this file passes the resource
    /// rep of the bucket as `bucket_rep`.
    async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
        -> Result<Arc<dyn Cas>, Error>;
}
68
/// Host-side state behind the key-value interfaces implemented in this file.
pub struct KeyValueDispatch {
    /// Store names the `open` host functions accept; any other name yields `AccessDenied`.
    allowed_stores: HashSet<String>,
    /// Provider that the `open` host functions ask for the named store.
    manager: Arc<dyn StoreManager>,
    /// Open stores, indexed by the rep of the resource handed to the guest.
    stores: Table<Arc<dyn Store>>,
    /// Open compare-and-swap handles, indexed by resource rep.
    compare_and_swaps: Table<Arc<dyn Cas>>,
    /// Used by the host functions to call `reparent_tracing_span()` before doing their work.
    otel: OtelFactorState,
}
76
77impl KeyValueDispatch {
78    pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
79        Self::new_with_capacity(
80            allowed_stores,
81            manager,
82            DEFAULT_STORE_TABLE_CAPACITY,
83            Default::default(),
84        )
85    }
86
87    pub fn new_with_capacity(
88        allowed_stores: HashSet<String>,
89        manager: Arc<dyn StoreManager>,
90        capacity: u32,
91        otel: OtelFactorState,
92    ) -> Self {
93        Self {
94            allowed_stores,
95            manager,
96            stores: Table::new(capacity),
97            compare_and_swaps: Table::new(capacity),
98            otel,
99        }
100    }
101
102    pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
103        let res = self.stores.get(store.rep()).context("invalid store");
104        if let Err(err) = &res {
105            traces::mark_as_error(err, Some(Blame::Host));
106        }
107        res
108    }
109
110    pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
111        self.compare_and_swaps
112            .get(cas.rep())
113            .context("invalid compare and swap")
114    }
115
116    pub fn allowed_stores(&self) -> &HashSet<String> {
117        &self.allowed_stores
118    }
119
120    pub fn get_store_wasi<T: 'static>(
121        &self,
122        store: Resource<T>,
123    ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
124        self.stores
125            .get(store.rep())
126            .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
127    }
128
129    pub fn get_cas_wasi<T: 'static>(
130        &self,
131        cas: Resource<T>,
132    ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
133        self.compare_and_swaps
134            .get(cas.rep())
135            .ok_or(wasi_keyvalue::atomics::Error::Other(
136                "compare and swap not found".to_string(),
137            ))
138    }
139
140    pub fn manager_metadata(&self) -> Arc<dyn Any> {
141        self.manager.metadata()
142    }
143}
144
// Empty impl: no items are provided for `key_value::Host` here; the store operations are
// implemented on `key_value::HostStore`.
impl key_value::Host for KeyValueDispatch {}
146
147impl key_value::HostStore for KeyValueDispatch {
148    #[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
149    async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
150        self.otel.reparent_tracing_span();
151        Ok(async {
152            if self.allowed_stores.contains(&name) {
153                let store = self.manager.get(&name).await?;
154                store.after_open().await?;
155                let store_idx = self
156                    .stores
157                    .push(store)
158                    .map_err(|()| Error::StoreTableFull)?;
159                Ok(Resource::new_own(store_idx))
160            } else {
161                Err(Error::AccessDenied)
162            }
163        }
164        .await)
165    }
166
167    #[instrument(name = "spin_key_value.get", skip_all, fields(otel.kind = "client"))]
168    async fn get(
169        &mut self,
170        store: Resource<key_value::Store>,
171        key: String,
172    ) -> Result<Result<Option<Vec<u8>>, Error>> {
173        self.otel.reparent_tracing_span();
174        let store = self.get_store(store)?;
175        Ok(store
176            .get(&key, MAX_HOST_BUFFERED_BYTES)
177            .await
178            .map_err(track_error_on_span))
179    }
180
181    #[instrument(name = "spin_key_value.set", skip_all, fields(otel.kind = "client"))]
182    async fn set(
183        &mut self,
184        store: Resource<key_value::Store>,
185        key: String,
186        value: Vec<u8>,
187    ) -> Result<Result<(), Error>> {
188        self.otel.reparent_tracing_span();
189        let store = self.get_store(store)?;
190        Ok(store.set(&key, &value).await.map_err(track_error_on_span))
191    }
192
193    #[instrument(name = "spin_key_value.delete", skip_all, fields(otel.kind = "client"))]
194    async fn delete(
195        &mut self,
196        store: Resource<key_value::Store>,
197        key: String,
198    ) -> Result<Result<(), Error>> {
199        self.otel.reparent_tracing_span();
200        let store = self.get_store(store)?;
201        Ok(store.delete(&key).await.map_err(track_error_on_span))
202    }
203
204    #[instrument(name = "spin_key_value.exists", skip_all, fields(otel.kind = "client"))]
205    async fn exists(
206        &mut self,
207        store: Resource<key_value::Store>,
208        key: String,
209    ) -> Result<Result<bool, Error>> {
210        self.otel.reparent_tracing_span();
211        let store = self.get_store(store)?;
212        Ok(store.exists(&key).await.map_err(track_error_on_span))
213    }
214
215    #[instrument(name = "spin_key_value.get_keys", skip_all, fields(otel.kind = "client"))]
216    async fn get_keys(
217        &mut self,
218        store: Resource<key_value::Store>,
219    ) -> Result<Result<Vec<String>, Error>> {
220        self.otel.reparent_tracing_span();
221        let store = self.get_store(store)?;
222        Ok(store
223            .get_keys(MAX_HOST_BUFFERED_BYTES)
224            .await
225            .map_err(track_error_on_span))
226    }
227
228    async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
229        self.stores.remove(store.rep());
230        Ok(())
231    }
232}
233
/// Declares the `HasData` projection for the dispatcher as a mutable borrow of itself.
// NOTE(review): presumably this is what lets the generated component bindings hand host
// functions a `&mut KeyValueDispatch` — confirm against the wasmtime `HasData` docs.
impl spin_core::wasmtime::component::HasData for KeyValueDispatch {
    type Data<'a> = &'a mut KeyValueDispatch;
}
237
// Empty impl: no items are provided for `v3::Host` here.
impl v3::Host for KeyValueDispatch {}
239
240impl v3::HostStore for KeyValueDispatch {
241    async fn drop(&mut self, store: Resource<v3::Store>) -> Result<()> {
242        self.stores.remove(store.rep());
243        Ok(())
244    }
245}
246
247impl v3::HostStoreWithStore for crate::KeyValueFactorData {
248    async fn open<T>(
249        accessor: &Accessor<T, Self>,
250        label: String,
251    ) -> Result<Result<Resource<v3::Store>, v3::Error>> {
252        let (allowed, manager) = accessor.with(|mut access| {
253            let host = access.get();
254            host.otel.reparent_tracing_span();
255            (host.allowed_stores.contains(&label), host.manager.clone())
256        });
257
258        if !allowed {
259            return Ok(Err(v3::Error::AccessDenied));
260        }
261
262        let store = manager.get(&label).await?;
263        store.after_open().await?;
264
265        let rsrc = accessor.with(|mut access| {
266            let host = access.get();
267            host.stores
268                .push(store)
269                .map(Resource::new_own)
270                .map_err(|()| v3::Error::StoreTableFull)
271        });
272
273        Ok(rsrc)
274    }
275
276    async fn get<T>(
277        accessor: &Accessor<T, Self>,
278        store: Resource<v3::Store>,
279        key: String,
280    ) -> Result<Result<Option<Vec<u8>>, v3::Error>> {
281        let store = accessor.with(|mut access| {
282            let host = access.get();
283            host.otel.reparent_tracing_span();
284            host.get_store(store).cloned()
285        })?;
286        Ok(store
287            .get(&key, MAX_HOST_BUFFERED_BYTES)
288            .await
289            .map_err(to_v3_err)
290            .map_err(track_error_on_span_v3))
291    }
292
293    async fn set<T>(
294        accessor: &Accessor<T, Self>,
295        store: Resource<v3::Store>,
296        key: String,
297        value: Vec<u8>,
298    ) -> Result<Result<(), v3::Error>> {
299        let store = accessor.with(|mut access| {
300            let host = access.get();
301            host.otel.reparent_tracing_span();
302            host.get_store(store).cloned()
303        })?;
304        Ok(store
305            .set(&key, &value)
306            .await
307            .map_err(to_v3_err)
308            .map_err(track_error_on_span_v3))
309    }
310
311    async fn delete<T>(
312        accessor: &Accessor<T, Self>,
313        store: Resource<v3::Store>,
314        key: String,
315    ) -> Result<Result<(), v3::Error>> {
316        let store = accessor.with(|mut access| {
317            let host = access.get();
318            host.otel.reparent_tracing_span();
319            host.get_store(store).cloned()
320        })?;
321        Ok(store
322            .delete(&key)
323            .await
324            .map_err(to_v3_err)
325            .map_err(track_error_on_span_v3))
326    }
327
328    async fn exists<T>(
329        accessor: &Accessor<T, Self>,
330        store: Resource<v3::Store>,
331        key: String,
332    ) -> Result<Result<bool, v3::Error>> {
333        let store = accessor.with(|mut access| {
334            let host = access.get();
335            host.otel.reparent_tracing_span();
336            host.get_store(store).cloned()
337        })?;
338        Ok(store
339            .exists(&key)
340            .await
341            .map_err(to_v3_err)
342            .map_err(track_error_on_span_v3))
343    }
344
345    async fn get_keys<T>(
346        accessor: &Accessor<T, Self>,
347        store: Resource<v3::Store>,
348    ) -> Result<(StreamReader<String>, FutureReader<Result<(), v3::Error>>)> {
349        let store = accessor.with(|mut access| {
350            let host = access.get();
351            host.otel.reparent_tracing_span();
352            host.get_store(store).cloned()
353        })?;
354
355        let (keys_rx, err_rx) = store.get_keys_async(MAX_HOST_BUFFERED_BYTES).await;
356
357        let producer = spin_wasi_async::stream::producer(keys_rx);
358        let (ksr, efr) = accessor.with(|mut access| {
359            let ksr = StreamReader::new(&mut access, producer);
360            let efr = FutureReader::new(&mut access, err_rx);
361            (ksr, efr)
362        });
363
364        Ok((ksr, efr))
365    }
366}
367
368/// Make sure that infrastructure related errors are tracked in the current span.
369fn track_error_on_span(err: Error) -> Error {
370    let blame = match err {
371        Error::NoSuchStore | Error::AccessDenied => Blame::Guest,
372        Error::StoreTableFull | Error::Other(_) => Blame::Host,
373    };
374    traces::mark_as_error(&err, Some(blame));
375    err
376}
377
378/// Make sure that infrastructure related errors are tracked in the current span.
379fn track_error_on_span_v3(err: v3::Error) -> v3::Error {
380    let blame = match err {
381        v3::Error::NoSuchStore | v3::Error::AccessDenied => Blame::Guest,
382        v3::Error::StoreTableFull | v3::Error::Other(_) => Blame::Host,
383    };
384    traces::mark_as_error(&err, Some(blame));
385    err
386}
387
388fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
389    match track_error_on_span(e) {
390        Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
391        Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
392        Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
393        Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
394    }
395}
396
397pub fn to_v3_err(e: Error) -> v3::Error {
398    match track_error_on_span(e) {
399        Error::AccessDenied => v3::Error::AccessDenied,
400        Error::NoSuchStore => v3::Error::NoSuchStore,
401        Error::StoreTableFull => v3::Error::StoreTableFull,
402        Error::Other(msg) => v3::Error::Other(msg),
403    }
404}
405
406impl wasi_keyvalue::store::Host for KeyValueDispatch {
407    #[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
408    async fn open(
409        &mut self,
410        identifier: String,
411    ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
412        if self.allowed_stores.contains(&identifier) {
413            let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
414            store.after_open().await.map_err(to_wasi_err)?;
415            let store_idx = self
416                .stores
417                .push(store)
418                .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
419            Ok(Resource::new_own(store_idx))
420        } else {
421            Err(wasi_keyvalue::store::Error::AccessDenied)
422        }
423    }
424
425    fn convert_error(
426        &mut self,
427        error: spin_world::wasi::keyvalue::store::Error,
428    ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
429        Ok(error)
430    }
431}
432
433use wasi_keyvalue::store::Bucket;
434impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
435    #[instrument(name = "wasi_key_value.get", skip_all, fields(otel.kind = "client"))]
436    async fn get(
437        &mut self,
438        self_: Resource<Bucket>,
439        key: String,
440    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
441        let store = self.get_store_wasi(self_)?;
442        store
443            .get(&key, MAX_HOST_BUFFERED_BYTES)
444            .await
445            .map_err(to_wasi_err)
446    }
447
448    #[instrument(name = "wasi_key_value.set", skip_all, fields(otel.kind = "client"))]
449    async fn set(
450        &mut self,
451        self_: Resource<Bucket>,
452        key: String,
453        value: Vec<u8>,
454    ) -> Result<(), wasi_keyvalue::store::Error> {
455        let store = self.get_store_wasi(self_)?;
456        store.set(&key, &value).await.map_err(to_wasi_err)
457    }
458
459    #[instrument(name = "wasi_key_value.delete", skip_all, fields(otel.kind = "client"))]
460    async fn delete(
461        &mut self,
462        self_: Resource<Bucket>,
463        key: String,
464    ) -> Result<(), wasi_keyvalue::store::Error> {
465        let store = self.get_store_wasi(self_)?;
466        store.delete(&key).await.map_err(to_wasi_err)
467    }
468
469    #[instrument(name = "wasi_key_value.exists", skip_all, fields(otel.kind = "client"))]
470    async fn exists(
471        &mut self,
472        self_: Resource<Bucket>,
473        key: String,
474    ) -> Result<bool, wasi_keyvalue::store::Error> {
475        let store = self.get_store_wasi(self_)?;
476        store.exists(&key).await.map_err(to_wasi_err)
477    }
478
479    #[instrument(name = "wasi_key_value.list_keys", skip_all, fields(otel.kind = "client"))]
480    async fn list_keys(
481        &mut self,
482        self_: Resource<Bucket>,
483        cursor: Option<String>,
484    ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
485        match cursor {
486            Some(_) => Err(wasi_keyvalue::store::Error::Other(
487                "list_keys: cursor not supported".to_owned(),
488            )),
489            None => {
490                let store = self.get_store_wasi(self_)?;
491                let keys = store
492                    .get_keys(MAX_HOST_BUFFERED_BYTES)
493                    .await
494                    .map_err(to_wasi_err)?;
495                Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
496            }
497        }
498    }
499
500    async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
501        self.stores.remove(rep.rep());
502        Ok(())
503    }
504}
505
506impl wasi_keyvalue::batch::Host for KeyValueDispatch {
507    #[instrument(name = "spin_key_value.get_many", skip_all, fields(otel.kind = "client"))]
508    #[allow(clippy::type_complexity)]
509    async fn get_many(
510        &mut self,
511        bucket: Resource<wasi_keyvalue::batch::Bucket>,
512        keys: Vec<String>,
513    ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
514        let store = self.get_store_wasi(bucket)?;
515        if keys.is_empty() {
516            return Ok(vec![]);
517        }
518        store
519            .get_many(keys, MAX_HOST_BUFFERED_BYTES)
520            .await
521            .map_err(to_wasi_err)
522    }
523
524    #[instrument(name = "spin_key_value.set_many", skip_all, fields(otel.kind = "client"))]
525    async fn set_many(
526        &mut self,
527        bucket: Resource<wasi_keyvalue::batch::Bucket>,
528        key_values: Vec<(String, Vec<u8>)>,
529    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
530        let store = self.get_store_wasi(bucket)?;
531        if key_values.is_empty() {
532            return Ok(());
533        }
534        store.set_many(key_values).await.map_err(to_wasi_err)
535    }
536
537    #[instrument(name = "spin_key_value.delete_many", skip_all, fields(otel.kind = "client"))]
538    async fn delete_many(
539        &mut self,
540        bucket: Resource<wasi_keyvalue::batch::Bucket>,
541        keys: Vec<String>,
542    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
543        let store = self.get_store_wasi(bucket)?;
544        if keys.is_empty() {
545            return Ok(());
546        }
547        store.delete_many(keys).await.map_err(to_wasi_err)
548    }
549}
550
551impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
552    #[instrument(name = "wasi_key_value_cas.new", skip_all, fields(otel.kind = "client"))]
553    async fn new(
554        &mut self,
555        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
556        key: String,
557    ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
558        let bucket_rep = bucket.rep();
559        let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
560        let store = self.get_store_wasi(bucket)?;
561        let cas = store
562            .new_compare_and_swap(bucket_rep, &key)
563            .await
564            .map_err(to_wasi_err)?;
565        self.compare_and_swaps
566            .push(cas)
567            .map_err(|()| {
568                spin_world::wasi::keyvalue::store::Error::Other(
569                    "too many compare_and_swaps opened".to_string(),
570                )
571            })
572            .map(Resource::new_own)
573    }
574
575    #[instrument(name = "wasi_key_value_cas.current", skip_all, fields(otel.kind = "client"))]
576    async fn current(
577        &mut self,
578        cas: Resource<wasi_keyvalue::atomics::Cas>,
579    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
580        let cas = self
581            .get_cas(cas)
582            .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
583        cas.current(MAX_HOST_BUFFERED_BYTES)
584            .await
585            .map_err(to_wasi_err)
586    }
587
588    async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
589        self.compare_and_swaps.remove(rep.rep());
590        Ok(())
591    }
592}
593
594impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
595    fn convert_cas_error(
596        &mut self,
597        error: spin_world::wasi::keyvalue::atomics::CasError,
598    ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
599        Ok(error)
600    }
601
602    #[instrument(name = "spin_key_value.increment", skip_all, fields(otel.kind = "client"))]
603    async fn increment(
604        &mut self,
605        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
606        key: String,
607        delta: i64,
608    ) -> Result<i64, wasi_keyvalue::store::Error> {
609        let store = self.get_store_wasi(bucket)?;
610        store.increment(key, delta).await.map_err(to_wasi_err)
611    }
612
613    #[instrument(name = "spin_key_value.swap", skip_all, fields(otel.kind = "client"))]
614    async fn swap(
615        &mut self,
616        cas_res: Resource<atomics::Cas>,
617        value: Vec<u8>,
618    ) -> Result<(), CasError> {
619        let cas_rep = cas_res.rep();
620        let cas = self
621            .get_cas(Resource::<Bucket>::new_own(cas_rep))
622            .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
623
624        match cas.swap(value).await {
625            Ok(_) => Ok(()),
626            Err(err) => match err {
627                SwapError::CasFailed(_) => {
628                    let bucket = Resource::new_own(cas.bucket_rep().await);
629                    let new_cas = self
630                        .new(bucket, cas.key().await)
631                        .await
632                        .map_err(CasError::StoreError)?;
633                    let new_cas_rep = new_cas.rep();
634                    self.current(Resource::new_own(new_cas_rep))
635                        .await
636                        .map_err(CasError::StoreError)?;
637                    let res = Resource::new_own(new_cas_rep);
638                    Err(CasError::CasFailed(res))
639                }
640                SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
641            },
642        }
643    }
644}
645
646pub fn log_error(err: impl std::fmt::Debug) -> Error {
647    tracing::warn!("key-value error: {err:?}");
648    Error::Other(format!("{err:?}"))
649}
650
651pub fn log_error_v3(err: impl std::fmt::Debug) -> v3::Error {
652    tracing::warn!("key-value error: {err:?}");
653    v3::Error::Other(format!("{err:?}"))
654}
655
656pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
657    tracing::warn!("key-value error: {err:?}");
658    SwapError::Other(format!("{err:?}"))
659}
660
661use spin_world::v1::key_value::Error as LegacyError;
662use spin_world::wasi::keyvalue::atomics;
663use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
664
665fn to_legacy_error(err: Error) -> LegacyError {
666    match err {
667        Error::StoreTableFull => LegacyError::StoreTableFull,
668        Error::NoSuchStore => LegacyError::NoSuchStore,
669        Error::AccessDenied => LegacyError::AccessDenied,
670        Error::Other(s) => LegacyError::Io(s),
671    }
672}
673
674impl spin_world::v1::key_value::Host for KeyValueDispatch {
675    async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
676        let result = <Self as key_value::HostStore>::open(self, name).await?;
677        Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
678    }
679
680    async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
681        let this = Resource::new_borrow(store);
682        let result = <Self as key_value::HostStore>::get(self, this, key).await?;
683        Ok(result
684            .map_err(to_legacy_error)
685            .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
686    }
687
688    async fn set(
689        &mut self,
690        store: u32,
691        key: String,
692        value: Vec<u8>,
693    ) -> Result<Result<(), LegacyError>> {
694        let this = Resource::new_borrow(store);
695        let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
696        Ok(result.map_err(to_legacy_error))
697    }
698
699    async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
700        let this = Resource::new_borrow(store);
701        let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
702        Ok(result.map_err(to_legacy_error))
703    }
704
705    async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
706        let this = Resource::new_borrow(store);
707        let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
708        Ok(result.map_err(to_legacy_error))
709    }
710
711    async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
712        let this = Resource::new_borrow(store);
713        let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
714        Ok(result.map_err(to_legacy_error))
715    }
716
717    async fn close(&mut self, store: u32) -> Result<()> {
718        let this = Resource::new_borrow(store);
719        <Self as key_value::HostStore>::drop(self, this).await
720    }
721}