// spin_factor_key_value/host.rs

1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{async_trait, wasmtime::component::Resource};
4use spin_resource_table::Table;
5use spin_telemetry::traces::{self, Blame};
6use spin_world::v2::key_value;
7use spin_world::wasi::keyvalue as wasi_keyvalue;
8use std::{collections::HashSet, sync::Arc};
9use tracing::instrument;
10
/// Capacity handed to both resource tables (stores and compare-and-swaps)
/// when `KeyValueDispatch::new` is used instead of `new_with_capacity`.
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
12
13pub use key_value::Error;
14
15#[async_trait]
16pub trait StoreManager: Sync + Send {
17    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
18    fn is_defined(&self, store_name: &str) -> bool;
19
20    /// A human-readable summary of the given store's configuration
21    ///
22    /// Example: "Redis at localhost:1234"
23    fn summary(&self, store_name: &str) -> Option<String> {
24        let _ = store_name;
25        None
26    }
27}
28
29#[async_trait]
30pub trait Store: Sync + Send {
31    async fn after_open(&self) -> Result<(), Error> {
32        Ok(())
33    }
34    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
35    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
36    async fn delete(&self, key: &str) -> Result<(), Error>;
37    async fn exists(&self, key: &str) -> Result<bool, Error>;
38    async fn get_keys(&self) -> Result<Vec<String>, Error>;
39    async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
40    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
41    async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
42    async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
43    async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
44        -> Result<Arc<dyn Cas>, Error>;
45}
46
/// Host-side state backing the key-value interfaces implemented in this file.
pub struct KeyValueDispatch {
    /// Store names that `open` accepts; any other name is rejected with an
    /// access-denied error before the manager is consulted.
    allowed_stores: HashSet<String>,
    /// Provider of `Store` handles, looked up by name on `open`.
    manager: Arc<dyn StoreManager>,
    /// Opened stores, indexed by the `rep()` of the resource handed out.
    stores: Table<Arc<dyn Store>>,
    /// Live compare-and-swap handles, indexed by the `rep()` of their resource.
    compare_and_swaps: Table<Arc<dyn Cas>>,
}
53
54impl KeyValueDispatch {
55    pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
56        Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
57    }
58
59    pub fn new_with_capacity(
60        allowed_stores: HashSet<String>,
61        manager: Arc<dyn StoreManager>,
62        capacity: u32,
63    ) -> Self {
64        Self {
65            allowed_stores,
66            manager,
67            stores: Table::new(capacity),
68            compare_and_swaps: Table::new(capacity),
69        }
70    }
71
72    pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
73        let res = self.stores.get(store.rep()).context("invalid store");
74        if let Err(err) = &res {
75            traces::mark_as_error(err, Some(Blame::Host));
76        }
77        res
78    }
79
80    pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
81        self.compare_and_swaps
82            .get(cas.rep())
83            .context("invalid compare and swap")
84    }
85
86    pub fn allowed_stores(&self) -> &HashSet<String> {
87        &self.allowed_stores
88    }
89
90    pub fn get_store_wasi<T: 'static>(
91        &self,
92        store: Resource<T>,
93    ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
94        self.stores
95            .get(store.rep())
96            .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
97    }
98
99    pub fn get_cas_wasi<T: 'static>(
100        &self,
101        cas: Resource<T>,
102    ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
103        self.compare_and_swaps
104            .get(cas.rep())
105            .ok_or(wasi_keyvalue::atomics::Error::Other(
106                "compare and swap not found".to_string(),
107            ))
108    }
109}
110
// Empty impl: nothing from `key_value::Host` is overridden or provided here;
// the per-store operations are implemented on `key_value::HostStore`.
impl key_value::Host for KeyValueDispatch {}
112
113impl key_value::HostStore for KeyValueDispatch {
114    #[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
115    async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
116        Ok(async {
117            if self.allowed_stores.contains(&name) {
118                let store = self.manager.get(&name).await?;
119                store.after_open().await?;
120                let store_idx = self
121                    .stores
122                    .push(store)
123                    .map_err(|()| Error::StoreTableFull)?;
124                Ok(Resource::new_own(store_idx))
125            } else {
126                Err(Error::AccessDenied)
127            }
128        }
129        .await)
130    }
131
132    #[instrument(name = "spin_key_value.get", skip_all, fields(otel.kind = "client"))]
133    async fn get(
134        &mut self,
135        store: Resource<key_value::Store>,
136        key: String,
137    ) -> Result<Result<Option<Vec<u8>>, Error>> {
138        let store = self.get_store(store)?;
139        Ok(store.get(&key).await.map_err(track_error_on_span))
140    }
141
142    #[instrument(name = "spin_key_value.set", skip_all, fields(otel.kind = "client"))]
143    async fn set(
144        &mut self,
145        store: Resource<key_value::Store>,
146        key: String,
147        value: Vec<u8>,
148    ) -> Result<Result<(), Error>> {
149        let store = self.get_store(store)?;
150        Ok(store.set(&key, &value).await.map_err(track_error_on_span))
151    }
152
153    #[instrument(name = "spin_key_value.delete", skip_all, fields(otel.kind = "client"))]
154    async fn delete(
155        &mut self,
156        store: Resource<key_value::Store>,
157        key: String,
158    ) -> Result<Result<(), Error>> {
159        let store = self.get_store(store)?;
160        Ok(store.delete(&key).await.map_err(track_error_on_span))
161    }
162
163    #[instrument(name = "spin_key_value.exists", skip_all, fields(otel.kind = "client"))]
164    async fn exists(
165        &mut self,
166        store: Resource<key_value::Store>,
167        key: String,
168    ) -> Result<Result<bool, Error>> {
169        let store = self.get_store(store)?;
170        Ok(store.exists(&key).await.map_err(track_error_on_span))
171    }
172
173    #[instrument(name = "spin_key_value.get_keys", skip_all, fields(otel.kind = "client"))]
174    async fn get_keys(
175        &mut self,
176        store: Resource<key_value::Store>,
177    ) -> Result<Result<Vec<String>, Error>> {
178        let store = self.get_store(store)?;
179        Ok(store.get_keys().await.map_err(track_error_on_span))
180    }
181
182    async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
183        self.stores.remove(store.rep());
184        Ok(())
185    }
186}
187
188/// Make sure that infrastructure related errors are tracked in the current span.
189fn track_error_on_span(err: Error) -> Error {
190    let blame = match err {
191        Error::NoSuchStore | Error::AccessDenied => Blame::Guest,
192        Error::StoreTableFull | Error::Other(_) => Blame::Host,
193    };
194    traces::mark_as_error(&err, Some(blame));
195    err
196}
197
198fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
199    match track_error_on_span(e) {
200        Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
201        Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
202        Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
203        Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
204    }
205}
206
207impl wasi_keyvalue::store::Host for KeyValueDispatch {
208    #[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
209    async fn open(
210        &mut self,
211        identifier: String,
212    ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
213        if self.allowed_stores.contains(&identifier) {
214            let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
215            store.after_open().await.map_err(to_wasi_err)?;
216            let store_idx = self
217                .stores
218                .push(store)
219                .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
220            Ok(Resource::new_own(store_idx))
221        } else {
222            Err(wasi_keyvalue::store::Error::AccessDenied)
223        }
224    }
225
226    fn convert_error(
227        &mut self,
228        error: spin_world::wasi::keyvalue::store::Error,
229    ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
230        Ok(error)
231    }
232}
233
234use wasi_keyvalue::store::Bucket;
235impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
236    #[instrument(name = "wasi_key_value.get", skip_all, fields(otel.kind = "client"))]
237    async fn get(
238        &mut self,
239        self_: Resource<Bucket>,
240        key: String,
241    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
242        let store = self.get_store_wasi(self_)?;
243        store.get(&key).await.map_err(to_wasi_err)
244    }
245
246    #[instrument(name = "wasi_key_value.set", skip_all, fields(otel.kind = "client"))]
247    async fn set(
248        &mut self,
249        self_: Resource<Bucket>,
250        key: String,
251        value: Vec<u8>,
252    ) -> Result<(), wasi_keyvalue::store::Error> {
253        let store = self.get_store_wasi(self_)?;
254        store.set(&key, &value).await.map_err(to_wasi_err)
255    }
256
257    #[instrument(name = "wasi_key_value.delete", skip_all, fields(otel.kind = "client"))]
258    async fn delete(
259        &mut self,
260        self_: Resource<Bucket>,
261        key: String,
262    ) -> Result<(), wasi_keyvalue::store::Error> {
263        let store = self.get_store_wasi(self_)?;
264        store.delete(&key).await.map_err(to_wasi_err)
265    }
266
267    #[instrument(name = "wasi_key_value.exists", skip_all, fields(otel.kind = "client"))]
268    async fn exists(
269        &mut self,
270        self_: Resource<Bucket>,
271        key: String,
272    ) -> Result<bool, wasi_keyvalue::store::Error> {
273        let store = self.get_store_wasi(self_)?;
274        store.exists(&key).await.map_err(to_wasi_err)
275    }
276
277    #[instrument(name = "wasi_key_value.list_keys", skip_all, fields(otel.kind = "client"))]
278    async fn list_keys(
279        &mut self,
280        self_: Resource<Bucket>,
281        cursor: Option<String>,
282    ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
283        match cursor {
284            Some(_) => Err(wasi_keyvalue::store::Error::Other(
285                "list_keys: cursor not supported".to_owned(),
286            )),
287            None => {
288                let store = self.get_store_wasi(self_)?;
289                let keys = store.get_keys().await.map_err(to_wasi_err)?;
290                Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
291            }
292        }
293    }
294
295    async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
296        self.stores.remove(rep.rep());
297        Ok(())
298    }
299}
300
301impl wasi_keyvalue::batch::Host for KeyValueDispatch {
302    #[instrument(name = "spin_key_value.get_many", skip_all, fields(otel.kind = "client"))]
303    #[allow(clippy::type_complexity)]
304    async fn get_many(
305        &mut self,
306        bucket: Resource<wasi_keyvalue::batch::Bucket>,
307        keys: Vec<String>,
308    ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
309        let store = self.get_store_wasi(bucket)?;
310        if keys.is_empty() {
311            return Ok(vec![]);
312        }
313        store.get_many(keys).await.map_err(to_wasi_err)
314    }
315
316    #[instrument(name = "spin_key_value.set_many", skip_all, fields(otel.kind = "client"))]
317    async fn set_many(
318        &mut self,
319        bucket: Resource<wasi_keyvalue::batch::Bucket>,
320        key_values: Vec<(String, Vec<u8>)>,
321    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
322        let store = self.get_store_wasi(bucket)?;
323        if key_values.is_empty() {
324            return Ok(());
325        }
326        store.set_many(key_values).await.map_err(to_wasi_err)
327    }
328
329    #[instrument(name = "spin_key_value.delete_many", skip_all, fields(otel.kind = "client"))]
330    async fn delete_many(
331        &mut self,
332        bucket: Resource<wasi_keyvalue::batch::Bucket>,
333        keys: Vec<String>,
334    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
335        let store = self.get_store_wasi(bucket)?;
336        if keys.is_empty() {
337            return Ok(());
338        }
339        store.delete_many(keys).await.map_err(to_wasi_err)
340    }
341}
342
343impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
344    #[instrument(name = "wasi_key_value_cas.new", skip_all, fields(otel.kind = "client"))]
345    async fn new(
346        &mut self,
347        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
348        key: String,
349    ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
350        let bucket_rep = bucket.rep();
351        let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
352        let store = self.get_store_wasi(bucket)?;
353        let cas = store
354            .new_compare_and_swap(bucket_rep, &key)
355            .await
356            .map_err(to_wasi_err)?;
357        self.compare_and_swaps
358            .push(cas)
359            .map_err(|()| {
360                spin_world::wasi::keyvalue::store::Error::Other(
361                    "too many compare_and_swaps opened".to_string(),
362                )
363            })
364            .map(Resource::new_own)
365    }
366
367    #[instrument(name = "wasi_key_value_cas.current", skip_all, fields(otel.kind = "client"))]
368    async fn current(
369        &mut self,
370        cas: Resource<wasi_keyvalue::atomics::Cas>,
371    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
372        let cas = self
373            .get_cas(cas)
374            .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
375        cas.current().await.map_err(to_wasi_err)
376    }
377
378    async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
379        self.compare_and_swaps.remove(rep.rep());
380        Ok(())
381    }
382}
383
384impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
385    fn convert_cas_error(
386        &mut self,
387        error: spin_world::wasi::keyvalue::atomics::CasError,
388    ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
389        Ok(error)
390    }
391
392    #[instrument(name = "spin_key_value.increment", skip_all, fields(otel.kind = "client"))]
393    async fn increment(
394        &mut self,
395        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
396        key: String,
397        delta: i64,
398    ) -> Result<i64, wasi_keyvalue::store::Error> {
399        let store = self.get_store_wasi(bucket)?;
400        store.increment(key, delta).await.map_err(to_wasi_err)
401    }
402
403    #[instrument(name = "spin_key_value.swap", skip_all, fields(otel.kind = "client"))]
404    async fn swap(
405        &mut self,
406        cas_res: Resource<atomics::Cas>,
407        value: Vec<u8>,
408    ) -> Result<(), CasError> {
409        let cas_rep = cas_res.rep();
410        let cas = self
411            .get_cas(Resource::<Bucket>::new_own(cas_rep))
412            .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
413
414        match cas.swap(value).await {
415            Ok(_) => Ok(()),
416            Err(err) => match err {
417                SwapError::CasFailed(_) => {
418                    let bucket = Resource::new_own(cas.bucket_rep().await);
419                    let new_cas = self
420                        .new(bucket, cas.key().await)
421                        .await
422                        .map_err(CasError::StoreError)?;
423                    let new_cas_rep = new_cas.rep();
424                    self.current(Resource::new_own(new_cas_rep))
425                        .await
426                        .map_err(CasError::StoreError)?;
427                    let res = Resource::new_own(new_cas_rep);
428                    Err(CasError::CasFailed(res))
429                }
430                SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
431            },
432        }
433    }
434}
435
436pub fn log_error(err: impl std::fmt::Debug) -> Error {
437    tracing::warn!("key-value error: {err:?}");
438    Error::Other(format!("{err:?}"))
439}
440
441pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
442    tracing::warn!("key-value error: {err:?}");
443    SwapError::Other(format!("{err:?}"))
444}
445
446use spin_world::v1::key_value::Error as LegacyError;
447use spin_world::wasi::keyvalue::atomics;
448use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
449
450fn to_legacy_error(err: Error) -> LegacyError {
451    match err {
452        Error::StoreTableFull => LegacyError::StoreTableFull,
453        Error::NoSuchStore => LegacyError::NoSuchStore,
454        Error::AccessDenied => LegacyError::AccessDenied,
455        Error::Other(s) => LegacyError::Io(s),
456    }
457}
458
459impl spin_world::v1::key_value::Host for KeyValueDispatch {
460    async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
461        let result = <Self as key_value::HostStore>::open(self, name).await?;
462        Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
463    }
464
465    async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
466        let this = Resource::new_borrow(store);
467        let result = <Self as key_value::HostStore>::get(self, this, key).await?;
468        Ok(result
469            .map_err(to_legacy_error)
470            .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
471    }
472
473    async fn set(
474        &mut self,
475        store: u32,
476        key: String,
477        value: Vec<u8>,
478    ) -> Result<Result<(), LegacyError>> {
479        let this = Resource::new_borrow(store);
480        let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
481        Ok(result.map_err(to_legacy_error))
482    }
483
484    async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
485        let this = Resource::new_borrow(store);
486        let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
487        Ok(result.map_err(to_legacy_error))
488    }
489
490    async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
491        let this = Resource::new_borrow(store);
492        let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
493        Ok(result.map_err(to_legacy_error))
494    }
495
496    async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
497        let this = Resource::new_borrow(store);
498        let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
499        Ok(result.map_err(to_legacy_error))
500    }
501
502    async fn close(&mut self, store: u32) -> Result<()> {
503        let this = Resource::new_borrow(store);
504        <Self as key_value::HostStore>::drop(self, this).await
505    }
506}