| 1 | use std::{collections::HashMap, sync::Arc}; |
| 2 | |
| 3 | use event_listener::Event; |
| 4 | use tracing::{debug, instrument, trace}; |
| 5 | |
| 6 | use crate::{ |
| 7 | async_lock::Mutex, connection::MsgBroadcaster, Executor, Message, OwnedMatchRule, Task, |
| 8 | }; |
| 9 | |
| 10 | use super::socket::ReadHalf; |
| 11 | |
/// State owned by the socket-reading task: the socket to read from, the set of
/// broadcasters that incoming messages are fanned out to, and the bookkeeping
/// needed between reads.
#[derive(Debug)]
pub(crate) struct SocketReader {
    /// Read half of the socket that messages are received from.
    socket: Box<dyn ReadHalf>,
    /// Broadcasters keyed by an optional match rule, shared behind an async mutex.
    ///
    /// A broadcaster stored under `None` is sent every received message; one stored
    /// under `Some(rule)` is only sent messages for which `rule.matches` returns
    /// `Ok(true)`. Read errors are sent to every broadcaster regardless of rule.
    senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>,
    /// Byte buffer handed to `ReadHalf::receive_message` by mutable reference on every
    /// read. NOTE(review): presumably bytes already pulled off the socket before this
    /// reader was constructed (e.g. left over from the handshake) -- confirm.
    already_received_bytes: Vec<u8>,
    /// File descriptors handed to `ReadHalf::receive_message` alongside the byte
    /// buffer (Unix only). NOTE(review): presumably descriptors received together
    /// with `already_received_bytes` -- confirm.
    #[cfg(unix)]
    already_received_fds: Vec<std::os::fd::OwnedFd>,
    /// Sequence number that was passed to the most recent successful read; starts at 0.
    prev_seq: u64,
    /// Event that is notified each time a socket read is about to start.
    activity_event: Arc<Event>,
}
| 22 | |
| 23 | impl SocketReader { |
| 24 | pub fn new( |
| 25 | socket: Box<dyn ReadHalf>, |
| 26 | senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>, |
| 27 | already_received_bytes: Vec<u8>, |
| 28 | #[cfg (unix)] already_received_fds: Vec<std::os::fd::OwnedFd>, |
| 29 | activity_event: Arc<Event>, |
| 30 | ) -> Self { |
| 31 | Self { |
| 32 | socket, |
| 33 | senders, |
| 34 | already_received_bytes, |
| 35 | #[cfg (unix)] |
| 36 | already_received_fds, |
| 37 | prev_seq: 0, |
| 38 | activity_event, |
| 39 | } |
| 40 | } |
| 41 | |
| 42 | pub fn spawn(self, executor: &Executor<'_>) -> Task<()> { |
| 43 | executor.spawn(self.receive_msg(), "socket reader" ) |
| 44 | } |
| 45 | |
| 46 | // Keep receiving messages and put them on the queue. |
| 47 | #[instrument (name = "socket reader" , skip(self))] |
| 48 | async fn receive_msg(mut self) { |
| 49 | loop { |
| 50 | trace!("Waiting for message on the socket.." ); |
| 51 | let msg = self.read_socket().await; |
| 52 | match &msg { |
| 53 | Ok(msg) => trace!("Message received on the socket: {:?}" , msg), |
| 54 | Err(e) => trace!("Error reading from the socket: {:?}" , e), |
| 55 | }; |
| 56 | |
| 57 | let mut senders = self.senders.lock().await; |
| 58 | for (rule, sender) in &*senders { |
| 59 | if let Ok(msg) = &msg { |
| 60 | if let Some(rule) = rule.as_ref() { |
| 61 | match rule.matches(msg) { |
| 62 | Ok(true) => (), |
| 63 | Ok(false) => continue, |
| 64 | Err(e) => { |
| 65 | debug!("Error matching message against rule: {:?}" , e); |
| 66 | |
| 67 | continue; |
| 68 | } |
| 69 | } |
| 70 | } |
| 71 | } |
| 72 | |
| 73 | if let Err(e) = sender.broadcast_direct(msg.clone()).await { |
| 74 | // An error would be due to either of these: |
| 75 | // |
| 76 | // 1. the channel is closed. |
| 77 | // 2. No active receivers. |
| 78 | // |
| 79 | // In either case, just log it. |
| 80 | trace!( |
| 81 | "Error broadcasting message to stream for ` {:?}`: {:?}" , |
| 82 | rule, |
| 83 | e |
| 84 | ); |
| 85 | } |
| 86 | } |
| 87 | trace!("Broadcasted to all streams: {:?}" , msg); |
| 88 | |
| 89 | if msg.is_err() { |
| 90 | senders.clear(); |
| 91 | trace!("Socket reading task stopped" ); |
| 92 | |
| 93 | return; |
| 94 | } |
| 95 | } |
| 96 | } |
| 97 | |
| 98 | #[instrument ] |
| 99 | async fn read_socket(&mut self) -> crate::Result<Message> { |
| 100 | self.activity_event.notify(usize::MAX); |
| 101 | let seq = self.prev_seq + 1; |
| 102 | let msg = self |
| 103 | .socket |
| 104 | .receive_message( |
| 105 | seq, |
| 106 | &mut self.already_received_bytes, |
| 107 | #[cfg (unix)] |
| 108 | &mut self.already_received_fds, |
| 109 | ) |
| 110 | .await?; |
| 111 | self.prev_seq = seq; |
| 112 | |
| 113 | Ok(msg) |
| 114 | } |
| 115 | } |
| 116 | |