spin_factor_key_value/
lib.rs1mod host;
2pub mod runtime_config;
3mod util;
4
5use std::{
6 collections::{HashMap, HashSet},
7 sync::Arc,
8};
9
10use anyhow::ensure;
11use spin_factor_otel::OtelFactorState;
12use spin_factors::{
13 ConfigureAppContext, Factor, FactorData, FactorInstanceBuilder, InitContext, PrepareContext,
14 RuntimeFactors,
15};
16use spin_locked_app::MetadataKey;
17
18pub const KEY_VALUE_STORES_KEY: MetadataKey<Vec<String>> = MetadataKey::new("key_value_stores");
20pub use host::to_v3_err;
21pub use host::{
22 log_cas_error, log_error, log_error_v3, Error, KeyValueDispatch, Store, StoreManager,
23};
24pub use runtime_config::RuntimeConfig;
25use spin_core::async_trait;
26pub use spin_world::spin::key_value::key_value as v3;
27pub use util::DelegatingStoreManager;
28
29#[derive(Default)]
31pub struct KeyValueFactor {
32 _priv: (),
33}
34
35impl KeyValueFactor {
36 pub fn new() -> Self {
38 Self { _priv: () }
39 }
40}
41
42impl Factor for KeyValueFactor {
43 type RuntimeConfig = RuntimeConfig;
44 type AppState = AppState;
45 type InstanceBuilder = InstanceBuilder;
46
47 fn init(&mut self, ctx: &mut impl InitContext<Self>) -> anyhow::Result<()> {
48 ctx.link_bindings(spin_world::v1::key_value::add_to_linker::<_, FactorData<Self>>)?;
49 ctx.link_bindings(spin_world::v2::key_value::add_to_linker::<_, FactorData<Self>>)?;
50 ctx.link_bindings(
51 spin_world::spin::key_value::key_value::add_to_linker::<_, KeyValueFactorData>,
52 )?;
53 ctx.link_bindings(spin_world::wasi::keyvalue::store::add_to_linker::<_, FactorData<Self>>)?;
54 ctx.link_bindings(spin_world::wasi::keyvalue::batch::add_to_linker::<_, FactorData<Self>>)?;
55 ctx.link_bindings(
56 spin_world::wasi::keyvalue::atomics::add_to_linker::<_, FactorData<Self>>,
57 )?;
58 Ok(())
59 }
60
61 fn configure_app<T: RuntimeFactors>(
62 &self,
63 mut ctx: ConfigureAppContext<T, Self>,
64 ) -> anyhow::Result<Self::AppState> {
65 let store_managers = ctx.take_runtime_config().unwrap_or_default();
66
67 let delegating_manager = DelegatingStoreManager::new(store_managers);
68 let store_manager = Arc::new(delegating_manager);
69
70 let mut component_allowed_stores = HashMap::new();
72 for component in ctx.app().components() {
73 let component_id = component.id().to_string();
74 let key_value_stores = component
75 .get_metadata(KEY_VALUE_STORES_KEY)?
76 .unwrap_or_default()
77 .into_iter()
78 .collect::<HashSet<_>>();
79 for label in &key_value_stores {
80 ensure!(
82 store_manager.is_defined(label),
83 "unknown key_value_stores label {label:?} for component {component_id:?}"
84 );
85 }
86 component_allowed_stores.insert(component_id, key_value_stores);
87 }
89
90 Ok(AppState {
91 store_manager,
92 component_allowed_stores,
93 })
94 }
95
96 fn prepare<T: RuntimeFactors>(
97 &self,
98 mut ctx: PrepareContext<T, Self>,
99 ) -> anyhow::Result<InstanceBuilder> {
100 let app_state = ctx.app_state();
101 let allowed_stores = app_state
102 .component_allowed_stores
103 .get(ctx.app_component().id())
104 .expect("component should be in component_stores")
105 .clone();
106 let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
107 Ok(InstanceBuilder {
108 store_manager: app_state.store_manager.clone(),
109 allowed_stores,
110 otel,
111 })
112 }
113}
114
115type AppStoreManager = DelegatingStoreManager;
116
117pub struct AppState {
118 store_manager: Arc<AppStoreManager>,
124 component_allowed_stores: HashMap<String, HashSet<String>>,
129}
130
131impl AppState {
132 pub fn store_summary(&self, label: &str) -> Option<String> {
134 self.store_manager.summary(label)
135 }
136
137 pub fn store_is_used(&self, label: &str) -> bool {
139 self.component_allowed_stores
140 .values()
141 .any(|stores| stores.contains(label))
142 }
143
144 pub async fn get_store(&self, label: &str) -> Option<Arc<dyn Store>> {
146 self.store_manager.get(label).await.ok()
147 }
148}
149
150#[derive(Debug, thiserror::Error)]
152pub enum SwapError {
153 #[error("{0}")]
154 CasFailed(String),
155
156 #[error("{0}")]
157 Other(String),
158}
159
160#[async_trait]
174pub trait Cas: Sync + Send {
175 async fn current(&self, max_result_bytes: usize) -> anyhow::Result<Option<Vec<u8>>, Error>;
176 async fn swap(&self, value: Vec<u8>) -> anyhow::Result<(), SwapError>;
177 async fn bucket_rep(&self) -> u32;
178 async fn key(&self) -> String;
179}
180
181pub struct InstanceBuilder {
182 store_manager: Arc<AppStoreManager>,
188 allowed_stores: HashSet<String>,
190 otel: OtelFactorState,
191}
192
193impl FactorInstanceBuilder for InstanceBuilder {
194 type InstanceState = KeyValueDispatch;
195
196 fn build(self) -> anyhow::Result<Self::InstanceState> {
197 let Self {
198 store_manager,
199 allowed_stores,
200 otel,
201 } = self;
202 Ok(KeyValueDispatch::new_with_capacity(
203 allowed_stores,
204 store_manager,
205 u32::MAX,
206 otel,
207 ))
208 }
209}
210
211pub struct KeyValueFactorData(KeyValueFactor);
212
213impl spin_core::wasmtime::component::HasData for KeyValueFactorData {
214 type Data<'a> = &'a mut KeyValueDispatch;
215}