1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{async_trait, wasmtime::component::Resource};
4use spin_resource_table::Table;
5use spin_world::v2::key_value;
6use spin_world::wasi::keyvalue as wasi_keyvalue;
7use std::{collections::HashSet, sync::Arc};
8use tracing::{instrument, Level};
9
/// Capacity handed to each resource table when a `KeyValueDispatch` is built
/// through `KeyValueDispatch::new`.
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
11
12pub use key_value::Error;
13
14#[async_trait]
15pub trait StoreManager: Sync + Send {
16 async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
17 fn is_defined(&self, store_name: &str) -> bool;
18
19 fn summary(&self, store_name: &str) -> Option<String> {
23 let _ = store_name;
24 None
25 }
26}
27
28#[async_trait]
29pub trait Store: Sync + Send {
30 async fn after_open(&self) -> Result<(), Error> {
31 Ok(())
32 }
33 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
34 async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
35 async fn delete(&self, key: &str) -> Result<(), Error>;
36 async fn exists(&self, key: &str) -> Result<bool, Error>;
37 async fn get_keys(&self) -> Result<Vec<String>, Error>;
38 async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
39 async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
40 async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
41 async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
42 async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
43 -> Result<Arc<dyn Cas>, Error>;
44}
45
46pub struct KeyValueDispatch {
47 allowed_stores: HashSet<String>,
48 manager: Arc<dyn StoreManager>,
49 stores: Table<Arc<dyn Store>>,
50 compare_and_swaps: Table<Arc<dyn Cas>>,
51}
52
53impl KeyValueDispatch {
54 pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
55 Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
56 }
57
58 pub fn new_with_capacity(
59 allowed_stores: HashSet<String>,
60 manager: Arc<dyn StoreManager>,
61 capacity: u32,
62 ) -> Self {
63 Self {
64 allowed_stores,
65 manager,
66 stores: Table::new(capacity),
67 compare_and_swaps: Table::new(capacity),
68 }
69 }
70
71 pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
72 self.stores.get(store.rep()).context("invalid store")
73 }
74
75 pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
76 self.compare_and_swaps
77 .get(cas.rep())
78 .context("invalid compare and swap")
79 }
80
81 pub fn allowed_stores(&self) -> &HashSet<String> {
82 &self.allowed_stores
83 }
84
85 pub fn get_store_wasi<T: 'static>(
86 &self,
87 store: Resource<T>,
88 ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
89 self.stores
90 .get(store.rep())
91 .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
92 }
93
94 pub fn get_cas_wasi<T: 'static>(
95 &self,
96 cas: Resource<T>,
97 ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
98 self.compare_and_swaps
99 .get(cas.rep())
100 .ok_or(wasi_keyvalue::atomics::Error::Other(
101 "compare and swap not found".to_string(),
102 ))
103 }
104}
105
106impl key_value::Host for KeyValueDispatch {}
107
108impl key_value::HostStore for KeyValueDispatch {
109 #[instrument(name = "spin_key_value.open", skip(self), err(level = Level::INFO), fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
110 async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
111 Ok(async {
112 if self.allowed_stores.contains(&name) {
113 let store = self.manager.get(&name).await?;
114 store.after_open().await?;
115 let store_idx = self
116 .stores
117 .push(store)
118 .map_err(|()| Error::StoreTableFull)?;
119 Ok(Resource::new_own(store_idx))
120 } else {
121 Err(Error::AccessDenied)
122 }
123 }
124 .await)
125 }
126
127 #[instrument(name = "spin_key_value.get", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
128 async fn get(
129 &mut self,
130 store: Resource<key_value::Store>,
131 key: String,
132 ) -> Result<Result<Option<Vec<u8>>, Error>> {
133 let store = self.get_store(store)?;
134 Ok(store.get(&key).await)
135 }
136
137 #[instrument(name = "spin_key_value.set", skip(self, store, key, value), err(level = Level::INFO), fields(otel.kind = "client"))]
138 async fn set(
139 &mut self,
140 store: Resource<key_value::Store>,
141 key: String,
142 value: Vec<u8>,
143 ) -> Result<Result<(), Error>> {
144 let store = self.get_store(store)?;
145 Ok(store.set(&key, &value).await)
146 }
147
148 #[instrument(name = "spin_key_value.delete", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
149 async fn delete(
150 &mut self,
151 store: Resource<key_value::Store>,
152 key: String,
153 ) -> Result<Result<(), Error>> {
154 let store = self.get_store(store)?;
155 Ok(store.delete(&key).await)
156 }
157
158 #[instrument(name = "spin_key_value.exists", skip(self, store, key), err(level = Level::INFO), fields(otel.kind = "client"))]
159 async fn exists(
160 &mut self,
161 store: Resource<key_value::Store>,
162 key: String,
163 ) -> Result<Result<bool, Error>> {
164 let store = self.get_store(store)?;
165 Ok(store.exists(&key).await)
166 }
167
168 #[instrument(name = "spin_key_value.get_keys", skip(self, store), err(level = Level::INFO), fields(otel.kind = "client"))]
169 async fn get_keys(
170 &mut self,
171 store: Resource<key_value::Store>,
172 ) -> Result<Result<Vec<String>, Error>> {
173 let store = self.get_store(store)?;
174 Ok(store.get_keys().await)
175 }
176
177 async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
178 self.stores.remove(store.rep());
179 Ok(())
180 }
181}
182
183fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
184 match e {
185 Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
186 Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
187 Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
188 Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
189 }
190}
191
192impl wasi_keyvalue::store::Host for KeyValueDispatch {
193 async fn open(
194 &mut self,
195 identifier: String,
196 ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
197 if self.allowed_stores.contains(&identifier) {
198 let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
199 store.after_open().await.map_err(to_wasi_err)?;
200 let store_idx = self
201 .stores
202 .push(store)
203 .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
204 Ok(Resource::new_own(store_idx))
205 } else {
206 Err(wasi_keyvalue::store::Error::AccessDenied)
207 }
208 }
209
210 fn convert_error(
211 &mut self,
212 error: spin_world::wasi::keyvalue::store::Error,
213 ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
214 Ok(error)
215 }
216}
217
218use wasi_keyvalue::store::Bucket;
219impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
220 async fn get(
221 &mut self,
222 self_: Resource<Bucket>,
223 key: String,
224 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
225 let store = self.get_store_wasi(self_)?;
226 store.get(&key).await.map_err(to_wasi_err)
227 }
228
229 async fn set(
230 &mut self,
231 self_: Resource<Bucket>,
232 key: String,
233 value: Vec<u8>,
234 ) -> Result<(), wasi_keyvalue::store::Error> {
235 let store = self.get_store_wasi(self_)?;
236 store.set(&key, &value).await.map_err(to_wasi_err)
237 }
238
239 async fn delete(
240 &mut self,
241 self_: Resource<Bucket>,
242 key: String,
243 ) -> Result<(), wasi_keyvalue::store::Error> {
244 let store = self.get_store_wasi(self_)?;
245 store.delete(&key).await.map_err(to_wasi_err)
246 }
247
248 async fn exists(
249 &mut self,
250 self_: Resource<Bucket>,
251 key: String,
252 ) -> Result<bool, wasi_keyvalue::store::Error> {
253 let store = self.get_store_wasi(self_)?;
254 store.exists(&key).await.map_err(to_wasi_err)
255 }
256
257 async fn list_keys(
258 &mut self,
259 self_: Resource<Bucket>,
260 cursor: Option<String>,
261 ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
262 match cursor {
263 Some(_) => Err(wasi_keyvalue::store::Error::Other(
264 "list_keys: cursor not supported".to_owned(),
265 )),
266 None => {
267 let store = self.get_store_wasi(self_)?;
268 let keys = store.get_keys().await.map_err(to_wasi_err)?;
269 Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
270 }
271 }
272 }
273
274 async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
275 self.stores.remove(rep.rep());
276 Ok(())
277 }
278}
279
280impl wasi_keyvalue::batch::Host for KeyValueDispatch {
281 #[instrument(name = "spin_key_value.get_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
282 #[allow(clippy::type_complexity)]
283 async fn get_many(
284 &mut self,
285 bucket: Resource<wasi_keyvalue::batch::Bucket>,
286 keys: Vec<String>,
287 ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
288 let store = self.get_store_wasi(bucket)?;
289 if keys.is_empty() {
290 return Ok(vec![]);
291 }
292 store.get_many(keys).await.map_err(to_wasi_err)
293 }
294
295 #[instrument(name = "spin_key_value.set_many", skip(self, bucket, key_values), err(level = Level::INFO), fields(otel.kind = "client"))]
296 async fn set_many(
297 &mut self,
298 bucket: Resource<wasi_keyvalue::batch::Bucket>,
299 key_values: Vec<(String, Vec<u8>)>,
300 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
301 let store = self.get_store_wasi(bucket)?;
302 if key_values.is_empty() {
303 return Ok(());
304 }
305 store.set_many(key_values).await.map_err(to_wasi_err)
306 }
307
308 #[instrument(name = "spin_key_value.delete_many", skip(self, bucket, keys), err(level = Level::INFO), fields(otel.kind = "client"))]
309 async fn delete_many(
310 &mut self,
311 bucket: Resource<wasi_keyvalue::batch::Bucket>,
312 keys: Vec<String>,
313 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
314 let store = self.get_store_wasi(bucket)?;
315 if keys.is_empty() {
316 return Ok(());
317 }
318 store.delete_many(keys).await.map_err(to_wasi_err)
319 }
320}
321
322impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
323 async fn new(
324 &mut self,
325 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
326 key: String,
327 ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
328 let bucket_rep = bucket.rep();
329 let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
330 let store = self.get_store_wasi(bucket)?;
331 let cas = store
332 .new_compare_and_swap(bucket_rep, &key)
333 .await
334 .map_err(to_wasi_err)?;
335 self.compare_and_swaps
336 .push(cas)
337 .map_err(|()| {
338 spin_world::wasi::keyvalue::store::Error::Other(
339 "too many compare_and_swaps opened".to_string(),
340 )
341 })
342 .map(Resource::new_own)
343 }
344
345 async fn current(
346 &mut self,
347 cas: Resource<wasi_keyvalue::atomics::Cas>,
348 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
349 let cas = self
350 .get_cas(cas)
351 .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
352 cas.current().await.map_err(to_wasi_err)
353 }
354
355 async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
356 self.compare_and_swaps.remove(rep.rep());
357 Ok(())
358 }
359}
360
361impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
362 fn convert_cas_error(
363 &mut self,
364 error: spin_world::wasi::keyvalue::atomics::CasError,
365 ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
366 Ok(error)
367 }
368
369 #[instrument(name = "spin_key_value.increment", skip(self, bucket, key, delta), err(level = Level::INFO), fields(otel.kind = "client"))]
370 async fn increment(
371 &mut self,
372 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
373 key: String,
374 delta: i64,
375 ) -> Result<i64, wasi_keyvalue::store::Error> {
376 let store = self.get_store_wasi(bucket)?;
377 store.increment(key, delta).await.map_err(to_wasi_err)
378 }
379
380 #[instrument(name = "spin_key_value.swap", skip(self, cas_res, value), err(level = Level::INFO), fields(otel.kind = "client"))]
381 async fn swap(
382 &mut self,
383 cas_res: Resource<atomics::Cas>,
384 value: Vec<u8>,
385 ) -> Result<(), CasError> {
386 let cas_rep = cas_res.rep();
387 let cas = self
388 .get_cas(Resource::<Bucket>::new_own(cas_rep))
389 .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
390
391 match cas.swap(value).await {
392 Ok(_) => Ok(()),
393 Err(err) => match err {
394 SwapError::CasFailed(_) => {
395 let bucket = Resource::new_own(cas.bucket_rep().await);
396 let new_cas = self
397 .new(bucket, cas.key().await)
398 .await
399 .map_err(CasError::StoreError)?;
400 let new_cas_rep = new_cas.rep();
401 self.current(Resource::new_own(new_cas_rep))
402 .await
403 .map_err(CasError::StoreError)?;
404 let res = Resource::new_own(new_cas_rep);
405 Err(CasError::CasFailed(res))
406 }
407 SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
408 },
409 }
410 }
411}
412
413pub fn log_error(err: impl std::fmt::Debug) -> Error {
414 tracing::warn!("key-value error: {err:?}");
415 Error::Other(format!("{err:?}"))
416}
417
418pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
419 tracing::warn!("key-value error: {err:?}");
420 SwapError::Other(format!("{err:?}"))
421}
422
423use spin_world::v1::key_value::Error as LegacyError;
424use spin_world::wasi::keyvalue::atomics;
425use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
426
427fn to_legacy_error(value: key_value::Error) -> LegacyError {
428 match value {
429 Error::StoreTableFull => LegacyError::StoreTableFull,
430 Error::NoSuchStore => LegacyError::NoSuchStore,
431 Error::AccessDenied => LegacyError::AccessDenied,
432 Error::Other(s) => LegacyError::Io(s),
433 }
434}
435
436impl spin_world::v1::key_value::Host for KeyValueDispatch {
437 async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
438 let result = <Self as key_value::HostStore>::open(self, name).await?;
439 Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
440 }
441
442 async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
443 let this = Resource::new_borrow(store);
444 let result = <Self as key_value::HostStore>::get(self, this, key).await?;
445 Ok(result
446 .map_err(to_legacy_error)
447 .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
448 }
449
450 async fn set(
451 &mut self,
452 store: u32,
453 key: String,
454 value: Vec<u8>,
455 ) -> Result<Result<(), LegacyError>> {
456 let this = Resource::new_borrow(store);
457 let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
458 Ok(result.map_err(to_legacy_error))
459 }
460
461 async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
462 let this = Resource::new_borrow(store);
463 let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
464 Ok(result.map_err(to_legacy_error))
465 }
466
467 async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
468 let this = Resource::new_borrow(store);
469 let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
470 Ok(result.map_err(to_legacy_error))
471 }
472
473 async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
474 let this = Resource::new_borrow(store);
475 let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
476 Ok(result.map_err(to_legacy_error))
477 }
478
479 async fn close(&mut self, store: u32) -> Result<()> {
480 let this = Resource::new_borrow(store);
481 <Self as key_value::HostStore>::drop(self, this).await
482 }
483}