Skip to main content

spin_factor_outbound_mqtt/
lib.rs

1mod allowed_hosts;
2mod host;
3
4use std::sync::Arc;
5use std::time::Duration;
6
7use host::InstanceState;
8use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
9use spin_core::async_trait;
10use spin_factor_otel::OtelFactorState;
11use spin_factor_outbound_networking::OutboundNetworkingFactor;
12use spin_factors::{
13    ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
14};
15use spin_world::spin::mqtt::mqtt as v3;
16use spin_world::v2::mqtt as v2;
17use tokio::sync::Mutex;
18
19pub use host::MqttClient;
20
21use crate::host::other_error_v3;
22
23pub struct OutboundMqttFactor {
24    create_client: Arc<dyn ClientCreator>,
25}
26
27impl OutboundMqttFactor {
28    pub fn new(create_client: Arc<dyn ClientCreator>) -> Self {
29        Self { create_client }
30    }
31}
32
33impl Factor for OutboundMqttFactor {
34    type RuntimeConfig = ();
35    type AppState = ();
36    type InstanceBuilder = InstanceState;
37
38    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
39        ctx.link_bindings(v2::add_to_linker::<_, FactorData<Self>>)?;
40        ctx.link_bindings(v3::add_to_linker::<_, MqttFactorData>)?;
41        Ok(())
42    }
43
44    fn configure_app<T: RuntimeFactors>(
45        &self,
46        _ctx: ConfigureAppContext<T, Self>,
47    ) -> anyhow::Result<Self::AppState> {
48        Ok(())
49    }
50
51    fn prepare<T: RuntimeFactors>(
52        &self,
53        mut ctx: PrepareContext<T, Self>,
54    ) -> anyhow::Result<Self::InstanceBuilder> {
55        let allowed_hosts = ctx
56            .instance_builder::<OutboundNetworkingFactor>()?
57            .allowed_hosts();
58        let otel = OtelFactorState::from_prepare_context(&mut ctx)?;
59
60        Ok(InstanceState::new(
61            allowed_hosts,
62            self.create_client.clone(),
63            otel,
64        ))
65    }
66}
67
68impl SelfInstanceBuilder for InstanceState {}
69
70struct MqttFactorData;
71
72impl spin_core::wasmtime::component::HasData for MqttFactorData {
73    type Data<'a> = &'a mut InstanceState;
74}
75
76// This is a concrete implementation of the MQTT client using rumqttc.
77pub struct NetworkedMqttClient {
78    inner: rumqttc::AsyncClient,
79    event_loop: Mutex<rumqttc::EventLoop>,
80}
81
82const MQTT_CHANNEL_CAP: usize = 1000;
83
84impl NetworkedMqttClient {
85    /// Create a [`ClientCreator`] that creates a [`NetworkedMqttClient`].
86    pub fn creator() -> Arc<dyn ClientCreator> {
87        Arc::new(|address, username, password, keep_alive_interval| {
88            Ok(Arc::new(NetworkedMqttClient::create(
89                address,
90                username,
91                password,
92                keep_alive_interval,
93            )?) as _)
94        })
95    }
96
97    /// Create a new [`NetworkedMqttClient`] with the given address, username, password, and keep alive interval.
98    pub fn create(
99        address: String,
100        username: String,
101        password: String,
102        keep_alive_interval: Duration,
103    ) -> Result<Self, v3::Error> {
104        let mut conn_opts = rumqttc::MqttOptions::parse_url(address).map_err(|e| {
105            tracing::error!("MQTT URL parse error: {e:?}");
106            v3::Error::InvalidAddress
107        })?;
108        conn_opts.set_credentials(username, password);
109        conn_opts.set_keep_alive(keep_alive_interval);
110        let (client, event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);
111        Ok(Self {
112            inner: client,
113            event_loop: Mutex::new(event_loop),
114        })
115    }
116}
117
118#[async_trait]
119impl MqttClient for NetworkedMqttClient {
120    async fn publish_bytes(
121        &self,
122        topic: String,
123        qos: v3::Qos,
124        payload: Vec<u8>,
125    ) -> Result<(), v3::Error> {
126        let qos = match qos {
127            v3::Qos::AtMostOnce => rumqttc::QoS::AtMostOnce,
128            v3::Qos::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
129            v3::Qos::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
130        };
131        // Message published to EventLoop (not MQTT Broker)
132        self.inner
133            .publish_bytes(topic, qos, false, payload.into())
134            .await
135            .map_err(other_error_v3)?;
136
137        // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
138        // We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool.
139        let mut lock = self.event_loop.lock().await;
140        loop {
141            let event = lock
142                .poll()
143                .await
144                .map_err(|err| v3::Error::ConnectionFailed(err.to_string()))?;
145
146            match (qos, event) {
147                (QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
148                | (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
149                | (QoS::ExactlyOnce, Event::Incoming(Incoming::PubComp(_))) => break,
150
151                (_, _) => continue,
152            }
153        }
154        Ok(())
155    }
156}
157
158/// A trait for creating MQTT client.
159#[async_trait]
160pub trait ClientCreator: Send + Sync {
161    fn create(
162        &self,
163        address: String,
164        username: String,
165        password: String,
166        keep_alive_interval: Duration,
167    ) -> Result<Arc<dyn MqttClient>, v3::Error>;
168}
169
170impl<F> ClientCreator for F
171where
172    F: Fn(String, String, String, Duration) -> Result<Arc<dyn MqttClient>, v3::Error> + Send + Sync,
173{
174    fn create(
175        &self,
176        address: String,
177        username: String,
178        password: String,
179        keep_alive_interval: Duration,
180    ) -> Result<Arc<dyn MqttClient>, v3::Error> {
181        self(address, username, password, keep_alive_interval)
182    }
183}