1use std::net::SocketAddr;
2
3use anyhow::Result;
4use redis::AsyncConnectionConfig;
5use redis::io::AsyncDNSResolver;
6use redis::{AsyncCommands, FromRedisValue, Value, aio::MultiplexedConnection};
7use spin_core::wasmtime::component::{Accessor, Resource};
8use spin_factor_otel::OtelFactorState;
9use spin_factor_outbound_networking::config::blocked_networks::BlockedNetworks;
10use spin_world::MAX_HOST_BUFFERED_BYTES;
11use spin_world::spin::redis::redis as v3;
12use spin_world::v1::{redis as v1, redis_types};
13use spin_world::v2::redis as v2;
14use tracing::field::Empty;
15use tracing::{Level, instrument};
16
17use crate::allowed_hosts::AllowedHostChecker;
18
19pub struct InstanceState {
20 pub(crate) allowed_host_checker: AllowedHostChecker,
21 pub blocked_networks: BlockedNetworks,
22 pub connections: spin_resource_table::Table<MultiplexedConnection>,
23 pub otel: OtelFactorState,
24}
25
26impl InstanceState {
27 async fn is_address_allowed(&self, address: &str) -> Result<bool> {
28 self.allowed_host_checker.is_address_allowed(address).await
29 }
30
31 async fn establish_connection(
32 &mut self,
33 address: String,
34 ) -> Result<Resource<v2::Connection>, v2::Error> {
35 let config = AsyncConnectionConfig::new()
36 .set_dns_resolver(SpinDnsResolver(self.blocked_networks.clone()));
37 let conn = redis::Client::open(address.as_str())
38 .map_err(|_| v2::Error::InvalidAddress)?
39 .get_multiplexed_async_connection_with_config(&config)
40 .await
41 .map_err(other_error_v2)?;
42 self.connections
43 .push(conn)
44 .map(Resource::new_own)
45 .map_err(|_| v2::Error::TooManyConnections)
46 }
47
48 async fn get_conn(
49 &mut self,
50 connection: Resource<v2::Connection>,
51 ) -> Result<&mut MultiplexedConnection, v2::Error> {
52 self.connections
53 .get_mut(connection.rep())
54 .ok_or(v2::Error::Other(
55 "could not find connection for resource".into(),
56 ))
57 }
58
59 fn get_conn_v3(
60 &mut self,
61 connection: Resource<v3::Connection>,
62 ) -> Result<MultiplexedConnection, v3::Error> {
63 self.connections
64 .get(connection.rep())
65 .cloned()
66 .ok_or(v3::Error::Other(
67 "could not find connection for resource".into(),
68 ))
69 }
70}
71
72mod operations {
73 use super::*;
74
75 pub async fn publish(
76 conn: &mut MultiplexedConnection,
77 channel: String,
78 payload: v3::Payload,
79 ) -> Result<(), v3::Error> {
80 let () = conn
83 .publish(&channel, &payload)
84 .await
85 .map_err(other_error_v3)?;
86 Ok(())
87 }
88
89 pub async fn get(
90 conn: &mut MultiplexedConnection,
91 key: String,
92 ) -> Result<Option<Vec<u8>>, v3::Error> {
93 let value = conn
94 .get::<_, Option<Vec<u8>>>(&key)
95 .await
96 .map_err(other_error_v3)?;
97
98 if std::mem::size_of::<Option<Vec<u8>>>() + value.as_ref().map(|v| v.len()).unwrap_or(0)
102 > MAX_HOST_BUFFERED_BYTES
103 {
104 Err(v3::Error::Other(format!(
105 "query result exceeds limit of {MAX_HOST_BUFFERED_BYTES} bytes"
106 )))
107 } else {
108 Ok(value)
109 }
110 }
111
112 pub async fn set(
113 conn: &mut MultiplexedConnection,
114 key: String,
115 value: Vec<u8>,
116 ) -> Result<(), v3::Error> {
117 let () = conn.set(&key, &value).await.map_err(other_error_v3)?;
120 Ok(())
121 }
122
123 pub async fn incr(conn: &mut MultiplexedConnection, key: String) -> Result<i64, v3::Error> {
124 conn.incr(&key, 1).await.map_err(other_error_v3)
125 }
126
127 pub async fn del(
128 conn: &mut MultiplexedConnection,
129 keys: Vec<String>,
130 ) -> Result<u32, v3::Error> {
131 conn.del(&keys).await.map_err(other_error_v3)
132 }
133
134 pub async fn sadd(
135 conn: &mut MultiplexedConnection,
136 key: String,
137 values: Vec<String>,
138 ) -> Result<u32, v3::Error> {
139 let value = conn.sadd(&key, &values).await.map_err(|e| {
140 if e.kind() == redis::ErrorKind::TypeError {
141 v3::Error::TypeError
142 } else {
143 v3::Error::Other(e.to_string())
144 }
145 })?;
146 Ok(value)
147 }
148
149 pub async fn smembers(
150 conn: &mut MultiplexedConnection,
151 key: String,
152 ) -> Result<Vec<String>, v3::Error> {
153 conn.smembers(&key).await.map_err(other_error_v3)
154 }
155
156 pub async fn srem(
157 conn: &mut MultiplexedConnection,
158 key: String,
159 values: Vec<String>,
160 ) -> Result<u32, v3::Error> {
161 conn.srem(&key, &values).await.map_err(other_error_v3)
162 }
163
164 pub async fn execute(
165 conn: &mut MultiplexedConnection,
166 command: String,
167 arguments: impl IntoIterator<Item = v3::RedisParameter>,
168 ) -> Result<RedisResults, v3::Error> {
169 let mut cmd = redis::cmd(&command);
170 arguments.into_iter().for_each(|value| match value {
171 v3::RedisParameter::Int64(v) => {
172 cmd.arg(v);
173 }
174 v3::RedisParameter::Binary(v) => {
175 cmd.arg(v);
176 }
177 });
178
179 let results = cmd
180 .query_async::<RedisResults>(conn)
181 .await
182 .map_err(other_error_v3)?;
183
184 if std::mem::size_of::<Vec<v3::RedisResult>>()
188 + results.0.iter().map(memory_size).sum::<usize>()
189 > MAX_HOST_BUFFERED_BYTES
190 {
191 Err(v3::Error::Other(format!(
192 "query result exceeds limit of {MAX_HOST_BUFFERED_BYTES} bytes"
193 )))
194 } else {
195 Ok(results)
196 }
197 }
198}
199
200impl v3::Host for crate::InstanceState {
201 fn convert_error(&mut self, error: v3::Error) -> anyhow::Result<v3::Error> {
202 Ok(error)
203 }
204}
205
206impl v3::HostConnection for crate::InstanceState {
207 async fn drop(&mut self, connection: Resource<v3::Connection>) -> anyhow::Result<()> {
208 self.connections.remove(connection.rep());
209 Ok(())
210 }
211}
212
213impl crate::RedisFactorData {
214 fn get_conn<T: Send>(
215 accessor: &Accessor<T, Self>,
216 connection: Resource<v3::Connection>,
217 ) -> Result<MultiplexedConnection, v3::Error> {
218 accessor.with(|mut access| {
219 let host = access.get();
220 host.otel.reparent_tracing_span();
221 host.get_conn_v3(connection)
222 })
223 }
224}
225
226impl v3::HostConnectionWithStore for crate::RedisFactorData {
227 #[instrument(name = "spin_outbound_redis.open_connection", skip(accessor, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", db.address = Empty, server.port = Empty, db.namespace = Empty))]
228 async fn open<T: Send>(
229 accessor: &Accessor<T, Self>,
230 address: String,
231 ) -> Result<Resource<v3::Connection>, v3::Error> {
232 let (allowed_host_checker, blocked_networks) = accessor.with(|mut access| {
233 let host = access.get();
234 host.otel.reparent_tracing_span();
235 (
236 host.allowed_host_checker.clone(),
237 host.blocked_networks.clone(),
238 )
239 });
240
241 if !allowed_host_checker
242 .is_address_allowed(&address)
243 .await
244 .map_err(|e| v3::Error::Other(e.to_string()))?
245 {
246 return Err(v3::Error::InvalidAddress);
247 }
248
249 let config =
250 AsyncConnectionConfig::new().set_dns_resolver(SpinDnsResolver(blocked_networks));
251 let conn = redis::Client::open(address.as_str())
252 .map_err(|_| v3::Error::InvalidAddress)?
253 .get_multiplexed_async_connection_with_config(&config)
254 .await
255 .map_err(other_error_v3)?;
256
257 accessor.with(|mut access| {
258 let host = access.get();
259 host.connections
260 .push(conn)
261 .map(Resource::new_own)
262 .map_err(|_| v3::Error::TooManyConnections)
263 })
264 }
265
266 #[instrument(name = "spin_outbound_redis.publish", skip(accessor, connection, payload), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("PUBLISH {}", channel)))]
267 async fn publish<T: Send>(
268 accessor: &Accessor<T, Self>,
269 connection: Resource<v3::Connection>,
270 channel: String,
271 payload: v3::Payload,
272 ) -> Result<(), v3::Error> {
273 let mut conn = Self::get_conn(accessor, connection)?;
274 operations::publish(&mut conn, channel, payload).await
275 }
276
277 #[instrument(name = "spin_outbound_redis.get", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("GET {}", key)))]
278 async fn get<T: Send>(
279 accessor: &Accessor<T, Self>,
280 connection: Resource<v3::Connection>,
281 key: String,
282 ) -> Result<Option<v3::Payload>, v3::Error> {
283 let mut conn = Self::get_conn(accessor, connection)?;
284 operations::get(&mut conn, key).await
285 }
286
287 #[instrument(name = "spin_outbound_redis.set", skip(accessor, connection, value), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SET {}", key)))]
288 async fn set<T: Send>(
289 accessor: &Accessor<T, Self>,
290 connection: Resource<v3::Connection>,
291 key: String,
292 value: v3::Payload,
293 ) -> Result<(), v3::Error> {
294 let mut conn = Self::get_conn(accessor, connection)?;
295 operations::set(&mut conn, key, value).await
296 }
297
298 #[instrument(name = "spin_outbound_redis.incr", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("INCRBY {} 1", key)))]
299 async fn incr<T: Send>(
300 accessor: &Accessor<T, Self>,
301 connection: Resource<v3::Connection>,
302 key: String,
303 ) -> Result<i64, v3::Error> {
304 let mut conn = Self::get_conn(accessor, connection)?;
305 operations::incr(&mut conn, key).await
306 }
307
308 #[instrument(name = "spin_outbound_redis.del", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("DEL {}", keys.join(" "))))]
309 async fn del<T: Send>(
310 accessor: &Accessor<T, Self>,
311 connection: Resource<v3::Connection>,
312 keys: Vec<String>,
313 ) -> Result<u32, v3::Error> {
314 let mut conn = Self::get_conn(accessor, connection)?;
315 operations::del(&mut conn, keys).await
316 }
317
318 #[instrument(name = "spin_outbound_redis.sadd", skip(accessor, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SADD {} {}", key, values.join(" "))))]
319 async fn sadd<T: Send>(
320 accessor: &Accessor<T, Self>,
321 connection: Resource<v3::Connection>,
322 key: String,
323 values: Vec<String>,
324 ) -> Result<u32, v3::Error> {
325 let mut conn = Self::get_conn(accessor, connection)?;
326 operations::sadd(&mut conn, key, values).await
327 }
328
329 #[instrument(name = "spin_outbound_redis.smembers", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SMEMBERS {}", key)))]
330 async fn smembers<T: Send>(
331 accessor: &Accessor<T, Self>,
332 connection: Resource<v3::Connection>,
333 key: String,
334 ) -> Result<Vec<String>, v3::Error> {
335 let mut conn = Self::get_conn(accessor, connection)?;
336 operations::smembers(&mut conn, key).await
337 }
338
339 #[instrument(name = "spin_outbound_redis.srem", skip(accessor, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SREM {} {}", key, values.join(" "))))]
340 async fn srem<T: Send>(
341 accessor: &Accessor<T, Self>,
342 connection: Resource<v3::Connection>,
343 key: String,
344 values: Vec<String>,
345 ) -> Result<u32, v3::Error> {
346 let mut conn = Self::get_conn(accessor, connection)?;
347 operations::srem(&mut conn, key, values).await
348 }
349
350 #[instrument(name = "spin_outbound_redis.execute", skip(accessor, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("{}", command)))]
351 async fn execute<T: Send>(
352 accessor: &Accessor<T, Self>,
353 connection: Resource<v3::Connection>,
354 command: String,
355 arguments: Vec<v3::RedisParameter>,
356 ) -> Result<Vec<v3::RedisResult>, v3::Error> {
357 let mut conn = Self::get_conn(accessor, connection)?;
358 Ok(operations::execute(&mut conn, command, arguments)
359 .await?
360 .into_v3())
361 }
362}
363
364impl v2::Host for crate::InstanceState {
365 fn convert_error(&mut self, error: v2::Error) -> Result<v2::Error> {
366 Ok(error)
367 }
368}
369
370impl v2::HostConnection for crate::InstanceState {
371 #[instrument(name = "spin_outbound_redis.open_connection", skip(self, address), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", db.address = Empty, server.port = Empty, db.namespace = Empty))]
372 async fn open(&mut self, address: String) -> Result<Resource<v2::Connection>, v2::Error> {
373 self.otel.reparent_tracing_span();
374 if !self
375 .is_address_allowed(&address)
376 .await
377 .map_err(|e| v2::Error::Other(e.to_string()))?
378 {
379 return Err(v2::Error::InvalidAddress);
380 }
381
382 self.establish_connection(address).await
383 }
384
385 #[instrument(name = "spin_outbound_redis.publish", skip(self, connection, payload), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("PUBLISH {}", channel)))]
386 async fn publish(
387 &mut self,
388 connection: Resource<v2::Connection>,
389 channel: String,
390 payload: Vec<u8>,
391 ) -> Result<(), v2::Error> {
392 self.otel.reparent_tracing_span();
393
394 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
395
396 Ok(operations::publish(conn, channel, payload).await?)
397 }
398
399 #[instrument(name = "spin_outbound_redis.get", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("GET {}", key)))]
400 async fn get(
401 &mut self,
402 connection: Resource<v2::Connection>,
403 key: String,
404 ) -> Result<Option<Vec<u8>>, v2::Error> {
405 self.otel.reparent_tracing_span();
406
407 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
408
409 Ok(operations::get(conn, key).await?)
410 }
411
412 #[instrument(name = "spin_outbound_redis.set", skip(self, connection, value), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SET {}", key)))]
413 async fn set(
414 &mut self,
415 connection: Resource<v2::Connection>,
416 key: String,
417 value: Vec<u8>,
418 ) -> Result<(), v2::Error> {
419 self.otel.reparent_tracing_span();
420
421 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
422 Ok(operations::set(conn, key, value).await?)
423 }
424
425 #[instrument(name = "spin_outbound_redis.incr", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("INCRBY {} 1", key)))]
426 async fn incr(
427 &mut self,
428 connection: Resource<v2::Connection>,
429 key: String,
430 ) -> Result<i64, v2::Error> {
431 self.otel.reparent_tracing_span();
432
433 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
434 Ok(operations::incr(conn, key).await?)
435 }
436
437 #[instrument(name = "spin_outbound_redis.del", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("DEL {}", keys.join(" "))))]
438 async fn del(
439 &mut self,
440 connection: Resource<v2::Connection>,
441 keys: Vec<String>,
442 ) -> Result<u32, v2::Error> {
443 self.otel.reparent_tracing_span();
444
445 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
446 Ok(operations::del(conn, keys).await?)
447 }
448
449 #[instrument(name = "spin_outbound_redis.sadd", skip(self, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SADD {} {}", key, values.join(" "))))]
450 async fn sadd(
451 &mut self,
452 connection: Resource<v2::Connection>,
453 key: String,
454 values: Vec<String>,
455 ) -> Result<u32, v2::Error> {
456 self.otel.reparent_tracing_span();
457
458 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
459 Ok(operations::sadd(conn, key, values).await?)
460 }
461
462 #[instrument(name = "spin_outbound_redis.smembers", skip(self, connection), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SMEMBERS {}", key)))]
463 async fn smembers(
464 &mut self,
465 connection: Resource<v2::Connection>,
466 key: String,
467 ) -> Result<Vec<String>, v2::Error> {
468 self.otel.reparent_tracing_span();
469
470 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
471 Ok(operations::smembers(conn, key).await?)
472 }
473
474 #[instrument(name = "spin_outbound_redis.srem", skip(self, connection, values), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("SREM {} {}", key, values.join(" "))))]
475 async fn srem(
476 &mut self,
477 connection: Resource<v2::Connection>,
478 key: String,
479 values: Vec<String>,
480 ) -> Result<u32, v2::Error> {
481 self.otel.reparent_tracing_span();
482
483 let conn = self.get_conn(connection).await.map_err(other_error_v2)?;
484 Ok(operations::srem(conn, key, values).await?)
485 }
486
487 #[instrument(name = "spin_outbound_redis.execute", skip(self, connection, arguments), err(level = Level::INFO), fields(otel.kind = "client", db.system = "redis", otel.name = format!("{}", command)))]
488 async fn execute(
489 &mut self,
490 connection: Resource<v2::Connection>,
491 command: String,
492 arguments: Vec<v2::RedisParameter>,
493 ) -> Result<Vec<v2::RedisResult>, v2::Error> {
494 fn to_v3_param(value: v2::RedisParameter) -> v3::RedisParameter {
495 match value {
496 v2::RedisParameter::Int64(v) => v3::RedisParameter::Int64(v),
497 v2::RedisParameter::Binary(v) => v3::RedisParameter::Binary(v),
498 }
499 }
500
501 self.otel.reparent_tracing_span();
502
503 let conn = self.get_conn(connection).await?;
504
505 let arguments = arguments.into_iter().map(to_v3_param);
506 Ok(operations::execute(conn, command, arguments)
507 .await?
508 .into_v2())
509 }
510
511 async fn drop(&mut self, connection: Resource<v2::Connection>) -> anyhow::Result<()> {
512 self.connections.remove(connection.rep());
513 Ok(())
514 }
515}
516
517fn other_error_v2(e: impl std::fmt::Display) -> v2::Error {
518 v2::Error::Other(e.to_string())
519}
520
521fn other_error_v3(e: impl std::fmt::Display) -> v3::Error {
522 v3::Error::Other(e.to_string())
523}
524
525macro_rules! delegate {
527 ($self:ident.$name:ident($address:expr, $($arg:expr),*)) => {{
528 if !$self.is_address_allowed(&$address).await.map_err(|_| v1::Error::Error)? {
529 return Err(v1::Error::Error);
530 }
531 let connection = match $self.establish_connection($address).await {
532 Ok(c) => c,
533 Err(_) => return Err(v1::Error::Error),
534 };
535 <Self as v2::HostConnection>::$name($self, connection, $($arg),*)
536 .await
537 .map_err(|_| v1::Error::Error)
538 }};
539}
540
541impl v1::Host for crate::InstanceState {
542 async fn publish(
543 &mut self,
544 address: String,
545 channel: String,
546 payload: Vec<u8>,
547 ) -> Result<(), v1::Error> {
548 delegate!(self.publish(address, channel, payload))
549 }
550
551 async fn get(&mut self, address: String, key: String) -> Result<Vec<u8>, v1::Error> {
552 delegate!(self.get(address, key)).map(|v| v.unwrap_or_default())
553 }
554
555 async fn set(&mut self, address: String, key: String, value: Vec<u8>) -> Result<(), v1::Error> {
556 delegate!(self.set(address, key, value))
557 }
558
559 async fn incr(&mut self, address: String, key: String) -> Result<i64, v1::Error> {
560 delegate!(self.incr(address, key))
561 }
562
563 async fn del(&mut self, address: String, keys: Vec<String>) -> Result<i64, v1::Error> {
564 delegate!(self.del(address, keys)).map(|v| v as i64)
565 }
566
567 async fn sadd(
568 &mut self,
569 address: String,
570 key: String,
571 values: Vec<String>,
572 ) -> Result<i64, v1::Error> {
573 delegate!(self.sadd(address, key, values)).map(|v| v as i64)
574 }
575
576 async fn smembers(&mut self, address: String, key: String) -> Result<Vec<String>, v1::Error> {
577 delegate!(self.smembers(address, key))
578 }
579
580 async fn srem(
581 &mut self,
582 address: String,
583 key: String,
584 values: Vec<String>,
585 ) -> Result<i64, v1::Error> {
586 delegate!(self.srem(address, key, values)).map(|v| v as i64)
587 }
588
589 async fn execute(
590 &mut self,
591 address: String,
592 command: String,
593 arguments: Vec<v1::RedisParameter>,
594 ) -> Result<Vec<v1::RedisResult>, v1::Error> {
595 delegate!(self.execute(
596 address,
597 command,
598 arguments.into_iter().map(Into::into).collect()
599 ))
600 .map(|v| v.into_iter().map(Into::into).collect())
601 }
602}
603
604impl redis_types::Host for crate::InstanceState {
605 fn convert_error(&mut self, error: redis_types::Error) -> Result<redis_types::Error> {
606 Ok(error)
607 }
608}
609
610struct RedisResults(Vec<v3::RedisResult>);
611
612impl RedisResults {
613 fn into_v2(self) -> Vec<v2::RedisResult> {
614 fn into_v2(value: v3::RedisResult) -> v2::RedisResult {
615 match value {
616 v3::RedisResult::Nil => v2::RedisResult::Nil,
617 v3::RedisResult::Status(v) => v2::RedisResult::Status(v),
618 v3::RedisResult::Int64(v) => v2::RedisResult::Int64(v),
619 v3::RedisResult::Binary(v) => v2::RedisResult::Binary(v),
620 }
621 }
622
623 self.0.into_iter().map(into_v2).collect()
624 }
625
626 fn into_v3(self) -> Vec<v3::RedisResult> {
627 self.0
628 }
629}
630
631impl FromRedisValue for RedisResults {
632 fn from_redis_value(value: &Value) -> redis::RedisResult<Self> {
633 fn append(values: &mut Vec<v3::RedisResult>, value: &Value) -> redis::RedisResult<()> {
634 match value {
635 Value::Nil => {
636 values.push(v3::RedisResult::Nil);
637 Ok(())
638 }
639 Value::Int(v) => {
640 values.push(v3::RedisResult::Int64(*v));
641 Ok(())
642 }
643 Value::BulkString(bytes) => {
644 values.push(v3::RedisResult::Binary(bytes.to_owned()));
645 Ok(())
646 }
647 Value::SimpleString(s) => {
648 values.push(v3::RedisResult::Status(s.to_owned()));
649 Ok(())
650 }
651 Value::Okay => {
652 values.push(v3::RedisResult::Status("OK".to_string()));
653 Ok(())
654 }
655 Value::Map(_) => Err(redis::RedisError::from((
656 redis::ErrorKind::TypeError,
657 "Could not convert Redis response",
658 "Redis Map type is not supported".to_string(),
659 ))),
660 Value::Attribute { .. } => Err(redis::RedisError::from((
661 redis::ErrorKind::TypeError,
662 "Could not convert Redis response",
663 "Redis Attribute type is not supported".to_string(),
664 ))),
665 Value::Array(arr) | Value::Set(arr) => {
666 arr.iter().try_for_each(|value| append(values, value))
667 }
668 Value::Double(v) => {
669 values.push(v3::RedisResult::Binary(v.to_string().into_bytes()));
670 Ok(())
671 }
672 Value::VerbatimString { .. } => Err(redis::RedisError::from((
673 redis::ErrorKind::TypeError,
674 "Could not convert Redis response",
675 "Redis string with format attribute is not supported".to_string(),
676 ))),
677 Value::Boolean(v) => {
678 values.push(v3::RedisResult::Int64(if *v { 1 } else { 0 }));
679 Ok(())
680 }
681 Value::BigNumber(v) => {
682 values.push(v3::RedisResult::Binary(v.to_string().as_bytes().to_owned()));
683 Ok(())
684 }
685 Value::Push { .. } => Err(redis::RedisError::from((
686 redis::ErrorKind::TypeError,
687 "Could not convert Redis response",
688 "Redis Pub/Sub types are not supported".to_string(),
689 ))),
690 Value::ServerError(err) => Err(redis::RedisError::from((
691 redis::ErrorKind::ResponseError,
692 "Server error",
693 format!("{err:?}"),
694 ))),
695 }
696 }
697 let mut values = Vec::new();
698 append(&mut values, value)?;
699 Ok(RedisResults(values))
700 }
701}
702
703fn memory_size(value: &v3::RedisResult) -> usize {
704 match value {
705 v3::RedisResult::Nil | v3::RedisResult::Int64(_) => std::mem::size_of::<v3::RedisResult>(),
706 v3::RedisResult::Binary(b) => std::mem::size_of::<v3::RedisResult>() + b.len(),
707 v3::RedisResult::Status(s) => std::mem::size_of::<v3::RedisResult>() + s.len(),
708 }
709}
710
711struct SpinDnsResolver(BlockedNetworks);
713
714impl AsyncDNSResolver for SpinDnsResolver {
715 fn resolve<'a, 'b: 'a>(
716 &'a self,
717 host: &'b str,
718 port: u16,
719 ) -> redis::RedisFuture<'a, Box<dyn Iterator<Item = std::net::SocketAddr> + Send + 'a>> {
720 Box::pin(async move {
721 let mut addrs = tokio::net::lookup_host((host, port))
722 .await?
723 .collect::<Vec<_>>();
724 let blocked_addrs = self.0.remove_blocked(&mut addrs);
726 if addrs.is_empty() && !blocked_addrs.is_empty() {
727 tracing::error!(
728 "error.type" = "destination_ip_prohibited",
729 ?blocked_addrs,
730 "all destination IP(s) prohibited by runtime config"
731 );
732 }
733 Ok(Box::new(addrs.into_iter()) as Box<dyn Iterator<Item = SocketAddr> + Send>)
734 })
735 }
736}