1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{async_trait, wasmtime::component::Resource};
4use spin_resource_table::Table;
5use spin_telemetry::traces::{self, Blame};
6use spin_world::v2::key_value;
7use spin_world::wasi::keyvalue as wasi_keyvalue;
8use std::{collections::HashSet, sync::Arc};
9use tracing::instrument;
10
11const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
12
13pub use key_value::Error;
14
15#[async_trait]
16pub trait StoreManager: Sync + Send {
17 async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;
18 fn is_defined(&self, store_name: &str) -> bool;
19
20 fn summary(&self, store_name: &str) -> Option<String> {
24 let _ = store_name;
25 None
26 }
27}
28
29#[async_trait]
30pub trait Store: Sync + Send {
31 async fn after_open(&self) -> Result<(), Error> {
32 Ok(())
33 }
34 async fn get(&self, key: &str) -> Result<Option<Vec<u8>>, Error>;
35 async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
36 async fn delete(&self, key: &str) -> Result<(), Error>;
37 async fn exists(&self, key: &str) -> Result<bool, Error>;
38 async fn get_keys(&self) -> Result<Vec<String>, Error>;
39 async fn get_many(&self, keys: Vec<String>) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
40 async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
41 async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
42 async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
43 async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
44 -> Result<Arc<dyn Cas>, Error>;
45}
46
47pub struct KeyValueDispatch {
48 allowed_stores: HashSet<String>,
49 manager: Arc<dyn StoreManager>,
50 stores: Table<Arc<dyn Store>>,
51 compare_and_swaps: Table<Arc<dyn Cas>>,
52}
53
54impl KeyValueDispatch {
55 pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
56 Self::new_with_capacity(allowed_stores, manager, DEFAULT_STORE_TABLE_CAPACITY)
57 }
58
59 pub fn new_with_capacity(
60 allowed_stores: HashSet<String>,
61 manager: Arc<dyn StoreManager>,
62 capacity: u32,
63 ) -> Self {
64 Self {
65 allowed_stores,
66 manager,
67 stores: Table::new(capacity),
68 compare_and_swaps: Table::new(capacity),
69 }
70 }
71
72 pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
73 let res = self.stores.get(store.rep()).context("invalid store");
74 if let Err(err) = &res {
75 traces::mark_as_error(err, Some(Blame::Host));
76 }
77 res
78 }
79
80 pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
81 self.compare_and_swaps
82 .get(cas.rep())
83 .context("invalid compare and swap")
84 }
85
86 pub fn allowed_stores(&self) -> &HashSet<String> {
87 &self.allowed_stores
88 }
89
90 pub fn get_store_wasi<T: 'static>(
91 &self,
92 store: Resource<T>,
93 ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
94 self.stores
95 .get(store.rep())
96 .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
97 }
98
99 pub fn get_cas_wasi<T: 'static>(
100 &self,
101 cas: Resource<T>,
102 ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
103 self.compare_and_swaps
104 .get(cas.rep())
105 .ok_or(wasi_keyvalue::atomics::Error::Other(
106 "compare and swap not found".to_string(),
107 ))
108 }
109}
110
111impl key_value::Host for KeyValueDispatch {}
112
113impl key_value::HostStore for KeyValueDispatch {
114 #[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
115 async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
116 Ok(async {
117 if self.allowed_stores.contains(&name) {
118 let store = self.manager.get(&name).await?;
119 store.after_open().await?;
120 let store_idx = self
121 .stores
122 .push(store)
123 .map_err(|()| Error::StoreTableFull)?;
124 Ok(Resource::new_own(store_idx))
125 } else {
126 Err(Error::AccessDenied)
127 }
128 }
129 .await)
130 }
131
132 #[instrument(name = "spin_key_value.get", skip_all, fields(otel.kind = "client"))]
133 async fn get(
134 &mut self,
135 store: Resource<key_value::Store>,
136 key: String,
137 ) -> Result<Result<Option<Vec<u8>>, Error>> {
138 let store = self.get_store(store)?;
139 Ok(store.get(&key).await.map_err(track_error_on_span))
140 }
141
142 #[instrument(name = "spin_key_value.set", skip_all, fields(otel.kind = "client"))]
143 async fn set(
144 &mut self,
145 store: Resource<key_value::Store>,
146 key: String,
147 value: Vec<u8>,
148 ) -> Result<Result<(), Error>> {
149 let store = self.get_store(store)?;
150 Ok(store.set(&key, &value).await.map_err(track_error_on_span))
151 }
152
153 #[instrument(name = "spin_key_value.delete", skip_all, fields(otel.kind = "client"))]
154 async fn delete(
155 &mut self,
156 store: Resource<key_value::Store>,
157 key: String,
158 ) -> Result<Result<(), Error>> {
159 let store = self.get_store(store)?;
160 Ok(store.delete(&key).await.map_err(track_error_on_span))
161 }
162
163 #[instrument(name = "spin_key_value.exists", skip_all, fields(otel.kind = "client"))]
164 async fn exists(
165 &mut self,
166 store: Resource<key_value::Store>,
167 key: String,
168 ) -> Result<Result<bool, Error>> {
169 let store = self.get_store(store)?;
170 Ok(store.exists(&key).await.map_err(track_error_on_span))
171 }
172
173 #[instrument(name = "spin_key_value.get_keys", skip_all, fields(otel.kind = "client"))]
174 async fn get_keys(
175 &mut self,
176 store: Resource<key_value::Store>,
177 ) -> Result<Result<Vec<String>, Error>> {
178 let store = self.get_store(store)?;
179 Ok(store.get_keys().await.map_err(track_error_on_span))
180 }
181
182 async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
183 self.stores.remove(store.rep());
184 Ok(())
185 }
186}
187
188fn track_error_on_span(err: Error) -> Error {
190 let blame = match err {
191 Error::NoSuchStore | Error::AccessDenied => Blame::Guest,
192 Error::StoreTableFull | Error::Other(_) => Blame::Host,
193 };
194 traces::mark_as_error(&err, Some(blame));
195 err
196}
197
198fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
199 match track_error_on_span(e) {
200 Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
201 Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
202 Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
203 Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
204 }
205}
206
207impl wasi_keyvalue::store::Host for KeyValueDispatch {
208 #[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
209 async fn open(
210 &mut self,
211 identifier: String,
212 ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
213 if self.allowed_stores.contains(&identifier) {
214 let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
215 store.after_open().await.map_err(to_wasi_err)?;
216 let store_idx = self
217 .stores
218 .push(store)
219 .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
220 Ok(Resource::new_own(store_idx))
221 } else {
222 Err(wasi_keyvalue::store::Error::AccessDenied)
223 }
224 }
225
226 fn convert_error(
227 &mut self,
228 error: spin_world::wasi::keyvalue::store::Error,
229 ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
230 Ok(error)
231 }
232}
233
234use wasi_keyvalue::store::Bucket;
235impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
236 #[instrument(name = "wasi_key_value.get", skip_all, fields(otel.kind = "client"))]
237 async fn get(
238 &mut self,
239 self_: Resource<Bucket>,
240 key: String,
241 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
242 let store = self.get_store_wasi(self_)?;
243 store.get(&key).await.map_err(to_wasi_err)
244 }
245
246 #[instrument(name = "wasi_key_value.set", skip_all, fields(otel.kind = "client"))]
247 async fn set(
248 &mut self,
249 self_: Resource<Bucket>,
250 key: String,
251 value: Vec<u8>,
252 ) -> Result<(), wasi_keyvalue::store::Error> {
253 let store = self.get_store_wasi(self_)?;
254 store.set(&key, &value).await.map_err(to_wasi_err)
255 }
256
257 #[instrument(name = "wasi_key_value.delete", skip_all, fields(otel.kind = "client"))]
258 async fn delete(
259 &mut self,
260 self_: Resource<Bucket>,
261 key: String,
262 ) -> Result<(), wasi_keyvalue::store::Error> {
263 let store = self.get_store_wasi(self_)?;
264 store.delete(&key).await.map_err(to_wasi_err)
265 }
266
267 #[instrument(name = "wasi_key_value.exists", skip_all, fields(otel.kind = "client"))]
268 async fn exists(
269 &mut self,
270 self_: Resource<Bucket>,
271 key: String,
272 ) -> Result<bool, wasi_keyvalue::store::Error> {
273 let store = self.get_store_wasi(self_)?;
274 store.exists(&key).await.map_err(to_wasi_err)
275 }
276
277 #[instrument(name = "wasi_key_value.list_keys", skip_all, fields(otel.kind = "client"))]
278 async fn list_keys(
279 &mut self,
280 self_: Resource<Bucket>,
281 cursor: Option<String>,
282 ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
283 match cursor {
284 Some(_) => Err(wasi_keyvalue::store::Error::Other(
285 "list_keys: cursor not supported".to_owned(),
286 )),
287 None => {
288 let store = self.get_store_wasi(self_)?;
289 let keys = store.get_keys().await.map_err(to_wasi_err)?;
290 Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
291 }
292 }
293 }
294
295 async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
296 self.stores.remove(rep.rep());
297 Ok(())
298 }
299}
300
301impl wasi_keyvalue::batch::Host for KeyValueDispatch {
302 #[instrument(name = "spin_key_value.get_many", skip_all, fields(otel.kind = "client"))]
303 #[allow(clippy::type_complexity)]
304 async fn get_many(
305 &mut self,
306 bucket: Resource<wasi_keyvalue::batch::Bucket>,
307 keys: Vec<String>,
308 ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
309 let store = self.get_store_wasi(bucket)?;
310 if keys.is_empty() {
311 return Ok(vec![]);
312 }
313 store.get_many(keys).await.map_err(to_wasi_err)
314 }
315
316 #[instrument(name = "spin_key_value.set_many", skip_all, fields(otel.kind = "client"))]
317 async fn set_many(
318 &mut self,
319 bucket: Resource<wasi_keyvalue::batch::Bucket>,
320 key_values: Vec<(String, Vec<u8>)>,
321 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
322 let store = self.get_store_wasi(bucket)?;
323 if key_values.is_empty() {
324 return Ok(());
325 }
326 store.set_many(key_values).await.map_err(to_wasi_err)
327 }
328
329 #[instrument(name = "spin_key_value.delete_many", skip_all, fields(otel.kind = "client"))]
330 async fn delete_many(
331 &mut self,
332 bucket: Resource<wasi_keyvalue::batch::Bucket>,
333 keys: Vec<String>,
334 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
335 let store = self.get_store_wasi(bucket)?;
336 if keys.is_empty() {
337 return Ok(());
338 }
339 store.delete_many(keys).await.map_err(to_wasi_err)
340 }
341}
342
343impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
344 #[instrument(name = "wasi_key_value_cas.new", skip_all, fields(otel.kind = "client"))]
345 async fn new(
346 &mut self,
347 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
348 key: String,
349 ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
350 let bucket_rep = bucket.rep();
351 let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
352 let store = self.get_store_wasi(bucket)?;
353 let cas = store
354 .new_compare_and_swap(bucket_rep, &key)
355 .await
356 .map_err(to_wasi_err)?;
357 self.compare_and_swaps
358 .push(cas)
359 .map_err(|()| {
360 spin_world::wasi::keyvalue::store::Error::Other(
361 "too many compare_and_swaps opened".to_string(),
362 )
363 })
364 .map(Resource::new_own)
365 }
366
367 #[instrument(name = "wasi_key_value_cas.current", skip_all, fields(otel.kind = "client"))]
368 async fn current(
369 &mut self,
370 cas: Resource<wasi_keyvalue::atomics::Cas>,
371 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
372 let cas = self
373 .get_cas(cas)
374 .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
375 cas.current().await.map_err(to_wasi_err)
376 }
377
378 async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
379 self.compare_and_swaps.remove(rep.rep());
380 Ok(())
381 }
382}
383
384impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
385 fn convert_cas_error(
386 &mut self,
387 error: spin_world::wasi::keyvalue::atomics::CasError,
388 ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
389 Ok(error)
390 }
391
392 #[instrument(name = "spin_key_value.increment", skip_all, fields(otel.kind = "client"))]
393 async fn increment(
394 &mut self,
395 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
396 key: String,
397 delta: i64,
398 ) -> Result<i64, wasi_keyvalue::store::Error> {
399 let store = self.get_store_wasi(bucket)?;
400 store.increment(key, delta).await.map_err(to_wasi_err)
401 }
402
403 #[instrument(name = "spin_key_value.swap", skip_all, fields(otel.kind = "client"))]
404 async fn swap(
405 &mut self,
406 cas_res: Resource<atomics::Cas>,
407 value: Vec<u8>,
408 ) -> Result<(), CasError> {
409 let cas_rep = cas_res.rep();
410 let cas = self
411 .get_cas(Resource::<Bucket>::new_own(cas_rep))
412 .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
413
414 match cas.swap(value).await {
415 Ok(_) => Ok(()),
416 Err(err) => match err {
417 SwapError::CasFailed(_) => {
418 let bucket = Resource::new_own(cas.bucket_rep().await);
419 let new_cas = self
420 .new(bucket, cas.key().await)
421 .await
422 .map_err(CasError::StoreError)?;
423 let new_cas_rep = new_cas.rep();
424 self.current(Resource::new_own(new_cas_rep))
425 .await
426 .map_err(CasError::StoreError)?;
427 let res = Resource::new_own(new_cas_rep);
428 Err(CasError::CasFailed(res))
429 }
430 SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
431 },
432 }
433 }
434}
435
436pub fn log_error(err: impl std::fmt::Debug) -> Error {
437 tracing::warn!("key-value error: {err:?}");
438 Error::Other(format!("{err:?}"))
439}
440
441pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
442 tracing::warn!("key-value error: {err:?}");
443 SwapError::Other(format!("{err:?}"))
444}
445
446use spin_world::v1::key_value::Error as LegacyError;
447use spin_world::wasi::keyvalue::atomics;
448use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
449
450fn to_legacy_error(err: Error) -> LegacyError {
451 match err {
452 Error::StoreTableFull => LegacyError::StoreTableFull,
453 Error::NoSuchStore => LegacyError::NoSuchStore,
454 Error::AccessDenied => LegacyError::AccessDenied,
455 Error::Other(s) => LegacyError::Io(s),
456 }
457}
458
459impl spin_world::v1::key_value::Host for KeyValueDispatch {
460 async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
461 let result = <Self as key_value::HostStore>::open(self, name).await?;
462 Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
463 }
464
465 async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
466 let this = Resource::new_borrow(store);
467 let result = <Self as key_value::HostStore>::get(self, this, key).await?;
468 Ok(result
469 .map_err(to_legacy_error)
470 .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
471 }
472
473 async fn set(
474 &mut self,
475 store: u32,
476 key: String,
477 value: Vec<u8>,
478 ) -> Result<Result<(), LegacyError>> {
479 let this = Resource::new_borrow(store);
480 let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
481 Ok(result.map_err(to_legacy_error))
482 }
483
484 async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
485 let this = Resource::new_borrow(store);
486 let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
487 Ok(result.map_err(to_legacy_error))
488 }
489
490 async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
491 let this = Resource::new_borrow(store);
492 let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
493 Ok(result.map_err(to_legacy_error))
494 }
495
496 async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
497 let this = Resource::new_borrow(store);
498 let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
499 Ok(result.map_err(to_legacy_error))
500 }
501
502 async fn close(&mut self, store: u32) -> Result<()> {
503 let this = Resource::new_borrow(store);
504 <Self as key_value::HostStore>::drop(self, this).await
505 }
506}