1use super::{Cas, SwapError};
2use anyhow::{Context, Result};
3use spin_core::{
4 async_trait,
5 wasmtime::component::{Accessor, FutureReader, Resource, StreamReader},
6};
7use spin_factor_otel::OtelFactorState;
8use spin_resource_table::Table;
9use spin_telemetry::traces::{self, Blame};
10use spin_world::MAX_HOST_BUFFERED_BYTES;
11use spin_world::spin::key_value::key_value as v3;
12use spin_world::v2::key_value;
13use spin_world::wasi::keyvalue as wasi_keyvalue;
14use std::{any::Any, collections::HashSet, sync::Arc};
15use tracing::instrument;
16
/// Table capacity that `KeyValueDispatch::new` passes to
/// `KeyValueDispatch::new_with_capacity`.
const DEFAULT_STORE_TABLE_CAPACITY: u32 = 256;
18
19pub use key_value::Error;
20
/// Resolves store names to shareable [`Store`] handles.
#[async_trait]
pub trait StoreManager: Sync + Send {
    /// Returns the store registered under `name`, or an [`Error`] if it
    /// cannot be obtained.
    async fn get(&self, name: &str) -> Result<Arc<dyn Store>, Error>;

    /// Reports whether a store called `store_name` is defined.
    fn is_defined(&self, store_name: &str) -> bool;

    /// Optional summary string for the named store.
    ///
    /// The default implementation ignores `store_name` and returns `None`.
    fn summary(&self, store_name: &str) -> Option<String> {
        let _ = store_name;
        None
    }

