Skip to main content

spin_factor_key_value/
lib.rs

1mod host;
2pub mod runtime_config;
3mod util;
4
5use std::{
6    collections::{HashMap, HashSet},
7    sync::Arc,
8};
9
10use anyhow::ensure;
11use spin_factor_otel::OtelFactorState;
12use spin_factors::{
13    ConfigureAppContext, Factor, FactorData, FactorInstanceBuilder, InitContext, PrepareContext,
14    RuntimeFactors,
15};
16use spin_locked_app::MetadataKey;
17
/// Metadata key under which a component lists its key-value stores.
///
/// The value is the list of store labels declared for the component; the
/// factor reads it for every component while configuring the app.
pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
20pub use host::to_v3_err;
21pub use host::{
22    log_cas_error, log_error, log_error_v3, Error, KeyValueDispatch, Store, StoreManager,
23};
24pub use runtime_config::RuntimeConfig;
25use spin_core::async_trait;
26pub use spin_world::spin::key_value::key_value as v3;
27pub use util::DelegatingStoreManager;
28
/// A factor that provides key-value storage.
///
/// The type carries no configuration of its own. The private unit field only
/// prevents construction by struct literal outside this module, so values are
/// obtained through [`KeyValueFactor::new`] or [`Default::default`].
#[derive(Debug, Default)]
pub struct KeyValueFactor {
    _priv: (),
}

impl KeyValueFactor {
    /// Creates a new `KeyValueFactor`.
    ///
    /// Equivalent to [`KeyValueFactor::default`].
    pub fn new() -> Self {
        Self::default()
    }
}
41
42impl Factor for KeyValueFactor {
43    type RuntimeConfig = RuntimeConfig;
44    type AppState = AppState;
45    type InstanceBuilder = InstanceBuilder;
46
47    fn init(&mut self, ctx: &mut impl InitContext<Self>) -> anyhow::Result<()> {
48        ctx.link_bindings(spin_world::v1::key_value::add_to_linker::<_, FactorData<Self>>)?;
49        ctx.link_bindings(spin_world::v2::key_value::add_to_linker::<_, FactorData<Self>>)?;
50        ctx.link_bindings(
51            spin_world::spin::key_value::key_value::add_to_linker::<_, KeyValueFactorData>,
52        )?;
53        ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker::<_, FactorData<Self>>)?;
54        ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker::<_, FactorData<Self>>)?;
55        ctx.link_bindings(
56            spin_world::wasi::keyvalue::atomics::add_to_linker::<_, FactorData<Self>>,
57        )?;
58        Ok(())
59    }
60
61    fn configure_app<T: RuntimeFactors>(
62        &self,
63        mut ctx: ConfigureAppContext<T, Self>,
64    ) -> anyhow::Result<Self::AppState> {
65        let store_managers = ctx.take_runtime_config().unwrap_or_default();
66
67        let delegating_manager = DelegatingStoreManager::new(store_managers);
68        let store_manager = Arc::new(delegating_manager);
69
70        // Build component -> allowed stores map
71        let mut component_allowed_stores = HashMap::new();
72        for component in ctx.app().components() {
73            let component_id = component.id().to_string();
74            let key_value_stores = component
75                .get_metadata(KEY_VALUE_STORES_KEY)?
76                .unwrap_or_default()
77                .into_iter()
78                .collect::<HashSet<_>>();
79            for label in &key_value_stores {
80                // TODO: port nicer errors from KeyValueComponent (via error type?)
81                ensure!(
82                    store_manager.is_defined(label),
83                    "unknown key_value_stores label {label:?} for component {component_id:?}"
84                );
85            }
86            component_allowed_stores.insert(component_id, key_value_stores);
87            // TODO: warn (?) on unused store?
88        }
89
90        Ok(AppState {
91            store_manager,
92            component_allowed_stores,
93        })
94    }
95
96    fn prepare<T: RuntimeFactors>(
97        &self,
98        mut ctx: PrepareContext<T, Self>,
99    ) -> anyhow::Result<InstanceBuilder> {
100        let app_state = ctx.app_state();
101        let allowed_stores = app_state
102            .component_allowed_stores
103            .get(ctx.app_component().id())
104            .expect("component should be in component_stores")
105            .clone();
106        let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
107        Ok(InstanceBuilder {
108            store_manager: app_state.store_manager.clone(),
109            allowed_stores,
110            otel,
111        })
112    }
113}
114
/// The concrete store manager type shared by [`AppState`] and
/// [`InstanceBuilder`]; currently a plain [`DelegatingStoreManager`].
type AppStoreManager = DelegatingStoreManager;
116
/// Per-app state of the key-value factor, produced by `configure_app`.
pub struct AppState {
    /// The store manager for the app.
    ///
    /// This is a [`DelegatingStoreManager`] built from the runtime
    /// configuration; stores are looked up on it by label.
    store_manager: Arc<AppStoreManager>,
    /// The allowed stores for each component.
    ///
    /// This is a map from component ID to the set of store labels that the
    /// component is allowed to use. Every label in it was checked against the
    /// store manager when the app was configured.
    component_allowed_stores: HashMap<String, HashSet<String>>,
}
130
131impl AppState {
132    /// Returns the [`StoreManager::summary`] for the given store label.
133    pub fn store_summary(&self, label: &str) -> Option<String> {
134        self.store_manager.summary(label)
135    }
136
137    /// Returns true if the given store label is used by any component.
138    pub fn store_is_used(&self, label: &str) -> bool {
139        self.component_allowed_stores
140            .values()
141            .any(|stores| stores.contains(label))
142    }
143
144    /// Get a store by label.
145    pub async fn get_store(&self, label: &str) -> Option<Arc<dyn Store>> {
146        self.store_manager.get(label).await.ok()
147    }
148}
149
/// `SwapError` are errors that occur during compare and swap operations.
///
/// Both variants wrap a message string, and their `Display` output is that
/// message verbatim.
#[derive(Debug, thiserror::Error)]
pub enum SwapError {
    /// The compare-and-swap itself failed.
    // NOTE(review): presumably means the value changed between `current` and
    // `swap` — confirm against the store implementations.
    #[error("{0}")]
    CasFailed(String),

