spin_factor_key_value/
lib.rs

1mod host;
2pub mod runtime_config;
3mod util;
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::Arc,
8};
9
10use anyhow::ensure;
11use spin_factors::{
12    ConfigureAppContext, Factor, FactorData, FactorInstanceBuilder, InitContext, PrepareContext,
13    RuntimeFactors,
14};
15use spin_locked_app::MetadataKey;
16
17/// Metadata key for key-value stores.
18pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
19pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
20pub use runtime_config::RuntimeConfig;
21use spin_core::async_trait;
22pub use util::DelegatingStoreManager;
23
24/// A factor that provides key-value storage.
25#[derive(Default)]
26pub struct KeyValueFactor {
27    _priv: (),
28}
29
30impl KeyValueFactor {
31    /// Create a new KeyValueFactor.
32    pub fn new() -> Self {
33        Self { _priv: () }
34    }
35}
36
37impl Factor for KeyValueFactor {
38    type RuntimeConfig = RuntimeConfig;
39    type AppState = AppState;
40    type InstanceBuilder = InstanceBuilder;
41
42    fn init(&mut self, ctx: &mut impl InitContext<Self>) -> anyhow::Result<()> {
43        ctx.link_bindings(spin_world::v1::key_value::add_to_linker::<_, FactorData<Self>>)?;
44        ctx.link_bindings(spin_world::v2::key_value::add_to_linker::<_, FactorData<Self>>)?;
45        ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker::<_, FactorData<Self>>)?;
46        ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker::<_, FactorData<Self>>)?;
47        ctx.link_bindings(
48            spin_world::wasi::keyvalue::atomics::add_to_linker::<_, FactorData<Self>>,
49        )?;
50        Ok(())
51    }
52
53    fn configure_app<T: RuntimeFactors>(
54        &self,
55        mut ctx: ConfigureAppContext<T, Self>,
56    ) -> anyhow::Result<Self::AppState> {
57        let store_managers = ctx.take_runtime_config().unwrap_or_default();
58
59        let delegating_manager = DelegatingStoreManager::new(store_managers);
60        let store_manager = Arc::new(delegating_manager);
61
62        // Build component -> allowed stores map
63        let mut component_allowed_stores = HashMap::new();
64        for component in ctx.app().components() {
65            let component_id = component.id().to_string();
66            let key_value_stores = component
67                .get_metadata(KEY_VALUE_STORES_KEY)?
68                .unwrap_or_default()
69                .into_iter()
70                .collect::<HashSet<_>>();
71            for label in &key_value_stores {
72                // TODO: port nicer errors from KeyValueComponent (via error type?)
73                ensure!(
74                    store_manager.is_defined(label),
75                    "unknown key_value_stores label {label:?} for component {component_id:?}"
76                );
77            }
78            component_allowed_stores.insert(component_id, key_value_stores);
79            // TODO: warn (?) on unused store?
80        }
81
82        Ok(AppState {
83            store_manager,
84            component_allowed_stores,
85        })
86    }
87
88    fn prepare<T: RuntimeFactors>(
89        &self,
90        ctx: PrepareContext<T, Self>,
91    ) -> anyhow::Result<InstanceBuilder> {
92        let app_state = ctx.app_state();
93        let allowed_stores = app_state
94            .component_allowed_stores
95            .get(ctx.app_component().id())
96            .expect("component should be in component_stores")
97            .clone();
98        Ok(InstanceBuilder {
99            store_manager: app_state.store_manager.clone(),
100            allowed_stores,
101        })
102    }
103}
104
105type AppStoreManager = DelegatingStoreManager;
106
107pub struct AppState {
108    /// The store manager for the app.
109    ///
110    /// This is a cache around a delegating store manager. For `get` requests,
111    /// first checks the cache before delegating to the underlying store
112    /// manager.
113    store_manager: Arc<AppStoreManager>,
114    /// The allowed stores for each component.
115    ///
116    /// This is a map from component ID to the set of store labels that the
117    /// component is allowed to use.
118    component_allowed_stores: HashMap<String, HashSet<String>>,
119}
120
121impl AppState {
122    /// Returns the [`StoreManager::summary`] for the given store label.
123    pub fn store_summary(&self, label: &str) -> Option<String> {
124        self.store_manager.summary(label)
125    }
126
127    /// Returns true if the given store label is used by any component.
128    pub fn store_is_used(&self, label: &str) -> bool {
129        self.component_allowed_stores
130            .values()
131            .any(|stores| stores.contains(label))
132    }
133
134    /// Get a store by label.
135    pub async fn get_store(&self, label: &str) -> Option<Arc<dyn Store>> {
136        self.store_manager.get(label).await.ok()
137    }
138}
139
140/// `SwapError` are errors that occur during compare and swap operations
141#[derive(Debug, thiserror::Error)]
142pub enum SwapError {
143    #[error("{0}")]
144    CasFailed(String),
145
146    #[error("{0}")]
147    Other(String),
148}
149
150/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
151///
152/// `current` is expected to get the current value for the key associated with the CAS operation
153/// while also starting what is needed to ensure the value to be replaced will not have mutated
154/// between the time of calling `current` and `swap`. For example, a get from a backend store
155/// may provide the caller with an etag (a version stamp), which can be used with an if-match
156/// header to ensure the version updated is the version that was read (optimistic concurrency).
157/// Rather than an etag, one could start a transaction, if supported by the backing store, which
158/// would provide atomicity.
159///
160/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
161/// operation. If there was no key / value with the given key in the store, the `swap` operation
162/// should **insert** the key and value, disallowing an update.
163#[async_trait]
164pub trait Cas: Sync + Send {
165    async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
166    async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
167    async fn bucket_rep(&self) -> u32;
168    async fn key(&self) -> String;
169}
170
171pub struct InstanceBuilder {
172    /// The store manager for the app.
173    ///
174    /// This is a cache around a delegating store manager. For `get` requests,
175    /// first checks the cache before delegating to the underlying store
176    /// manager.
177    store_manager: Arc<AppStoreManager>,
178    /// The allowed stores for this component instance.
179    allowed_stores: HashSet<String>,
180}
181
182impl FactorInstanceBuilder for InstanceBuilder {
183    type InstanceState = KeyValueDispatch;
184
185    fn build(self) -> anyhow::Result<Self::InstanceState> {
186        let Self {
187            store_manager,
188            allowed_stores,
189        } = self;
190        Ok(KeyValueDispatch::new_with_capacity(
191            allowed_stores,
192            store_manager,
193            u32::MAX,
194        ))
195    }
196}