spin_factor_key_value/
lib.rs

1mod host;
2pub mod runtime_config;
3mod util;
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::Arc,
8};
9
10use anyhow::ensure;
11use spin_factors::{
12    ConfigureAppContext, Factor, FactorInstanceBuilder, InitContext, PrepareContext, RuntimeFactors,
13};
14use spin_locked_app::MetadataKey;
15
/// Metadata key for key-value stores.
///
/// Typed as a `Vec<String>` stored under the `"key_value_stores"` metadata
/// name. `KeyValueFactor::configure_app` reads it from every component to
/// learn which store labels that component lists.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
18pub use host::{log_cas_error, log_error, Error, KeyValueDispatch, Store, StoreManager};
19pub use runtime_config::RuntimeConfig;
20use spin_core::async_trait;
21pub use util::DelegatingStoreManager;
22
/// The factor that provides key-value storage.
///
/// It carries no data of its own. The private unit field only prevents the
/// struct from being written as a literal outside this module, so callers go
/// through [`KeyValueFactor::new`] or [`Default`].
#[derive(Default)]
pub struct KeyValueFactor {
    _priv: (),
}

impl KeyValueFactor {
    /// Builds a fresh `KeyValueFactor`.
    pub fn new() -> Self {
        Self::default()
    }
}
35
36impl Factor for KeyValueFactor {
37    type RuntimeConfig = RuntimeConfig;
38    type AppState = AppState;
39    type InstanceBuilder = InstanceBuilder;
40
41    fn init(&mut self, ctx: &mut impl InitContext<Self>) -> anyhow::Result<()> {
42        ctx.link_bindings(spin_world::v1::key_value::add_to_linker)?;
43        ctx.link_bindings(spin_world::v2::key_value::add_to_linker)?;
44        ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker)?;
45        ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker)?;
46        ctx.link_bindings(spin_world::wasi::keyvalue::atomics::add_to_linker)?;
47        Ok(())
48    }
49
50    fn configure_app<T: RuntimeFactors>(
51        &self,
52        mut ctx: ConfigureAppContext<T, Self>,
53    ) -> anyhow::Result<Self::AppState> {
54        let store_managers = ctx.take_runtime_config().unwrap_or_default();
55
56        let delegating_manager = DelegatingStoreManager::new(store_managers);
57        let store_manager = Arc::new(delegating_manager);
58
59        // Build component -> allowed stores map
60        let mut component_allowed_stores = HashMap::new();
61        for component in ctx.app().components() {
62            let component_id = component.id().to_string();
63            let key_value_stores = component
64                .get_metadata(KEY_VALUE_STORES_KEY)?
65                .unwrap_or_default()
66                .into_iter()
67                .collect::<HashSet<_>>();
68            for label in &key_value_stores {
69                // TODO: port nicer errors from KeyValueComponent (via error type?)
70                ensure!(
71                    store_manager.is_defined(label),
72                    "unknown key_value_stores label {label:?} for component {component_id:?}"
73                );
74            }
75            component_allowed_stores.insert(component_id, key_value_stores);
76            // TODO: warn (?) on unused store?
77        }
78
79        Ok(AppState {
80            store_manager,
81            component_allowed_stores,
82        })
83    }
84
85    fn prepare<T: RuntimeFactors>(
86        &self,
87        ctx: PrepareContext<T, Self>,
88    ) -> anyhow::Result<InstanceBuilder> {
89        let app_state = ctx.app_state();
90        let allowed_stores = app_state
91            .component_allowed_stores
92            .get(ctx.app_component().id())
93            .expect("component should be in component_stores")
94            .clone();
95        Ok(InstanceBuilder {
96            store_manager: app_state.store_manager.clone(),
97            allowed_stores,
98        })
99    }
100}
101
102type AppStoreManager = DelegatingStoreManager;
103
104pub struct AppState {
105    /// The store manager for the app.
106    ///
107    /// This is a cache around a delegating store manager. For `get` requests,
108    /// first checks the cache before delegating to the underlying store
109    /// manager.
110    store_manager: Arc<AppStoreManager>,
111    /// The allowed stores for each component.
112    ///
113    /// This is a map from component ID to the set of store labels that the
114    /// component is allowed to use.
115    component_allowed_stores: HashMap<String, HashSet<String>>,
116}
117
118impl AppState {
119    /// Returns the [`StoreManager::summary`] for the given store label.
120    pub fn store_summary(&self, label: &str) -> Option<String> {
121        self.store_manager.summary(label)
122    }
123
124    /// Returns true if the given store label is used by any component.
125    pub fn store_is_used(&self, label: &str) -> bool {
126        self.component_allowed_stores
127            .values()
128            .any(|stores| stores.contains(label))
129    }
130
131    /// Get a store by label.
132    pub async fn get_store(&self, label: &str) -> Option<Arc<dyn Store>> {
133        self.store_manager.get(label).await.ok()
134    }
135}
136
137/// `SwapError` are errors that occur during compare and swap operations
138#[derive(Debug, thiserror::Error)]
139pub enum SwapError {
140    #[error("{0}")]
141    CasFailed(String),
142
143    #[error("{0}")]
144    Other(String),
145}
146
147/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
148///
149/// `current` is expected to get the current value for the key associated with the CAS operation
150/// while also starting what is needed to ensure the value to be replaced will not have mutated
151/// between the time of calling `current` and `swap`. For example, a get from a backend store
152/// may provide the caller with an etag (a version stamp), which can be used with an if-match
153/// header to ensure the version updated is the version that was read (optimistic concurrency).
154/// Rather than an etag, one could start a transaction, if supported by the backing store, which
155/// would provide atomicity.
156///
157/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
158/// operation. If there was no key / value with the given key in the store, the `swap` operation
159/// should **insert** the key and value, disallowing an update.
160#[async_trait]
161pub trait Cas: Sync + Send {
162    async fn current(&self) -> anyhow::Result<Option<Vec<u8>>, Error>;
163    async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
164    async fn bucket_rep(&self) -> u32;
165    async fn key(&self) -> String;
166}
167
168pub struct InstanceBuilder {
169    /// The store manager for the app.
170    ///
171    /// This is a cache around a delegating store manager. For `get` requests,
172    /// first checks the cache before delegating to the underlying store
173    /// manager.
174    store_manager: Arc<AppStoreManager>,
175    /// The allowed stores for this component instance.
176    allowed_stores: HashSet<String>,
177}
178
179impl FactorInstanceBuilder for InstanceBuilder {
180    type InstanceState = KeyValueDispatch;
181
182    fn build(self) -> anyhow::Result<Self::InstanceState> {
183        let Self {
184            store_manager,
185            allowed_stores,
186        } = self;
187        Ok(KeyValueDispatch::new_with_capacity(
188            allowed_stores,
189            store_manager,
190            u32::MAX,
191        ))
192    }
193}