1 | use std::{collections::HashMap, sync::Arc}; |
2 | |
3 | use event_listener::Event; |
4 | use tracing::{debug, instrument, trace}; |
5 | |
6 | use crate::{ |
7 | async_lock::Mutex, connection::MsgBroadcaster, Executor, Message, OwnedMatchRule, Task, |
8 | }; |
9 | |
10 | use super::socket::ReadHalf; |
11 | |
12 | #[derive (Debug)] |
13 | pub(crate) struct SocketReader { |
14 | socket: Box<dyn ReadHalf>, |
15 | senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>, |
16 | already_received_bytes: Vec<u8>, |
17 | #[cfg (unix)] |
18 | already_received_fds: Vec<std::os::fd::OwnedFd>, |
19 | prev_seq: u64, |
20 | activity_event: Arc<Event>, |
21 | } |
22 | |
23 | impl SocketReader { |
24 | pub fn new( |
25 | socket: Box<dyn ReadHalf>, |
26 | senders: Arc<Mutex<HashMap<Option<OwnedMatchRule>, MsgBroadcaster>>>, |
27 | already_received_bytes: Vec<u8>, |
28 | #[cfg (unix)] already_received_fds: Vec<std::os::fd::OwnedFd>, |
29 | activity_event: Arc<Event>, |
30 | ) -> Self { |
31 | Self { |
32 | socket, |
33 | senders, |
34 | already_received_bytes, |
35 | #[cfg (unix)] |
36 | already_received_fds, |
37 | prev_seq: 0, |
38 | activity_event, |
39 | } |
40 | } |
41 | |
42 | pub fn spawn(self, executor: &Executor<'_>) -> Task<()> { |
43 | executor.spawn(self.receive_msg(), "socket reader" ) |
44 | } |
45 | |
46 | // Keep receiving messages and put them on the queue. |
47 | #[instrument (name = "socket reader" , skip(self))] |
48 | async fn receive_msg(mut self) { |
49 | loop { |
50 | trace!("Waiting for message on the socket.." ); |
51 | let msg = self.read_socket().await; |
52 | match &msg { |
53 | Ok(msg) => trace!("Message received on the socket: {:?}" , msg), |
54 | Err(e) => trace!("Error reading from the socket: {:?}" , e), |
55 | }; |
56 | |
57 | let mut senders = self.senders.lock().await; |
58 | for (rule, sender) in &*senders { |
59 | if let Ok(msg) = &msg { |
60 | if let Some(rule) = rule.as_ref() { |
61 | match rule.matches(msg) { |
62 | Ok(true) => (), |
63 | Ok(false) => continue, |
64 | Err(e) => { |
65 | debug!("Error matching message against rule: {:?}" , e); |
66 | |
67 | continue; |
68 | } |
69 | } |
70 | } |
71 | } |
72 | |
73 | if let Err(e) = sender.broadcast_direct(msg.clone()).await { |
74 | // An error would be due to either of these: |
75 | // |
76 | // 1. the channel is closed. |
77 | // 2. No active receivers. |
78 | // |
79 | // In either case, just log it. |
80 | trace!( |
81 | "Error broadcasting message to stream for ` {:?}`: {:?}" , |
82 | rule, |
83 | e |
84 | ); |
85 | } |
86 | } |
87 | trace!("Broadcasted to all streams: {:?}" , msg); |
88 | |
89 | if msg.is_err() { |
90 | senders.clear(); |
91 | trace!("Socket reading task stopped" ); |
92 | |
93 | return; |
94 | } |
95 | } |
96 | } |
97 | |
98 | #[instrument ] |
99 | async fn read_socket(&mut self) -> crate::Result<Message> { |
100 | self.activity_event.notify(usize::MAX); |
101 | let seq = self.prev_seq + 1; |
102 | let msg = self |
103 | .socket |
104 | .receive_message( |
105 | seq, |
106 | &mut self.already_received_bytes, |
107 | #[cfg (unix)] |
108 | &mut self.already_received_fds, |
109 | ) |
110 | .await?; |
111 | self.prev_seq = seq; |
112 | |
113 | Ok(msg) |
114 | } |
115 | } |
116 | |