spin_factor_outbound_mqtt/
lib.rs1mod allowed_hosts;
2mod host;
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use host::InstanceState;
8use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
9use spin_core::async_trait;
10use spin_factor_otel::OtelFactorState;
11use spin_factor_outbound_networking::OutboundNetworkingFactor;
12use spin_factors::{
13 ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
14};
15use spin_world::spin::mqtt::mqtt as v3;
16use spin_world::v2::mqtt as v2;
17use tokio::sync::Mutex;
18
19pub use host::MqttClient;
20
21use crate::host::other_error_v3;
22
23pub struct OutboundMqttFactor {
24 create_client: Arc<dyn ClientCreator>,
25}
26
27impl OutboundMqttFactor {
28 pub fn new(create_client: Arc<dyn ClientCreator>) -> Self {
29 Self { create_client }
30 }
31}
32
33impl Factor for OutboundMqttFactor {
34 type RuntimeConfig = ();
35 type AppState = ();
36 type InstanceBuilder = InstanceState;
37
38 fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
39 ctx.link_bindings(v2::add_to_linker::<_, FactorData<Self>>)?;
40 ctx.link_bindings(v3::add_to_linker::<_, MqttFactorData>)?;
41 Ok(())
42 }
43
44 fn configure_app<T: RuntimeFactors>(
45 &self,
46 _ctx: ConfigureAppContext<T, Self>,
47 ) -> anyhow::Result<Self::AppState> {
48 Ok(())
49 }
50
51 fn prepare<T: RuntimeFactors>(
52 &self,
53 mut ctx: PrepareContext<T, Self>,
54 ) -> anyhow::Result<Self::InstanceBuilder> {
55 let allowed_hosts = ctx
56 .instance_builder::<OutboundNetworkingFactor>()?
57 .allowed_hosts();
58 let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
59
60 Ok(InstanceState::new(
61 allowed_hosts,
62 self.create_client.clone(),
63 otel,
64 ))
65 }
66}
67
68impl SelfInstanceBuilder for InstanceState {}
69
/// Zero-sized marker type used as the data parameter when linking the `v3`
/// MQTT bindings in `init`; it carries no state itself.
struct MqttFactorData;
71
72impl spin_core::wasmtime::component::HasData for MqttFactorData {
73 type Data<'a> = &'a mut InstanceState;
74}
75
76pub struct NetworkedMqttClient {
78 inner: rumqttc::AsyncClient,
79 event_loop: Mutex<rumqttc::EventLoop>,
80}
81
/// Capacity handed to `AsyncClient::new` when a [`NetworkedMqttClient`] is
/// created (presumably the bound of the client's request channel — see the
/// `rumqttc` documentation).
const MQTT_CHANNEL_CAP: usize = 1_000;
83
84impl NetworkedMqttClient {
85 pub fn creator() -> Arc<dyn ClientCreator> {
87 Arc::new(|address, username, password, keep_alive_interval| {
88 Ok(Arc::new(NetworkedMqttClient::create(
89 address,
90 username,
91 password,
92 keep_alive_interval,
93 )?) as _)
94 })
95 }
96
97 pub fn create(
99 address: String,
100 username: String,
101 password: String,
102 keep_alive_interval: Duration,
103 ) -> Result<Self, v3::Error> {
104 let mut conn_opts = rumqttc::MqttOptions::parse_url(address).map_err(|e| {
105 tracing::error!("MQTT URL parse error: {e:?}");
106 v3::Error::InvalidAddress
107 })?;
108 conn_opts.set_credentials(username, password);
109 conn_opts.set_keep_alive(keep_alive_interval);
110 let (client, event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);
111 Ok(Self {
112 inner: client,
113 event_loop: Mutex::new(event_loop),
114 })
115 }
116}
117
118#[async_trait]
119impl MqttClient for NetworkedMqttClient {
120 async fn publish_bytes(
121 &self,
122 topic: String,
123 qos: v3::Qos,
124 payload: Vec<u8>,
125 ) -> Result<(), v3::Error> {
126 let qos = match qos {
127 v3::Qos::AtMostOnce => rumqttc::QoS::AtMostOnce,
128 v3::Qos::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
129 v3::Qos::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
130 };
131 self.inner
133 .publish_bytes(topic, qos, false, payload.into())
134 .await
135 .map_err(other_error_v3)?;
136
137 let mut lock = self.event_loop.lock().await;
140 loop {
141 let event = lock
142 .poll()
143 .await
144 .map_err(|err| v3::Error::ConnectionFailed(err.to_string()))?;
145
146 match (qos, event) {
147 (QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
148 | (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
149 | (QoS::ExactlyOnce, Event::Incoming(Incoming::PubComp(_))) => break,
150
151 (_, _) => continue,
152 }
153 }
154 Ok(())
155 }
156}
157
158#[async_trait]
160pub trait ClientCreator: Send + Sync {
161 fn create(
162 &self,
163 address: String,
164 username: String,
165 password: String,
166 keep_alive_interval: Duration,
167 ) -> Result<Arc<dyn MqttClient>, v3::Error>;
168}
169
170impl<F> ClientCreator for F
171where
172 F: Fn(String, String, String, Duration) -> Result<Arc<dyn MqttClient>, v3::Error> + Send + Sync,
173{
174 fn create(
175 &self,
176 address: String,
177 username: String,
178 password: String,
179 keep_alive_interval: Duration,
180 ) -> Result<Arc<dyn MqttClient>, v3::Error> {
181 self(address, username, password, keep_alive_interval)
182 }
183}