spin_factor_outbound_mqtt/
lib.rs

1mod host;
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use host::other_error;
7use host::InstanceState;
8use rumqttc::{AsyncClient, Event, Incoming, Outgoing, QoS};
9use spin_core::async_trait;
10use spin_factor_outbound_networking::OutboundNetworkingFactor;
11use spin_factors::{
12    ConfigureAppContext, Factor, FactorData, PrepareContext, RuntimeFactors, SelfInstanceBuilder,
13};
14use spin_world::v2::mqtt::{self as v2, Error, Qos};
15use tokio::sync::Mutex;
16
17pub use host::MqttClient;
18
19pub struct OutboundMqttFactor {
20    create_client: Arc<dyn ClientCreator>,
21}
22
23impl OutboundMqttFactor {
24    pub fn new(create_client: Arc<dyn ClientCreator>) -> Self {
25        Self { create_client }
26    }
27}
28
29impl Factor for OutboundMqttFactor {
30    type RuntimeConfig = ();
31    type AppState = ();
32    type InstanceBuilder = InstanceState;
33
34    fn init(&mut self, ctx: &mut impl spin_factors::InitContext<Self>) -> anyhow::Result<()> {
35        ctx.link_bindings(spin_world::v2::mqtt::add_to_linker::<_, FactorData<Self>>)?;
36        Ok(())
37    }
38
39    fn configure_app<T: RuntimeFactors>(
40        &self,
41        _ctx: ConfigureAppContext<T, Self>,
42    ) -> anyhow::Result<Self::AppState> {
43        Ok(())
44    }
45
46    fn prepare<T: RuntimeFactors>(
47        &self,
48        mut ctx: PrepareContext<T, Self>,
49    ) -> anyhow::Result<Self::InstanceBuilder> {
50        let allowed_hosts = ctx
51            .instance_builder::<OutboundNetworkingFactor>()?
52            .allowed_hosts();
53        Ok(InstanceState::new(
54            allowed_hosts,
55            self.create_client.clone(),
56        ))
57    }
58}
59
60impl SelfInstanceBuilder for InstanceState {}
61
62// This is a concrete implementation of the MQTT client using rumqttc.
63pub struct NetworkedMqttClient {
64    inner: rumqttc::AsyncClient,
65    event_loop: Mutex<rumqttc::EventLoop>,
66}
67
/// Capacity value passed to `AsyncClient::new` when a client is created.
const MQTT_CHANNEL_CAP: usize = 1_000;
69
70impl NetworkedMqttClient {
71    /// Create a [`ClientCreator`] that creates a [`NetworkedMqttClient`].
72    pub fn creator() -> Arc<dyn ClientCreator> {
73        Arc::new(|address, username, password, keep_alive_interval| {
74            Ok(Arc::new(NetworkedMqttClient::create(
75                address,
76                username,
77                password,
78                keep_alive_interval,
79            )?) as _)
80        })
81    }
82
83    /// Create a new [`NetworkedMqttClient`] with the given address, username, password, and keep alive interval.
84    pub fn create(
85        address: String,
86        username: String,
87        password: String,
88        keep_alive_interval: Duration,
89    ) -> Result<Self, Error> {
90        let mut conn_opts = rumqttc::MqttOptions::parse_url(address).map_err(|e| {
91            tracing::error!("MQTT URL parse error: {e:?}");
92            Error::InvalidAddress
93        })?;
94        conn_opts.set_credentials(username, password);
95        conn_opts.set_keep_alive(keep_alive_interval);
96        let (client, event_loop) = AsyncClient::new(conn_opts, MQTT_CHANNEL_CAP);
97        Ok(Self {
98            inner: client,
99            event_loop: Mutex::new(event_loop),
100        })
101    }
102}
103
104#[async_trait]
105impl MqttClient for NetworkedMqttClient {
106    async fn publish_bytes(&self, topic: String, qos: Qos, payload: Vec<u8>) -> Result<(), Error> {
107        let qos = match qos {
108            Qos::AtMostOnce => rumqttc::QoS::AtMostOnce,
109            Qos::AtLeastOnce => rumqttc::QoS::AtLeastOnce,
110            Qos::ExactlyOnce => rumqttc::QoS::ExactlyOnce,
111        };
112        // Message published to EventLoop (not MQTT Broker)
113        self.inner
114            .publish_bytes(topic, qos, false, payload.into())
115            .await
116            .map_err(other_error)?;
117
118        // Poll event loop until outgoing publish event is iterated over to send the message to MQTT broker or capture/throw error.
119        // We may revisit this later to manage long running connections, high throughput use cases and their issues in the connection pool.
120        let mut lock = self.event_loop.lock().await;
121        loop {
122            let event = lock
123                .poll()
124                .await
125                .map_err(|err| v2::Error::ConnectionFailed(err.to_string()))?;
126
127            match (qos, event) {
128                (QoS::AtMostOnce, Event::Outgoing(Outgoing::Publish(_)))
129                | (QoS::AtLeastOnce, Event::Incoming(Incoming::PubAck(_)))
130                | (QoS::ExactlyOnce, Event::Incoming(Incoming::PubComp(_))) => break,
131
132                (_, _) => continue,
133            }
134        }
135        Ok(())
136    }
137}
138
139/// A trait for creating MQTT client.
140#[async_trait]
141pub trait ClientCreator: Send + Sync {
142    fn create(
143        &self,
144        address: String,
145        username: String,
146        password: String,
147        keep_alive_interval: Duration,
148    ) -> Result<Arc<dyn MqttClient>, Error>;
149}
150
151impl<F> ClientCreator for F
152where
153    F: Fn(String, String, String, Duration) -> Result<Arc<dyn MqttClient>, Error> + Send + Sync,
154{
155    fn create(
156        &self,
157        address: String,
158        username: String,
159        password: String,
160        keep_alive_interval: Duration,
161    ) -> Result<Arc<dyn MqttClient>, Error> {
162        self(address, username, password, keep_alive_interval)
163    }
164}