    /// Manager-specific metadata as a type-erased value.
    ///
    /// The default implementation returns `Arc::new(())`.
    fn metadata(&self) -> Arc<dyn Any> {
        Arc::new(())
    }
}
39
/// Operations a key-value store backend must provide.
///
/// The host implementations in this file look stores up by resource handle and
/// forward guest calls to these methods.
#[async_trait]
pub trait Store: Sync + Send {
    /// Hook awaited by the `open` host calls after the manager returns the
    /// store and before it is pushed into the resource table.
    ///
    /// The default implementation does nothing and succeeds.
    async fn after_open(&self) -> Result<(), Error> {
        Ok(())
    }
    /// Fetches the value for `key`; the v1 host maps `Ok(None)` to a
    /// "no such key" error. Callers in this file pass
    /// `MAX_HOST_BUFFERED_BYTES` as `max_result_bytes`.
    async fn get(&self, key: &str, max_result_bytes: usize) -> Result<Option<Vec<u8>>, Error>;
    /// Stores `value` under `key`.
    async fn set(&self, key: &str, value: &[u8]) -> Result<(), Error>;
    /// Deletes `key`.
    async fn delete(&self, key: &str) -> Result<(), Error>;
    /// Reports whether `key` exists.
    async fn exists(&self, key: &str) -> Result<bool, Error>;
    /// Returns the store's keys as a single vector.
    async fn get_keys(&self, max_result_bytes: usize) -> Result<Vec<String>, Error>;
    /// Channel-based variant of `get_keys`: returns a receiver of keys plus a
    /// one-shot receiver carrying the final `Result`. The v3 host turns these
    /// into a stream reader and a future reader.
    async fn get_keys_async(
        &self,
        max_result_bytes: usize,
    ) -> (
        tokio::sync::mpsc::Receiver<String>,
        tokio::sync::oneshot::Receiver<Result<(), v3::Error>>,
    );
    /// Fetches several keys at once, returning `(key, optional value)` pairs.
    async fn get_many(
        &self,
        keys: Vec<String>,
        max_result_bytes: usize,
    ) -> Result<Vec<(String, Option<Vec<u8>>)>, Error>;
    /// Stores several key/value pairs at once.
    async fn set_many(&self, key_values: Vec<(String, Vec<u8>)>) -> Result<(), Error>;
    /// Deletes several keys at once.
    async fn delete_many(&self, keys: Vec<String>) -> Result<(), Error>;
    /// Applies `delta` to the value at `key` and returns an `i64` result
    /// (presumably the updated value — confirm against implementations).
    async fn increment(&self, key: String, delta: i64) -> Result<i64, Error>;
    /// Creates a compare-and-swap handle for `key`. The dispatcher passes the
    /// bucket resource's `rep()` as `bucket_rep`.
    async fn new_compare_and_swap(&self, bucket_rep: u32, key: &str)
    -> Result<Arc<dyn Cas>, Error>;
}
68
/// Host-side state backing the key-value interfaces implemented in this file.
pub struct KeyValueDispatch {
    /// Store names that may be opened; consulted by the `open` host calls.
    allowed_stores: HashSet<String>,
    /// Resolves a store name to a `Store` handle.
    manager: Arc<dyn StoreManager>,
    /// Open stores, looked up by the `rep` of the resource handed out for them.
    stores: Table<Arc<dyn Store>>,
    /// Open compare-and-swap handles, looked up by resource `rep`.
    compare_and_swaps: Table<Arc<dyn Cas>>,
    /// OTel state, used here to reparent the tracing span at the start of host calls.
    otel: OtelFactorState,
}
76
77impl KeyValueDispatch {
78 pub fn new(allowed_stores: HashSet<String>, manager: Arc<dyn StoreManager>) -> Self {
79 Self::new_with_capacity(
80 allowed_stores,
81 manager,
82 DEFAULT_STORE_TABLE_CAPACITY,
83 Default::default(),
84 )
85 }
86
87 pub fn new_with_capacity(
88 allowed_stores: HashSet<String>,
89 manager: Arc<dyn StoreManager>,
90 capacity: u32,
91 otel: OtelFactorState,
92 ) -> Self {
93 Self {
94 allowed_stores,
95 manager,
96 stores: Table::new(capacity),
97 compare_and_swaps: Table::new(capacity),
98 otel,
99 }
100 }
101
102 pub fn get_store<T: 'static>(&self, store: Resource<T>) -> anyhow::Result<&Arc<dyn Store>> {
103 let res = self.stores.get(store.rep()).context("invalid store");
104 if let Err(err) = &res {
105 traces::mark_as_error(err, Some(Blame::Host));
106 }
107 res
108 }
109
110 pub fn get_cas<T: 'static>(&self, cas: Resource<T>) -> Result<&Arc<dyn Cas>> {
111 self.compare_and_swaps
112 .get(cas.rep())
113 .context("invalid compare and swap")
114 }
115
116 pub fn allowed_stores(&self) -> &HashSet<String> {
117 &self.allowed_stores
118 }
119
120 pub fn get_store_wasi<T: 'static>(
121 &self,
122 store: Resource<T>,
123 ) -> Result<&Arc<dyn Store>, wasi_keyvalue::store::Error> {
124 self.stores
125 .get(store.rep())
126 .ok_or(wasi_keyvalue::store::Error::NoSuchStore)
127 }
128
129 pub fn get_cas_wasi<T: 'static>(
130 &self,
131 cas: Resource<T>,
132 ) -> Result<&Arc<dyn Cas>, wasi_keyvalue::atomics::Error> {
133 self.compare_and_swaps
134 .get(cas.rep())
135 .ok_or(wasi_keyvalue::atomics::Error::Other(
136 "compare and swap not found".to_string(),
137 ))
138 }
139
140 pub fn manager_metadata(&self) -> Arc<dyn Any> {
141 self.manager.metadata()
142 }
143}
144
// The v2 `key_value::Host` trait needs no methods here; the store operations
// are provided by the `key_value::HostStore` implementation.
impl key_value::Host for KeyValueDispatch {}
146
147impl key_value::HostStore for KeyValueDispatch {
148 #[instrument(name = "spin_key_value.open", skip(self), err, fields(otel.kind = "client", kv.backend=self.manager.summary(&name).unwrap_or("unknown".to_string())))]
149 async fn open(&mut self, name: String) -> Result<Result<Resource<key_value::Store>, Error>> {
150 self.otel.reparent_tracing_span();
151 Ok(async {
152 if self.allowed_stores.contains(&name) {
153 let store = self.manager.get(&name).await?;
154 store.after_open().await?;
155 let store_idx = self
156 .stores
157 .push(store)
158 .map_err(|()| Error::StoreTableFull)?;
159 Ok(Resource::new_own(store_idx))
160 } else {
161 Err(Error::AccessDenied)
162 }
163 }
164 .await)
165 }
166
167 #[instrument(name = "spin_key_value.get", skip_all, fields(otel.kind = "client"))]
168 async fn get(
169 &mut self,
170 store: Resource<key_value::Store>,
171 key: String,
172 ) -> Result<Result<Option<Vec<u8>>, Error>> {
173 self.otel.reparent_tracing_span();
174 let store = self.get_store(store)?;
175 Ok(store
176 .get(&key, MAX_HOST_BUFFERED_BYTES)
177 .await
178 .map_err(track_error_on_span))
179 }
180
181 #[instrument(name = "spin_key_value.set", skip_all, fields(otel.kind = "client"))]
182 async fn set(
183 &mut self,
184 store: Resource<key_value::Store>,
185 key: String,
186 value: Vec<u8>,
187 ) -> Result<Result<(), Error>> {
188 self.otel.reparent_tracing_span();
189 let store = self.get_store(store)?;
190 Ok(store.set(&key, &value).await.map_err(track_error_on_span))
191 }
192
193 #[instrument(name = "spin_key_value.delete", skip_all, fields(otel.kind = "client"))]
194 async fn delete(
195 &mut self,
196 store: Resource<key_value::Store>,
197 key: String,
198 ) -> Result<Result<(), Error>> {
199 self.otel.reparent_tracing_span();
200 let store = self.get_store(store)?;
201 Ok(store.delete(&key).await.map_err(track_error_on_span))
202 }
203
204 #[instrument(name = "spin_key_value.exists", skip_all, fields(otel.kind = "client"))]
205 async fn exists(
206 &mut self,
207 store: Resource<key_value::Store>,
208 key: String,
209 ) -> Result<Result<bool, Error>> {
210 self.otel.reparent_tracing_span();
211 let store = self.get_store(store)?;
212 Ok(store.exists(&key).await.map_err(track_error_on_span))
213 }
214
215 #[instrument(name = "spin_key_value.get_keys", skip_all, fields(otel.kind = "client"))]
216 async fn get_keys(
217 &mut self,
218 store: Resource<key_value::Store>,
219 ) -> Result<Result<Vec<String>, Error>> {
220 self.otel.reparent_tracing_span();
221 let store = self.get_store(store)?;
222 Ok(store
223 .get_keys(MAX_HOST_BUFFERED_BYTES)
224 .await
225 .map_err(track_error_on_span))
226 }
227
228 async fn drop(&mut self, store: Resource<key_value::Store>) -> Result<()> {
229 self.stores.remove(store.rep());
230 Ok(())
231 }
232}
233
// Declares `&mut KeyValueDispatch` as the data type wasmtime's `HasData`
// associates with `KeyValueDispatch`.
impl spin_core::wasmtime::component::HasData for KeyValueDispatch {
    type Data<'a> = &'a mut KeyValueDispatch;
}
237
impl v3::Host for KeyValueDispatch {
    /// Passes the v3 error through unchanged; never fails.
    fn convert_error(&mut self, err: v3::Error) -> anyhow::Result<v3::Error> {
        Ok(err)
    }
}
243
impl v3::HostStore for KeyValueDispatch {
    /// Removes the store entry for the dropped v3 resource; always succeeds.
    async fn drop(&mut self, store: Resource<v3::Store>) -> Result<()> {
        self.stores.remove(store.rep());
        Ok(())
    }
}
250
251impl v3::HostStoreWithStore for crate::KeyValueFactorData {
252 async fn open<T>(
253 accessor: &Accessor<T, Self>,
254 label: String,
255 ) -> Result<Resource<v3::Store>, v3::Error> {
256 let (allowed, manager) = accessor.with(|mut access| {
257 let host = access.get();
258 host.otel.reparent_tracing_span();
259 (host.allowed_stores.contains(&label), host.manager.clone())
260 });
261
262 if !allowed {
263 return Err(v3::Error::AccessDenied);
264 }
265
266 let store = manager.get(&label).await.map_err(to_v3_err)?;
267 store.after_open().await.map_err(to_v3_err)?;
268
269 accessor.with(|mut access| {
270 let host = access.get();
271 host.stores
272 .push(store)
273 .map(Resource::new_own)
274 .map_err(|()| v3::Error::StoreTableFull)
275 })
276 }
277
278 async fn get<T>(
279 accessor: &Accessor<T, Self>,
280 store: Resource<v3::Store>,
281 key: String,
282 ) -> Result<Option<Vec<u8>>, v3::Error> {
283 let store = accessor
284 .with(|mut access| {
285 let host = access.get();
286 host.otel.reparent_tracing_span();
287 host.get_store(store).cloned()
288 })
289 .map_err(|_| v3::Error::NoSuchStore)?;
290 store
291 .get(&key, MAX_HOST_BUFFERED_BYTES)
292 .await
293 .map_err(to_v3_err)
294 .map_err(track_error_on_span_v3)
295 }
296
297 async fn set<T>(
298 accessor: &Accessor<T, Self>,
299 store: Resource<v3::Store>,
300 key: String,
301 value: Vec<u8>,
302 ) -> Result<(), v3::Error> {
303 let store = accessor
304 .with(|mut access| {
305 let host = access.get();
306 host.otel.reparent_tracing_span();
307 host.get_store(store).cloned()
308 })
309 .map_err(|_| v3::Error::NoSuchStore)?;
310 store
311 .set(&key, &value)
312 .await
313 .map_err(to_v3_err)
314 .map_err(track_error_on_span_v3)
315 }
316
317 async fn delete<T>(
318 accessor: &Accessor<T, Self>,
319 store: Resource<v3::Store>,
320 key: String,
321 ) -> Result<(), v3::Error> {
322 let store = accessor
323 .with(|mut access| {
324 let host = access.get();
325 host.otel.reparent_tracing_span();
326 host.get_store(store).cloned()
327 })
328 .map_err(|_| v3::Error::NoSuchStore)?;
329 store
330 .delete(&key)
331 .await
332 .map_err(to_v3_err)
333 .map_err(track_error_on_span_v3)
334 }
335
336 async fn exists<T>(
337 accessor: &Accessor<T, Self>,
338 store: Resource<v3::Store>,
339 key: String,
340 ) -> Result<bool, v3::Error> {
341 let store = accessor
342 .with(|mut access| {
343 let host = access.get();
344 host.otel.reparent_tracing_span();
345 host.get_store(store).cloned()
346 })
347 .map_err(|_| v3::Error::NoSuchStore)?;
348 store
349 .exists(&key)
350 .await
351 .map_err(to_v3_err)
352 .map_err(track_error_on_span_v3)
353 }
354
355 async fn get_keys<T>(
356 accessor: &Accessor<T, Self>,
357 store: Resource<v3::Store>,
358 ) -> Result<(StreamReader<String>, FutureReader<Result<(), v3::Error>>)> {
359 let store = accessor
360 .with(|mut access| {
361 let host = access.get();
362 host.otel.reparent_tracing_span();
363 host.get_store(store).cloned()
364 })
365 .map_err(|_| v3::Error::NoSuchStore)?;
366
367 let (keys_rx, err_rx) = store.get_keys_async(MAX_HOST_BUFFERED_BYTES).await;
368
369 let producer = spin_wasi_async::stream::producer(keys_rx);
370 let (ksr, efr) = accessor.with(|mut access| {
371 let ksr = StreamReader::new(&mut access, producer)?;
372 let efr = FutureReader::new(&mut access, err_rx)?;
373 anyhow::Ok((ksr, efr))
374 })?;
375
376 Ok((ksr, efr))
377 }
378}
379
380fn track_error_on_span(err: Error) -> Error {
382 let blame = match err {
383 Error::NoSuchStore | Error::AccessDenied => Blame::Guest,
384 Error::StoreTableFull | Error::Other(_) => Blame::Host,
385 };
386 traces::mark_as_error(&err, Some(blame));
387 err
388}
389
390fn track_error_on_span_v3(err: v3::Error) -> v3::Error {
392 let blame = match err {
393 v3::Error::NoSuchStore | v3::Error::AccessDenied => Blame::Guest,
394 v3::Error::StoreTableFull | v3::Error::Other(_) => Blame::Host,
395 };
396 traces::mark_as_error(&err, Some(blame));
397 err
398}
399
400fn to_wasi_err(e: Error) -> wasi_keyvalue::store::Error {
401 match track_error_on_span(e) {
402 Error::AccessDenied => wasi_keyvalue::store::Error::AccessDenied,
403 Error::NoSuchStore => wasi_keyvalue::store::Error::NoSuchStore,
404 Error::StoreTableFull => wasi_keyvalue::store::Error::Other("store table full".to_string()),
405 Error::Other(msg) => wasi_keyvalue::store::Error::Other(msg),
406 }
407}
408
409pub fn to_v3_err(e: Error) -> v3::Error {
410 match track_error_on_span(e) {
411 Error::AccessDenied => v3::Error::AccessDenied,
412 Error::NoSuchStore => v3::Error::NoSuchStore,
413 Error::StoreTableFull => v3::Error::StoreTableFull,
414 Error::Other(msg) => v3::Error::Other(msg),
415 }
416}
417
418impl wasi_keyvalue::store::Host for KeyValueDispatch {
419 #[instrument(name = "wasi_key_value.open", skip_all, fields(otel.kind = "client"))]
420 async fn open(
421 &mut self,
422 identifier: String,
423 ) -> Result<Resource<wasi_keyvalue::store::Bucket>, wasi_keyvalue::store::Error> {
424 if self.allowed_stores.contains(&identifier) {
425 let store = self.manager.get(&identifier).await.map_err(to_wasi_err)?;
426 store.after_open().await.map_err(to_wasi_err)?;
427 let store_idx = self
428 .stores
429 .push(store)
430 .map_err(|()| wasi_keyvalue::store::Error::Other("store table full".to_string()))?;
431 Ok(Resource::new_own(store_idx))
432 } else {
433 Err(wasi_keyvalue::store::Error::AccessDenied)
434 }
435 }
436
437 fn convert_error(
438 &mut self,
439 error: spin_world::wasi::keyvalue::store::Error,
440 ) -> std::result::Result<spin_world::wasi::keyvalue::store::Error, anyhow::Error> {
441 Ok(error)
442 }
443}
444
445use wasi_keyvalue::store::Bucket;
446impl wasi_keyvalue::store::HostBucket for KeyValueDispatch {
447 #[instrument(name = "wasi_key_value.get", skip_all, fields(otel.kind = "client"))]
448 async fn get(
449 &mut self,
450 self_: Resource<Bucket>,
451 key: String,
452 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
453 let store = self.get_store_wasi(self_)?;
454 store
455 .get(&key, MAX_HOST_BUFFERED_BYTES)
456 .await
457 .map_err(to_wasi_err)
458 }
459
460 #[instrument(name = "wasi_key_value.set", skip_all, fields(otel.kind = "client"))]
461 async fn set(
462 &mut self,
463 self_: Resource<Bucket>,
464 key: String,
465 value: Vec<u8>,
466 ) -> Result<(), wasi_keyvalue::store::Error> {
467 let store = self.get_store_wasi(self_)?;
468 store.set(&key, &value).await.map_err(to_wasi_err)
469 }
470
471 #[instrument(name = "wasi_key_value.delete", skip_all, fields(otel.kind = "client"))]
472 async fn delete(
473 &mut self,
474 self_: Resource<Bucket>,
475 key: String,
476 ) -> Result<(), wasi_keyvalue::store::Error> {
477 let store = self.get_store_wasi(self_)?;
478 store.delete(&key).await.map_err(to_wasi_err)
479 }
480
481 #[instrument(name = "wasi_key_value.exists", skip_all, fields(otel.kind = "client"))]
482 async fn exists(
483 &mut self,
484 self_: Resource<Bucket>,
485 key: String,
486 ) -> Result<bool, wasi_keyvalue::store::Error> {
487 let store = self.get_store_wasi(self_)?;
488 store.exists(&key).await.map_err(to_wasi_err)
489 }
490
491 #[instrument(name = "wasi_key_value.list_keys", skip_all, fields(otel.kind = "client"))]
492 async fn list_keys(
493 &mut self,
494 self_: Resource<Bucket>,
495 cursor: Option<String>,
496 ) -> Result<wasi_keyvalue::store::KeyResponse, wasi_keyvalue::store::Error> {
497 match cursor {
498 Some(_) => Err(wasi_keyvalue::store::Error::Other(
499 "list_keys: cursor not supported".to_owned(),
500 )),
501 None => {
502 let store = self.get_store_wasi(self_)?;
503 let keys = store
504 .get_keys(MAX_HOST_BUFFERED_BYTES)
505 .await
506 .map_err(to_wasi_err)?;
507 Ok(wasi_keyvalue::store::KeyResponse { keys, cursor: None })
508 }
509 }
510 }
511
512 async fn drop(&mut self, rep: Resource<Bucket>) -> anyhow::Result<()> {
513 self.stores.remove(rep.rep());
514 Ok(())
515 }
516}
517
518impl wasi_keyvalue::batch::Host for KeyValueDispatch {
519 #[instrument(name = "spin_key_value.get_many", skip_all, fields(otel.kind = "client"))]
520 #[allow(clippy::type_complexity)]
521 async fn get_many(
522 &mut self,
523 bucket: Resource<wasi_keyvalue::batch::Bucket>,
524 keys: Vec<String>,
525 ) -> std::result::Result<Vec<(String, Option<Vec<u8>>)>, wasi_keyvalue::store::Error> {
526 let store = self.get_store_wasi(bucket)?;
527 if keys.is_empty() {
528 return Ok(vec![]);
529 }
530 store
531 .get_many(keys, MAX_HOST_BUFFERED_BYTES)
532 .await
533 .map_err(to_wasi_err)
534 }
535
536 #[instrument(name = "spin_key_value.set_many", skip_all, fields(otel.kind = "client"))]
537 async fn set_many(
538 &mut self,
539 bucket: Resource<wasi_keyvalue::batch::Bucket>,
540 key_values: Vec<(String, Vec<u8>)>,
541 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
542 let store = self.get_store_wasi(bucket)?;
543 if key_values.is_empty() {
544 return Ok(());
545 }
546 store.set_many(key_values).await.map_err(to_wasi_err)
547 }
548
549 #[instrument(name = "spin_key_value.delete_many", skip_all, fields(otel.kind = "client"))]
550 async fn delete_many(
551 &mut self,
552 bucket: Resource<wasi_keyvalue::batch::Bucket>,
553 keys: Vec<String>,
554 ) -> std::result::Result<(), wasi_keyvalue::store::Error> {
555 let store = self.get_store_wasi(bucket)?;
556 if keys.is_empty() {
557 return Ok(());
558 }
559 store.delete_many(keys).await.map_err(to_wasi_err)
560 }
561}
562
563impl wasi_keyvalue::atomics::HostCas for KeyValueDispatch {
564 #[instrument(name = "wasi_key_value_cas.new", skip_all, fields(otel.kind = "client"))]
565 async fn new(
566 &mut self,
567 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
568 key: String,
569 ) -> Result<Resource<wasi_keyvalue::atomics::Cas>, wasi_keyvalue::store::Error> {
570 let bucket_rep = bucket.rep();
571 let bucket: Resource<Bucket> = Resource::new_own(bucket_rep);
572 let store = self.get_store_wasi(bucket)?;
573 let cas = store
574 .new_compare_and_swap(bucket_rep, &key)
575 .await
576 .map_err(to_wasi_err)?;
577 self.compare_and_swaps
578 .push(cas)
579 .map_err(|()| {
580 spin_world::wasi::keyvalue::store::Error::Other(
581 "too many compare_and_swaps opened".to_string(),
582 )
583 })
584 .map(Resource::new_own)
585 }
586
587 #[instrument(name = "wasi_key_value_cas.current", skip_all, fields(otel.kind = "client"))]
588 async fn current(
589 &mut self,
590 cas: Resource<wasi_keyvalue::atomics::Cas>,
591 ) -> Result<Option<Vec<u8>>, wasi_keyvalue::store::Error> {
592 let cas = self
593 .get_cas(cas)
594 .map_err(|e| wasi_keyvalue::store::Error::Other(e.to_string()))?;
595 cas.current(MAX_HOST_BUFFERED_BYTES)
596 .await
597 .map_err(to_wasi_err)
598 }
599
600 async fn drop(&mut self, rep: Resource<wasi_keyvalue::atomics::Cas>) -> Result<()> {
601 self.compare_and_swaps.remove(rep.rep());
602 Ok(())
603 }
604}
605
606impl wasi_keyvalue::atomics::Host for KeyValueDispatch {
607 fn convert_cas_error(
608 &mut self,
609 error: spin_world::wasi::keyvalue::atomics::CasError,
610 ) -> std::result::Result<spin_world::wasi::keyvalue::atomics::CasError, anyhow::Error> {
611 Ok(error)
612 }
613
614 #[instrument(name = "spin_key_value.increment", skip_all, fields(otel.kind = "client"))]
615 async fn increment(
616 &mut self,
617 bucket: Resource<wasi_keyvalue::atomics::Bucket>,
618 key: String,
619 delta: i64,
620 ) -> Result<i64, wasi_keyvalue::store::Error> {
621 let store = self.get_store_wasi(bucket)?;
622 store.increment(key, delta).await.map_err(to_wasi_err)
623 }
624
625 #[instrument(name = "spin_key_value.swap", skip_all, fields(otel.kind = "client"))]
626 async fn swap(
627 &mut self,
628 cas_res: Resource<atomics::Cas>,
629 value: Vec<u8>,
630 ) -> Result<(), CasError> {
631 let cas_rep = cas_res.rep();
632 let cas = self
633 .get_cas(Resource::<Bucket>::new_own(cas_rep))
634 .map_err(|e| CasError::StoreError(atomics::Error::Other(e.to_string())))?;
635
636 match cas.swap(value).await {
637 Ok(_) => Ok(()),
638 Err(err) => match err {
639 SwapError::CasFailed(_) => {
640 let bucket = Resource::new_own(cas.bucket_rep().await);
641 let new_cas = self
642 .new(bucket, cas.key().await)
643 .await
644 .map_err(CasError::StoreError)?;
645 let new_cas_rep = new_cas.rep();
646 self.current(Resource::new_own(new_cas_rep))
647 .await
648 .map_err(CasError::StoreError)?;
649 let res = Resource::new_own(new_cas_rep);
650 Err(CasError::CasFailed(res))
651 }
652 SwapError::Other(msg) => Err(CasError::StoreError(atomics::Error::Other(msg))),
653 },
654 }
655 }
656}
657
658pub fn log_error(err: impl std::fmt::Debug) -> Error {
659 tracing::warn!("key-value error: {err:?}");
660 Error::Other(format!("{err:?}"))
661}
662
663pub fn log_error_v3(err: impl std::fmt::Debug) -> v3::Error {
664 tracing::warn!("key-value error: {err:?}");
665 v3::Error::Other(format!("{err:?}"))
666}
667
668pub fn log_cas_error(err: impl std::fmt::Debug) -> SwapError {
669 tracing::warn!("key-value error: {err:?}");
670 SwapError::Other(format!("{err:?}"))
671}
672
673use spin_world::v1::key_value::Error as LegacyError;
674use spin_world::wasi::keyvalue::atomics;
675use spin_world::wasi::keyvalue::atomics::{CasError, HostCas};
676
677fn to_legacy_error(err: Error) -> LegacyError {
678 match err {
679 Error::StoreTableFull => LegacyError::StoreTableFull,
680 Error::NoSuchStore => LegacyError::NoSuchStore,
681 Error::AccessDenied => LegacyError::AccessDenied,
682 Error::Other(s) => LegacyError::Io(s),
683 }
684}
685
686impl spin_world::v1::key_value::Host for KeyValueDispatch {
687 async fn open(&mut self, name: String) -> Result<Result<u32, LegacyError>> {
688 let result = <Self as key_value::HostStore>::open(self, name).await?;
689 Ok(result.map_err(to_legacy_error).map(|s| s.rep()))
690 }
691
692 async fn get(&mut self, store: u32, key: String) -> Result<Result<Vec<u8>, LegacyError>> {
693 let this = Resource::new_borrow(store);
694 let result = <Self as key_value::HostStore>::get(self, this, key).await?;
695 Ok(result
696 .map_err(to_legacy_error)
697 .and_then(|v| v.ok_or(LegacyError::NoSuchKey)))
698 }
699
700 async fn set(
701 &mut self,
702 store: u32,
703 key: String,
704 value: Vec<u8>,
705 ) -> Result<Result<(), LegacyError>> {
706 let this = Resource::new_borrow(store);
707 let result = <Self as key_value::HostStore>::set(self, this, key, value).await?;
708 Ok(result.map_err(to_legacy_error))
709 }
710
711 async fn delete(&mut self, store: u32, key: String) -> Result<Result<(), LegacyError>> {
712 let this = Resource::new_borrow(store);
713 let result = <Self as key_value::HostStore>::delete(self, this, key).await?;
714 Ok(result.map_err(to_legacy_error))
715 }
716
717 async fn exists(&mut self, store: u32, key: String) -> Result<Result<bool, LegacyError>> {
718 let this = Resource::new_borrow(store);
719 let result = <Self as key_value::HostStore>::exists(self, this, key).await?;
720 Ok(result.map_err(to_legacy_error))
721 }
722
723 async fn get_keys(&mut self, store: u32) -> Result<Result<Vec<String>, LegacyError>> {
724 let this = Resource::new_borrow(store);
725 let result = <Self as key_value::HostStore>::get_keys(self, this).await?;
726 Ok(result.map_err(to_legacy_error))
727 }
728
729 async fn close(&mut self, store: u32) -> Result<()> {
730 let this = Resource::new_borrow(store);
731 <Self as key_value::HostStore>::drop(self, this).await
732 }
733}