    /// Any other failure reported while performing the swap.
    #[error("{0}")]
    Other(String),
}
159
160/// `Cas` trait describes the interface a key value compare and swap implementor must fulfill.
161///
162/// `current` is expected to get the current value for the key associated with the CAS operation
163/// while also starting what is needed to ensure the value to be replaced will not have mutated
164/// between the time of calling `current` and `swap`. For example, a get from a backend store
165/// may provide the caller with an etag (a version stamp), which can be used with an if-match
166/// header to ensure the version updated is the version that was read (optimistic concurrency).
167/// Rather than an etag, one could start a transaction, if supported by the backing store, which
168/// would provide atomicity.
169///
170/// `swap` is expected to replace the old value with the new value respecting the atomicity of the
171/// operation. If there was no key / value with the given key in the store, the `swap` operation
172/// should **insert** the key and value, disallowing an update.
173#[async_trait]
174pub trait Cas: Sync + Send {
175    async fn current(&self, max_result_bytes: usize) -> anyhow::Result<Option<Vec<u8>>, Error>;
176    async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
177    async fn bucket_rep(&self) -> u32;
178    async fn key(&self) -> String;
179}
180
/// Builder for the per-instance key-value state, created by `prepare` for
/// the component being instantiated.
pub struct InstanceBuilder {
    /// The store manager for the app.
    ///
    /// This is the same [`DelegatingStoreManager`] held by the app state,
    /// shared through the `Arc`.
    store_manager: Arc<AppStoreManager>,
    /// The allowed stores for this component instance.
    allowed_stores: HashSet<String>,
    /// State obtained from the OTel factor while preparing the instance and
    /// handed on to the [`KeyValueDispatch`].
    otel: OtelFactorState,
}
192
193impl FactorInstanceBuilder for InstanceBuilder {
194    type InstanceState = KeyValueDispatch;
195
196    fn build(self) -> anyhow::Result<Self::InstanceState> {
197        let Self {
198            store_manager,
199            allowed_stores,
200            otel,
201        } = self;
202        Ok(KeyValueDispatch::new_with_capacity(
203            allowed_stores,
204            store_manager,
205            u32::MAX,
206            otel,
207        ))
208    }
209}
210
/// Marker type naming the data that the `spin:key-value` bindings operate on.
///
/// It is only used as a type parameter when those bindings are linked in
/// `init`.
pub struct KeyValueFactorData(KeyValueFactor);

impl spin_core::wasmtime::component::HasData for KeyValueFactorData {
    /// The bindings receive a mutable borrow of the instance's
    /// [`KeyValueDispatch`].
    type Data<'a> = &'a mut KeyValueDispatch;
}