Skip to main content

spin_factor_outbound_redis/
host.rs

1use std::net::SocketAddr;
2
3use anyhow::Result;
4use redis::AsyncConnectionConfig;
5use redis::io::AsyncDNSResolver;
6use redis::{AsyncCommands, FromRedisValue, Value, aio::MultiplexedConnection};
7use spin_core::wasmtime::component::{Accessor, Resource};
8use spin_factor_otel::OtelFactorState;
9use spin_factor_outbound_networking::config::blocked_networks::BlockedNetworks;
10use spin_world::MAX_HOST_BUFFERED_BYTES;
11use spin_world::spin::redis::redis as v3;
12use spin_world::v1::{redis as v1, redis_types};
13use spin_world::v2::redis as v2;
14use tracing::field::Empty;
15use tracing::{Level, instrument};
16
17use crate::allowed_hosts::AllowedHostChecker;
18
19pub struct InstanceState {
20    pub(crate) allowed_host_checker: AllowedHostChecker,
21    pub blocked_networks: BlockedNetworks,
22    pub connections: spin_resource_table::Table<MultiplexedConnection>,
23    pub otel: OtelFactorState,
24}
25
26impl InstanceState {
27    async fn is_address_allowed(&self, address: &str) -> Result<bool> {
28        self.allowed_host_checker.is_address_allowed(address).await
29    }
30
31    async fn establish_connection(
32        &mut self,
33        address: String,
34    ) -> Result<Resource<v2::Connection>, v2::Error> {
35        let config = AsyncConnectionConfig::new()
36            .set_dns_resolver(SpinDnsResolver(self.blocked_networks.clone()));
37        let conn = redis::Client::open(address.as_str())
38            .map_err(|_| v2::Error::InvalidAddress)?
39            .get_multiplexed_async_connection_with_config(&config)
40            .await
41            .map_err(other_error_v2)?;
42        self.connections
43            .push(conn)
44            .map(Resource::new_own)
45            .map_err(|_| v2::Error::TooManyConnections)
46    }
47
48    async fn get_conn(
49        &mut self,
50        connection: Resource<v2::Connection>,
51    ) -> Result<&mut MultiplexedConnection, v2::Error> {
52        self.connections
53            .get_mut(connection.rep())
54            .ok_or(v2::Error::Other(
55                "could not find connection for resource".into(),
56            ))
57    }
58
59    fn get_conn_v3(
60        &mut self,
61        connection: Resource<v3::Connection>,
62    ) -> Result<MultiplexedConnection, v3::Error> {
63        self.connections
64            .get(connection.rep())
65            .cloned()
66            .ok_or(v3::Error::Other(
67                "could not find connection for resource".into(),
68            ))
69    }
70}
71
72mod operations {
73    use super::*;
74
75    pub async fn publish(
76        conn: &mut MultiplexedConnection,
77        channel: String,
78        payload: v3::Payload,
79    ) -> Result<(), v3::Error> {
80        // The `let () =` syntax is needed to suppress a warning when the result type is inferred.
81        // You can read more about the issue here: <https://github.com/redis-rs/redis-rs/issues/1228>
82        let () = conn
83            .publish(&channel, &payload)
84            .await
85            .map_err(other_error_v3)?;
86        Ok(())
87    }
88
89    pub async fn get(
90        conn: &mut MultiplexedConnection,
91        key: String,
92    ) -> Result<Option<Vec<u8>>, v3::Error> {
93        let value = conn
94            .get::<_, Option<Vec<u8>>>(&key)
95            .await
96            .map_err(other_error_v3)?;
97
98        // Currently there's no way to stream a `GET` result using the `redis`
99        // crate without buffering, so the damage (in terms of host memory
100        // usage) is already done, but we can still enforce the limit:
101        if std::mem::size_of::<Option<Vec<u8>>>() + value.as_ref().map(|v| v.len()).unwrap_or(0)
102            > MAX_HOST_BUFFERED_BYTES
103        {
104            Err(v3::Error::Other(format!(
105                "query result exceeds limit of {MAX_HOST_BUFFERED_BYTES} bytes"
106            )))
107        } else {
108            Ok(value)
109        }
110    }
111
112    pub async fn set(
113        conn: &mut MultiplexedConnection,
114        key: String,
115        value: Vec<u8>,
116    ) -> Result<(), v3::Error> {
117        // The `let () =` syntax is needed to suppress a warning when the result type is inferred.
118        // You can read more about the issue here: <https://github.com/redis-rs/redis-rs/issues/1228>
119        let () = conn.set(&key, &value).await.map_err(other_error_v3)?;
120        Ok(())
121    }
122
123    pub async fn incr(conn: &mut MultiplexedConnection, key: String) -> Result<i64, v3::Error> {
124        conn.incr(&key, 1).await.map_err(other_error_v3)
125    }
126
127    pub async fn del(
128        conn: &mut MultiplexedConnection,
129        keys: Vec<String>,
130    ) -> Result<u32, v3::Error> {
131        conn.del(&keys).await.map_err(other_error_v3)
132    }
133
134    pub async fn sadd(
135        conn: &mut MultiplexedConnection,
136        key: String,
137        values: Vec<String>,
138    ) -> Result<u32, v3::Error> {
139        let value = conn.sadd(&key, &values).await.map_err(|e| {
140            if e.kind() == redis::ErrorKind::TypeError {
141                v3::Error::TypeError
142            } else {
143                v3::Error::Other(e.to_string())
144            }
145        })?;
146        Ok(value)
147    }
148
149    pub async fn smembers(
150        conn: &mut MultiplexedConnection,
151        key: String,
152    ) -> Result<Vec<String>, v3::Error> {
153        conn.smembers(&key).await.map_err(other_error_v3)
154    }
155
156    pub async fn srem(
157        conn: &mut MultiplexedConnection,
158        key: String,
159        values: Vec<String>,
160    ) -> Result<u32, v3::Error> {
161        conn.srem(&key, &values).await.map_err(other_error_v3)
162    }
163
164    pub async fn execute(
165        conn: &mut MultiplexedConnection,
166        command: String,
167        arguments: impl IntoIterator<Item = v3::RedisParameter>,
168    ) -> Result<RedisResults, v3::Error> {
169        let mut cmd = redis::cmd(&command);
170        arguments.into_iter().for_each(|value| match value {
171            v3::RedisParameter::Int64(v) => {
172                cmd.arg(v);
173            }
174            v3::RedisParameter::Binary(v) => {
175                cmd.arg(v);
176            }
177        });
178
179        let results = cmd
180            .query_async::<RedisResults>(conn)
181            .await
182            .map_err(other_error_v3)?;
183
184        // Currently there's no way to stream results using the `redis`
185        // crate without buffering, so the damage (in terms of host memory
186        // usage) is already done, but we can still enforce the limit:
187        if std::mem::size_of::<Vec<v3::RedisResult>>()
188            + results.0.iter().map(memory_size).sum::<usize>()
189            > MAX_HOST_BUFFERED_BYTES
190        {
191            Err(v3::Error::Other(format!(
192                "query result exceeds limit of {MAX_HOST_BUFFERED_BYTES} bytes"
193            )))
194        } else {
195            Ok(results)
196        }
197    }
198}
199
200impl v3::Host for crate::InstanceState {
201    fn convert_error(&mut self, error: v3::Error) -> anyhow::Result<v3::Error> {
202        Ok(error)
203    }
204}
205
206impl v3::HostConnection for crate::InstanceState {
207    async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
208        self.connections.remove(connection.rep());
209        Ok(())
210    }
211}
212
213impl crate::RedisFactorData {
214    fn get_conn<T: Send>(
215        accessor: &Accessor<T, Self>,
216        connection: Resource<v3::Connection>,
217    ) -> Result<MultiplexedConnection, v3::Error> {
218        accessor.with(|mut access| {
219            let host = access.get();
220            host.otel.reparent_tracing_span();
221            host.get_conn_v3(connection)
222        })
223    }
224}
225
226impl v3::HostConnectionWithStore for crate::RedisFactorData {
227    #[instrument(name = "spin_outbound_redis.open_connection", skip(accessor, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", db.address = Empty, server.port = Empty, db.namespace = Empty))]
228    async fn open<T: Send>(
229        accessor: &Accessor<T, Self>,
230        address: String,
231    ) -> Result<Resource<v3::Connection>, v3::Error> {
232        let (allowed_host_checker, blocked_networks) = accessor.with(|mut access| {
233            let host = access.get();
234            host.otel.reparent_tracing_span();
235            (
236                host.allowed_host_checker.clone(),
237                host.blocked_networks.clone(),
238            )
239        });
240
241        if !allowed_host_checker
242            .is_address_allowed(&address)
243            .await
244            .map_err(|e| v3::Error::Other(e.to_string()))?
245        {
246            return Err(v3::Error::InvalidAddress);
247        }
248
249        let config =
250            AsyncConnectionConfig::new().set_dns_resolver(SpinDnsResolver(blocked_networks));
251        let conn = redis::Client::open(address.as_str())
252            .map_err(|_| v3::Error::InvalidAddress)?
253            .get_multiplexed_async_connection_with_config(&config)
254            .await
255            .map_err(other_error_v3)?;
256
257        accessor.with(|mut access| {
258            let host = access.get();
259            host.connections
260                .push(conn)
261                .map(Resource::new_own)
262                .map_err(|_| v3::Error::TooManyConnections)
263        })
264    }
265
266    #[instrument(name = "spin_outbound_redis.publish", skip(accessor, connection, payload), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("PUBLISH {}", channel)))]
267    async fn publish<T: Send>(
268        accessor: &Accessor<T, Self>,
269        connection: Resource<v3::Connection>,
270        channel: String,
271        payload: v3::Payload,
272    ) -> Result<(), v3::Error> {
273        let mut conn = Self::get_conn(accessor, connection)?;
274        operations::publish(&mut conn, channel, payload).await
275    }
276
277    #[instrument(name = "spin_outbound_redis.get", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("GET {}", key)))]
278    async fn get<T: Send>(
279        accessor: &Accessor<T, Self>,
280        connection: Resource<v3::Connection>,
281        key: String,
282    ) -> Result<Option<v3::Payload>, v3::Error> {
283        let mut conn = Self::get_conn(accessor, connection)?;
284        operations::get(&mut conn, key).await
285    }
286
287    #[instrument(name = "spin_outbound_redis.set", skip(accessor, connection, value), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SET {}", key)))]
288    async fn set<T: Send>(
289        accessor: &Accessor<T, Self>,
290        connection: Resource<v3::Connection>,
291        key: String,
292        value: v3::Payload,
293    ) -> Result<(), v3::Error> {
294        let mut conn = Self::get_conn(accessor, connection)?;
295        operations::set(&mut conn, key, value).await
296    }
297
298    #[instrument(name = "spin_outbound_redis.incr", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("INCRBY {} 1", key)))]
299    async fn incr<T: Send>(
300        accessor: &Accessor<T, Self>,
301        connection: Resource<v3::Connection>,
302        key: String,
303    ) -> Result<i64, v3::Error> {
304        let mut conn = Self::get_conn(accessor, connection)?;
305        operations::incr(&mut conn, key).await
306    }
307
308    #[instrument(name = "spin_outbound_redis.del", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("DEL {}", keys.join(" "))))]
309    async fn del<T: Send>(
310        accessor: &Accessor<T, Self>,
311        connection: Resource<v3::Connection>,
312        keys: Vec<String>,
313    ) -> Result<u32, v3::Error> {
314        let mut conn = Self::get_conn(accessor, connection)?;
315        operations::del(&mut conn, keys).await
316    }
317
318    #[instrument(name = "spin_outbound_redis.sadd", skip(accessor, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SADD {} {}", key, values.join(" "))))]
319    async fn sadd<T: Send>(
320        accessor: &Accessor<T, Self>,
321        connection: Resource<v3::Connection>,
322        key: String,
323        values: Vec<String>,
324    ) -> Result<u32, v3::Error> {
325        let mut conn = Self::get_conn(accessor, connection)?;
326        operations::sadd(&mut conn, key, values).await
327    }
328
329    #[instrument(name = "spin_outbound_redis.smembers", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SMEMBERS {}", key)))]
330    async fn smembers<T: Send>(
331        accessor: &Accessor<T, Self>,
332        connection: Resource<v3::Connection>,
333        key: String,
334    ) -> Result<Vec<String>, v3::Error> {
335        let mut conn = Self::get_conn(accessor, connection)?;
336        operations::smembers(&mut conn, key).await
337    }
338
339    #[instrument(name = "spin_outbound_redis.srem", skip(accessor, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SREM {} {}", key, values.join(" "))))]
340    async fn srem<T: Send>(
341        accessor: &Accessor<T, Self>,
342        connection: Resource<v3::Connection>,
343        key: String,
344        values: Vec<String>,
345    ) -> Result<u32, v3::Error> {
346        let mut conn = Self::get_conn(accessor, connection)?;
347        operations::srem(&mut conn, key, values).await
348    }
349
350    #[instrument(name = "spin_outbound_redis.execute", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("{}", command)))]
351    async fn execute<T: Send>(
352        accessor: &Accessor<T, Self>,
353        connection: Resource<v3::Connection>,
354        command: String,
355        arguments: Vec<v3::RedisParameter>,
356    ) -> Result<Vec<v3::RedisResult>, v3::Error> {
357        let mut conn = Self::get_conn(accessor, connection)?;
358        Ok(operations::execute(&mut conn, command, arguments)
359            .await?
360            .into_v3())
361    }
362}
363
364impl v2::Host for crate::InstanceState {
365    fn convert_error(&mut self, error: v2::Error) -> Result<v2::Error> {
366        Ok(error)
367    }
368}
369
370impl v2::HostConnection for crate::InstanceState {
371    #[instrument(name = "spin_outbound_redis.open_connection", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", db.address = Empty, server.port = Empty, db.namespace = Empty))]
372    async fn open(&mut self, address: String) -> Result<Resource<v2::Connection>, v2::Error> {
373        self.otel.reparent_tracing_span();
374        if !self
375            .is_address_allowed(&address)
376            .await
377            .map_err(|e| v2::Error::Other(e.to_string()))?
378        {
379            return Err(v2::Error::InvalidAddress);
380        }
381
382        self.establish_connection(address).await
383    }
384
385    #[instrument(name = "spin_outbound_redis.publish", skip(self, connection, payload), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("PUBLISH {}", channel)))]
386    async fn publish(
387        &mut self,
388        connection: Resource<v2::Connection>,
389        channel: String,
390        payload: Vec<u8>,
391    ) -> Result<(), v2::Error> {
392        self.otel.reparent_tracing_span();
393
394        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
395
396        Ok(operations::publish(conn, channel, payload).await?)
397    }
398
399    #[instrument(name = "spin_outbound_redis.get", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("GET {}", key)))]
400    async fn get(
401        &mut self,
402        connection: Resource<v2::Connection>,
403        key: String,
404    ) -> Result<Option<Vec<u8>>, v2::Error> {
405        self.otel.reparent_tracing_span();
406
407        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
408
409        Ok(operations::get(conn, key).await?)
410    }
411
412    #[instrument(name = "spin_outbound_redis.set", skip(self, connection, value), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SET {}", key)))]
413    async fn set(
414        &mut self,
415        connection: Resource<v2::Connection>,
416        key: String,
417        value: Vec<u8>,
418    ) -> Result<(), v2::Error> {
419        self.otel.reparent_tracing_span();
420
421        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
422        Ok(operations::set(conn, key, value).await?)
423    }
424
425    #[instrument(name = "spin_outbound_redis.incr", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("INCRBY {} 1", key)))]
426    async fn incr(
427        &mut self,
428        connection: Resource<v2::Connection>,
429        key: String,
430    ) -> Result<i64, v2::Error> {
431        self.otel.reparent_tracing_span();
432
433        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
434        Ok(operations::incr(conn, key).await?)
435    }
436
437    #[instrument(name = "spin_outbound_redis.del", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("DEL {}", keys.join(" "))))]
438    async fn del(
439        &mut self,
440        connection: Resource<v2::Connection>,
441        keys: Vec<String>,
442    ) -> Result<u32, v2::Error> {
443        self.otel.reparent_tracing_span();
444
445        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
446        Ok(operations::del(conn, keys).await?)
447    }
448
449    #[instrument(name = "spin_outbound_redis.sadd", skip(self, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SADD {} {}", key, values.join(" "))))]
450    async fn sadd(
451        &mut self,
452        connection: Resource<v2::Connection>,
453        key: String,
454        values: Vec<String>,
455    ) -> Result<u32, v2::Error> {
456        self.otel.reparent_tracing_span();
457
458        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
459        Ok(operations::sadd(conn, key, values).await?)
460    }
461
462    #[instrument(name = "spin_outbound_redis.smembers", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SMEMBERS {}", key)))]
463    async fn smembers(
464        &mut self,
465        connection: Resource<v2::Connection>,
466        key: String,
467    ) -> Result<Vec<String>, v2::Error> {
468        self.otel.reparent_tracing_span();
469
470        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
471        Ok(operations::smembers(conn, key).await?)
472    }
473
474    #[instrument(name = "spin_outbound_redis.srem", skip(self, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SREM {} {}", key, values.join(" "))))]
475    async fn srem(
476        &mut self,
477        connection: Resource<v2::Connection>,
478        key: String,
479        values: Vec<String>,
480    ) -> Result<u32, v2::Error> {
481        self.otel.reparent_tracing_span();
482
483        let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
484        Ok(operations::srem(conn, key, values).await?)
485    }
486
487    #[instrument(name = "spin_outbound_redis.execute", skip(self, connection, arguments), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("{}", command)))]
488    async fn execute(
489        &mut self,
490        connection: Resource<v2::Connection>,
491        command: String,
492        arguments: Vec<v2::RedisParameter>,
493    ) -> Result<Vec<v2::RedisResult>, v2::Error> {
494        fn to_v3_param(value: v2::RedisParameter) -> v3::RedisParameter {
495            match value {
496                v2::RedisParameter::Int64(v) => v3::RedisParameter::Int64(v),
497                v2::RedisParameter::Binary(v) => v3::RedisParameter::Binary(v),
498            }
499        }
500
501        self.otel.reparent_tracing_span();
502
503        let conn = self.get_conn(connection).await?;
504
505        let arguments = arguments.into_iter().map(to_v3_param);
506        Ok(operations::execute(conn, command, arguments)
507            .await?
508            .into_v2())
509    }
510
511    async fn drop(&mut self, connection: Resource<v2::Connection>) -> anyhow::Result<()> {
512        self.connections.remove(connection.rep());
513        Ok(())
514    }
515}
516
517fn other_error_v2(e: impl std::fmt::Display) -> v2::Error {
518    v2::Error::Other(e.to_string())
519}
520
521fn other_error_v3(e: impl std::fmt::Display) -> v3::Error {
522    v3::Error::Other(e.to_string())
523}
524
525/// Delegate a function call to the v2::HostConnection implementation
526macro_rules! delegate {
527    ($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
528        if !$self.is_address_allowed(&$address).await.map_err(|_| v1::Error::Error)?  {
529            return Err(v1::Error::Error);
530        }
531        let connection = match $self.establish_connection($address).await {
532            Ok(c) => c,
533            Err(_) => return Err(v1::Error::Error),
534        };
535        <Self as v2::HostConnection>::$name($self, connection, $($arg),*)
536            .await
537            .map_err(|_| v1::Error::Error)
538    }};
539}
540
541impl v1::Host for crate::InstanceState {
542    async fn publish(
543        &mut self,
544        address: String,
545        channel: String,
546        payload: Vec<u8>,
547    ) -> Result<(), v1::Error> {
548        delegate!(self.publish(address, channel, payload))
549    }
550
551    async fn get(&mut self, address: String, key: String) -> Result<Vec<u8>, v1::Error> {
552        delegate!(self.get(address, key)).map(|v| v.unwrap_or_default())
553    }
554
555    async fn set(&mut self, address: String, key: String, value: Vec<u8>) -> Result<(), v1::Error> {
556        delegate!(self.set(address, key, value))
557    }
558
559    async fn incr(&mut self, address: String, key: String) -> Result<i64, v1::Error> {
560        delegate!(self.incr(address, key))
561    }
562
563    async fn del(&mut self, address: String, keys: Vec<String>) -> Result<i64, v1::Error> {
564        delegate!(self.del(address, keys)).map(|v| v as i64)
565    }
566
567    async fn sadd(
568        &mut self,
569        address: String,
570        key: String,
571        values: Vec<String>,
572    ) -> Result<i64, v1::Error> {
573        delegate!(self.sadd(address, key, values)).map(|v| v as i64)
574    }
575
576    async fn smembers(&mut self, address: String, key: String) -> Result<Vec<String>, v1::Error> {
577        delegate!(self.smembers(address, key))
578    }
579
580    async fn srem(
581        &mut self,
582        address: String,
583        key: String,
584        values: Vec<String>,
585    ) -> Result<i64, v1::Error> {
586        delegate!(self.srem(address, key, values)).map(|v| v as i64)
587    }
588
589    async fn execute(
590        &mut self,
591        address: String,
592        command: String,
593        arguments: Vec<v1::RedisParameter>,
594    ) -> Result<Vec<v1::RedisResult>, v1::Error> {
595        delegate!(self.execute(
596            address,
597            command,
598            arguments.into_iter().map(Into::into).collect()
599        ))
600        .map(|v| v.into_iter().map(Into::into).collect())
601    }
602}
603
604impl redis_types::Host for crate::InstanceState {
605    fn convert_error(&mut self, error: redis_types::Error) -> Result<redis_types::Error> {
606        Ok(error)
607    }
608}
609
610struct RedisResults(Vec<v3::RedisResult>);
611
612impl RedisResults {
613    fn into_v2(self) -> Vec<v2::RedisResult> {
614        fn into_v2(value: v3::RedisResult) -> v2::RedisResult {
615            match value {
616                v3::RedisResult::Nil => v2::RedisResult::Nil,
617                v3::RedisResult::Status(v) => v2::RedisResult::Status(v),
618                v3::RedisResult::Int64(v) => v2::RedisResult::Int64(v),
619                v3::RedisResult::Binary(v) => v2::RedisResult::Binary(v),
620            }
621        }
622
623        self.0.into_iter().map(into_v2).collect()
624    }
625
626    fn into_v3(self) -> Vec<v3::RedisResult> {
627        self.0
628    }
629}
630
631impl FromRedisValue for RedisResults {
632    fn from_redis_value(value: &Value) -> redis::RedisResult<Self> {
633        fn append(values: &mut Vec<v3::RedisResult>, value: &Value) -> redis::RedisResult<()> {
634            match value {
635                Value::Nil => {
636                    values.push(v3::RedisResult::Nil);
637                    Ok(())
638                }
639                Value::Int(v) => {
640                    values.push(v3::RedisResult::Int64(*v));
641                    Ok(())
642                }
643                Value::BulkString(bytes) => {
644                    values.push(v3::RedisResult::Binary(bytes.to_owned()));
645                    Ok(())
646                }
647                Value::SimpleString(s) => {
648                    values.push(v3::RedisResult::Status(s.to_owned()));
649                    Ok(())
650                }
651                Value::Okay => {
652                    values.push(v3::RedisResult::Status("OK".to_string()));
653                    Ok(())
654                }
655                Value::Map(_) => Err(redis::RedisError::from((
656                    redis::ErrorKind::TypeError,
657                    "Could not convert Redis response",
658                    "Redis Map type is not supported".to_string(),
659                ))),
660                Value::Attribute { .. } => Err(redis::RedisError::from((
661                    redis::ErrorKind::TypeError,
662                    "Could not convert Redis response",
663                    "Redis Attribute type is not supported".to_string(),
664                ))),
665                Value::Array(arr) | Value::Set(arr) => {
666                    arr.iter().try_for_each(|value| append(values, value))
667                }
668                Value::Double(v) => {
669                    values.push(v3::RedisResult::Binary(v.to_string().into_bytes()));
670                    Ok(())
671                }
672                Value::VerbatimString { .. } => Err(redis::RedisError::from((
673                    redis::ErrorKind::TypeError,
674                    "Could not convert Redis response",
675                    "Redis string with format attribute is not supported".to_string(),
676                ))),
677                Value::Boolean(v) => {
678                    values.push(v3::RedisResult::Int64(if *v { 1 } else { 0 }));
679                    Ok(())
680                }
681                Value::BigNumber(v) => {
682                    values.push(v3::RedisResult::Binary(v.to_string().as_bytes().to_owned()));
683                    Ok(())
684                }
685                Value::Push { .. } => Err(redis::RedisError::from((
686                    redis::ErrorKind::TypeError,
687                    "Could not convert Redis response",
688                    "Redis Pub/Sub types are not supported".to_string(),
689                ))),
690                Value::ServerError(err) => Err(redis::RedisError::from((
691                    redis::ErrorKind::ResponseError,
692                    "Server error",
693                    format!("{err:?}"),
694                ))),
695            }
696        }
697        let mut values = Vec::new();
698        append(&mut values, value)?;
699        Ok(RedisResults(values))
700    }
701}
702
703fn memory_size(value: &v3::RedisResult) -> usize {
704    match value {
705        v3::RedisResult::Nil | v3::RedisResult::Int64(_) => std::mem::size_of::<v3::RedisResult>(),
706        v3::RedisResult::Binary(b) => std::mem::size_of::<v3::RedisResult>() + b.len(),
707        v3::RedisResult::Status(s) => std::mem::size_of::<v3::RedisResult>() + s.len(),
708    }
709}
710
711/// Resolves DNS using Tokio's resolver, filtering out blocked IPs.
712struct SpinDnsResolver(BlockedNetworks);
713
714impl AsyncDNSResolver for SpinDnsResolver {
715    fn resolve<'a, 'b: 'a>(
716        &'a self,
717        host: &'b str,
718        port: u16,
719    ) -> redis::RedisFuture<'a, Box<dyn Iterator<Item = std::net::SocketAddr> + Send + 'a>> {
720        Box::pin(async move {
721            let mut addrs = tokio::net::lookup_host((host, port))
722                .await?
723                .collect::<Vec<_>>();
724            // Remove blocked IPs
725            let blocked_addrs = self.0.remove_blocked(&mut addrs);
726            if addrs.is_empty() && !blocked_addrs.is_empty() {
727                tracing::error!(
728                    "error.type" = "destination_ip_prohibited",
729                    ?blocked_addrs,
730                    "all destination IP(s) prohibited by runtime config"
731                );
732            }
733            Ok(Box::new(addrs.into_iter()) as Box<dyn Iterator<Item = SocketAddr> + Send>)
734        })
735    }
736}