spin_factor_key_value/
host.rs

1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{async_trait, wasmtime::component::Resource};
4use spin_resource_table::Table;
5use spin_world::v2::key_value;
6use spin_world::wasi::keyvalue as wasi_keyvalue;
7use std::{collections::HashSet, sync::Arc};
8use tracing::{instrument, Level};
9
/// Capacity handed to each resource table when a dispatcher is built with
/// `KeyValueDispatch::new` rather than `KeyValueDispatch::new_with_capacity`.
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
11
12pub use key_value::Error;
13
/// Source of [`Store`] implementations, looked up by store name.
///
/// `KeyValueDispatch` calls [`StoreManager::get`] each time a guest opens a store.
#[async_trait]
pub trait StoreManager: Sync + Send {
    /// Returns the store registered under `name`, or an [`Error`] when it cannot be provided.
    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
    /// Presumably reports whether `store_name` is known to this manager; it is not called
    /// from this file, so confirm the exact contract with the implementors.
    fn is_defined(&self, store_name: &str) -> bool;

    /// A human-readable summary of the given store's configuration
    ///
    /// Example: "Redis at localhost:1234"
    ///
    /// The default implementation ignores `store_name` and returns `None`.
    fn summary(&self, store_name: &str) -> Option<String> {
        let _ = store_name;
        None
    }
}
27
/// Operations a key-value backend exposes to `KeyValueDispatch`.
///
/// Every method reports failures through the shared [`Error`] type.
#[async_trait]
pub trait Store: Sync + Send {
    /// Hook awaited by `KeyValueDispatch` right after the store is obtained from the manager
    /// and before it is placed in the resource table. The default implementation does nothing.
    async fn after_open(&self) -> Result<(), Error> {
        Ok(())
    }
    /// Looks up `key`, yielding `None` when there is no value to return.
    async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
    /// Stores `value` under `key`.
    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
    /// Removes `key` from the store.
    async fn delete(&self, key: &str) -> Result<(), Error>;
    /// Reports whether `key` is present.
    async fn exists(&self, key: &str) -> Result<bool, Error>;
    /// Lists the keys held by the store.
    async fn get_keys(&self) -> Result<Vec<String>, Error>;
    /// Batch lookup returning `(key, value)` pairs. `KeyValueDispatch` never forwards an
    /// empty `keys` list; it answers those itself.
    async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
    /// Batch write of `(key, value)` pairs. `KeyValueDispatch` never forwards an empty list.
    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
    /// Batch removal of `keys`. `KeyValueDispatch` never forwards an empty list.
    async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
    /// Applies `delta` to the value under `key` and returns an `i64`
    /// (presumably the updated value — confirm with implementors).
    async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
    /// Creates a compare-and-swap handle for `key`. `bucket_rep` is the resource-table index
    /// of the bucket the handle is being created for.
    async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
        -> Result<Arc<dyn Cas>, Error>;
}
45
/// Host-side state behind the key-value interfaces implemented in this file.
pub struct KeyValueDispatch {
    // Store names the `open` calls accept; any other name yields `AccessDenied`.
    allowed_stores: HashSet<String>,
    // Supplies the `Store` implementation for a name when a store is opened.
    manager: Arc<dyn StoreManager>,
    // Open stores, indexed by the `rep` of the resource handle returned from `open`.
    stores: Table<Arc<dyn Store>>,
    // Live compare-and-swap handles, indexed by the `rep` of their resource handle.
    compare_and_swaps: Table<Arc<dyn Cas>>,
}
52
53impl KeyValueDispatch {
54    pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
55        Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
56    }
57
58    pub fn new_with_capacity(
59        allowed_stores: HashSet<String>,
60        manager: Arc<dyn StoreManager>,
61        capacity: u32,
62    ) -> Self {
63        Self {
64            allowed_stores,
65            manager,
66            stores: Table::new(capacity),
67            compare_and_swaps: Table::new(capacity),
68        }
69    }
70
71    pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
72        self.stores.get(store.rep()).context("invalid store")
73    }
74
75    pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
76        self.compare_and_swaps
77            .get(cas.rep())
78            .context("invalid compare and swap")
79    }
80
81    pub fn allowed_stores(&self) -> &HashSet<String> {
82        &self.allowed_stores
83    }
84
85    pub fn get_store_wasi<T: 'static>(
86        &self,
87        store: Resource<T>,
88    ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
89        self.stores
90            .get(store.rep())
91            .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
92    }
93
94    pub fn get_cas_wasi<T: 'static>(
95        &self,
96        cas: Resource<T>,
97    ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
98        self.compare_and_swaps
99            .get(cas.rep())
100            .ok_or(wasi_keyvalue::atomics::Error::Other(
101                "compare and swap not found".to_string(),
102            ))
103    }
104}
105
// Empty impl: no `key_value::Host` methods are provided here; the store operations are
// implemented on `key_value::HostStore` instead.
impl key_value::Host for KeyValueDispatch {}
107
108impl key_value::HostStore for KeyValueDispatch {
109    #[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
110    async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
111        Ok(async {
112            if self.allowed_stores.contains(&name) {
113                let store = self.manager.get(&name).await?;
114                store.after_open().await?;
115                let store_idx = self
116                    .stores
117                    .push(store)
118                    .map_err(|()| Error::StoreTableFull)?;
119                Ok(Resource::new_own(store_idx))
120            } else {
121                Err(Error::AccessDenied)
122            }
123        }
124        .await)
125    }
126
127    #[instrument(name = "spin_key_value.get", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
128    async fn get(
129        &mut self,
130        store: Resource<key_value::Store>,
131        key: String,
132    ) -> Result<Result<Option<Vec<u8>>, Error>> {
133        let store = self.get_store(store)?;
134        Ok(store.get(&key).await)
135    }
136
137    #[instrument(name = "spin_key_value.set", skip(self, store, key, value), err(level = Level::INFO), fields(otel.kind = "client"))]
138    async fn set(
139        &mut self,
140        store: Resource<key_value::Store>,
141        key: String,
142        value: Vec<u8>,
143    ) -> Result<Result<(), Error>> {
144        let store = self.get_store(store)?;
145        Ok(store.set(&key, &value).await)
146    }
147
148    #[instrument(name = "spin_key_value.delete", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
149    async fn delete(
150        &mut self,
151        store: Resource<key_value::Store>,
152        key: String,
153    ) -> Result<Result<(), Error>> {
154        let store = self.get_store(store)?;
155        Ok(store.delete(&key).await)
156    }
157
158    #[instrument(name = "spin_key_value.exists", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
159    async fn exists(
160        &mut self,
161        store: Resource<key_value::Store>,
162        key: String,
163    ) -> Result<Result<bool, Error>> {
164        let store = self.get_store(store)?;
165        Ok(store.exists(&key).await)
166    }
167
168    #[instrument(name = "spin_key_value.get_keys", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
169    async fn get_keys(
170        &mut self,
171        store: Resource<key_value::Store>,
172    ) -> Result<Result<Vec<String>, Error>> {
173        let store = self.get_store(store)?;
174        Ok(store.get_keys().await)
175    }
176
177    async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
178        self.stores.remove(store.rep());
179        Ok(())
180    }
181}
182
183fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
184    match e {
185        Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
186        Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
187        Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
188        Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
189    }
190}
191
192impl wasi_keyvalue::store::Host for KeyValueDispatch {
193    async fn open(
194        &mut self,
195        identifier: String,
196    ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
197        if self.allowed_stores.contains(&identifier) {
198            let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
199            store.after_open().await.map_err(to_wasi_err)?;
200            let store_idx = self
201                .stores
202                .push(store)
203                .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
204            Ok(Resource::new_own(store_idx))
205        } else {
206            Err(wasi_keyvalue::store::Error::AccessDenied)
207        }
208    }
209
210    fn convert_error(
211        &mut self,
212        error: spin_world::wasi::keyvalue::store::Error,
213    ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
214        Ok(error)
215    }
216}
217
218use wasi_keyvalue::store::Bucket;
219impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
220    async fn get(
221        &mut self,
222        self_: Resource<Bucket>,
223        key: String,
224    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
225        let store = self.get_store_wasi(self_)?;
226        store.get(&key).await.map_err(to_wasi_err)
227    }
228
229    async fn set(
230        &mut self,
231        self_: Resource<Bucket>,
232        key: String,
233        value: Vec<u8>,
234    ) -> Result<(), wasi_keyvalue::store::Error> {
235        let store = self.get_store_wasi(self_)?;
236        store.set(&key, &value).await.map_err(to_wasi_err)
237    }
238
239    async fn delete(
240        &mut self,
241        self_: Resource<Bucket>,
242        key: String,
243    ) -> Result<(), wasi_keyvalue::store::Error> {
244        let store = self.get_store_wasi(self_)?;
245        store.delete(&key).await.map_err(to_wasi_err)
246    }
247
248    async fn exists(
249        &mut self,
250        self_: Resource<Bucket>,
251        key: String,
252    ) -> Result<bool, wasi_keyvalue::store::Error> {
253        let store = self.get_store_wasi(self_)?;
254        store.exists(&key).await.map_err(to_wasi_err)
255    }
256
257    async fn list_keys(
258        &mut self,
259        self_: Resource<Bucket>,
260        cursor: Option<String>,
261    ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
262        match cursor {
263            Some(_) => Err(wasi_keyvalue::store::Error::Other(
264                "list_keys: cursor not supported".to_owned(),
265            )),
266            None => {
267                let store = self.get_store_wasi(self_)?;
268                let keys = store.get_keys().await.map_err(to_wasi_err)?;
269                Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
270            }
271        }
272    }
273
274    async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
275        self.stores.remove(rep.rep());
276        Ok(())
277    }
278}
279
280impl wasi_keyvalue::batch::Host for KeyValueDispatch {
281    #[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
282    #[allow(clippy::type_complexity)]
283    async fn get_many(
284        &mut self,
285        bucket: Resource<wasi_keyvalue::batch::Bucket>,
286        keys: Vec<String>,
287    ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
288        let store = self.get_store_wasi(bucket)?;
289        if keys.is_empty() {
290            return Ok(vec![]);
291        }
292        store.get_many(keys).await.map_err(to_wasi_err)
293    }
294
295    #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
296    async fn set_many(
297        &mut self,
298        bucket: Resource<wasi_keyvalue::batch::Bucket>,
299        key_values: Vec<(String, Vec<u8>)>,
300    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
301        let store = self.get_store_wasi(bucket)?;
302        if key_values.is_empty() {
303            return Ok(());
304        }
305        store.set_many(key_values).await.map_err(to_wasi_err)
306    }
307
308    #[instrument(name = "spin_key_value.delete_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
309    async fn delete_many(
310        &mut self,
311        bucket: Resource<wasi_keyvalue::batch::Bucket>,
312        keys: Vec<String>,
313    ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
314        let store = self.get_store_wasi(bucket)?;
315        if keys.is_empty() {
316            return Ok(());
317        }
318        store.delete_many(keys).await.map_err(to_wasi_err)
319    }
320}
321
322impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
323    async fn new(
324        &mut self,
325        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
326        key: String,
327    ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
328        let bucket_rep = bucket.rep();
329        let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
330        let store = self.get_store_wasi(bucket)?;
331        let cas = store
332            .new_compare_and_swap(bucket_rep, &key)
333            .await
334            .map_err(to_wasi_err)?;
335        self.compare_and_swaps
336            .push(cas)
337            .map_err(|()| {
338                spin_world::wasi::keyvalue::store::Error::Other(
339                    "too many compare_and_swaps opened".to_string(),
340                )
341            })
342            .map(Resource::new_own)
343    }
344
345    async fn current(
346        &mut self,
347        cas: Resource<wasi_keyvalue::atomics::Cas>,
348    ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
349        let cas = self
350            .get_cas(cas)
351            .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
352        cas.current().await.map_err(to_wasi_err)
353    }
354
355    async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
356        self.compare_and_swaps.remove(rep.rep());
357        Ok(())
358    }
359}
360
361impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
362    fn convert_cas_error(
363        &mut self,
364        error: spin_world::wasi::keyvalue::atomics::CasError,
365    ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
366        Ok(error)
367    }
368
369    #[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
370    async fn increment(
371        &mut self,
372        bucket: Resource<wasi_keyvalue::atomics::Bucket>,
373        key: String,
374        delta: i64,
375    ) -> Result<i64, wasi_keyvalue::store::Error> {
376        let store = self.get_store_wasi(bucket)?;
377        store.increment(key, delta).await.map_err(to_wasi_err)
378    }
379
380    #[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
381    async fn swap(
382        &mut self,
383        cas_res: Resource<atomics::Cas>,
384        value: Vec<u8>,
385    ) -> Result<(), CasError> {
386        let cas_rep = cas_res.rep();
387        let cas = self
388            .get_cas(Resource::<Bucket>::new_own(cas_rep))
389            .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
390
391        match cas.swap(value).await {
392            Ok(_) => Ok(()),
393            Err(err) => match err {
394                SwapError::CasFailed(_) => {
395                    let bucket = Resource::new_own(cas.bucket_rep().await);
396                    let new_cas = self
397                        .new(bucket, cas.key().await)
398                        .await
399                        .map_err(CasError::StoreError)?;
400                    let new_cas_rep = new_cas.rep();
401                    self.current(Resource::new_own(new_cas_rep))
402                        .await
403                        .map_err(CasError::StoreError)?;
404                    let res = Resource::new_own(new_cas_rep);
405                    Err(CasError::CasFailed(res))
406                }
407                SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
408            },
409        }
410    }
411}
412
413pub fn log_error(err: impl std::fmt::Debug) -> Error {
414    tracing::warn!("key-value error: {err:?}");
415    Error::Other(format!("{err:?}"))
416}
417
418pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
419    tracing::warn!("key-value error: {err:?}");
420    SwapError::Other(format!("{err:?}"))
421}
422
423use spin_world::v1::key_value::Error as LegacyError;
424use spin_world::wasi::keyvalue::atomics;
425use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
426
427fn to_legacy_error(value: key_value::Error) -> LegacyError {
428    match value {
429        Error::StoreTableFull => LegacyError::StoreTableFull,
430        Error::NoSuchStore => LegacyError::NoSuchStore,
431        Error::AccessDenied => LegacyError::AccessDenied,
432        Error::Other(s) => LegacyError::Io(s),
433    }
434}
435
436impl spin_world::v1::key_value::Host for KeyValueDispatch {
437    async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
438        let result = <Self as key_value::HostStore>::open(self, name).await?;
439        Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
440    }
441
442    async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
443        let this = Resource::new_borrow(store);
444        let result = <Self as key_value::HostStore>::get(self, this, key).await?;
445        Ok(result
446            .map_err(to_legacy_error)
447            .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
448    }
449
450    async fn set(
451        &mut self,
452        store: u32,
453        key: String,
454        value: Vec<u8>,
455    ) -> Result<Result<(), LegacyError>> {
456        let this = Resource::new_borrow(store);
457        let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
458        Ok(result.map_err(to_legacy_error))
459    }
460
461    async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
462        let this = Resource::new_borrow(store);
463        let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
464        Ok(result.map_err(to_legacy_error))
465    }
466
467    async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
468        let this = Resource::new_borrow(store);
469        let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
470        Ok(result.map_err(to_legacy_error))
471    }
472
473    async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
474        let this = Resource::new_borrow(store);
475        let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
476        Ok(result.map_err(to_legacy_error))
477    }
478
479    async fn close(&mut self, store: u32) -> Result<()> {
480        let this = Resource::new_borrow(store);
481        <Self as key_value::HostStore>::drop(self, this).await
482    }
483}