//! An MPSC channel whose receiving end is an event source
//!
//! Create a channel using [`channel()`](channel), which returns a
//! [`Sender`] that can be cloned and sent across threads if `T: Send`,
//! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop).
//! It will generate one event per message.
//!
//! A synchronous version of the channel is provided by [`sync_channel`], in which
//! the [`SyncSender`] will block when the channel is full.
10 | |
11 | use std::sync::mpsc; |
12 | |
13 | use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; |
14 | |
15 | use super::ping::{make_ping, Ping, PingError, PingSource}; |
16 | |
/// Events handed to the callback by the channel event source
#[derive(Debug)]
pub enum Event<T> {
    /// One message taken from the channel; the payload is carried inline
    Msg(T),
    /// The channel is disconnected
    ///
    /// Every sender attached to this channel has been dropped, so no
    /// further message can ever arrive.
    Closed,
}
28 | |
29 | /// The sender end of a channel |
30 | /// |
31 | /// It can be cloned and sent accross threads (if `T` is). |
32 | #[derive (Debug)] |
33 | pub struct Sender<T> { |
34 | sender: mpsc::Sender<T>, |
35 | ping: Ping, |
36 | } |
37 | |
38 | impl<T> Clone for Sender<T> { |
39 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
40 | fn clone(&self) -> Sender<T> { |
41 | Sender { |
42 | sender: self.sender.clone(), |
43 | ping: self.ping.clone(), |
44 | } |
45 | } |
46 | } |
47 | |
48 | impl<T> Sender<T> { |
49 | /// Send a message to the channel |
50 | /// |
51 | /// This will wake the event loop and deliver an `Event::Msg` to |
52 | /// it containing the provided value. |
53 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
54 | self.sender.send(t).map(|()| self.ping.ping()) |
55 | } |
56 | } |
57 | |
58 | impl<T> Drop for Sender<T> { |
59 | fn drop(&mut self) { |
60 | // ping on drop, to notify about channel closure |
61 | self.ping.ping(); |
62 | } |
63 | } |
64 | |
65 | /// The sender end of a synchronous channel |
66 | /// |
67 | /// It can be cloned and sent accross threads (if `T` is). |
68 | #[derive (Debug)] |
69 | pub struct SyncSender<T> { |
70 | sender: mpsc::SyncSender<T>, |
71 | ping: Ping, |
72 | } |
73 | |
74 | impl<T> Clone for SyncSender<T> { |
75 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
76 | fn clone(&self) -> SyncSender<T> { |
77 | SyncSender { |
78 | sender: self.sender.clone(), |
79 | ping: self.ping.clone(), |
80 | } |
81 | } |
82 | } |
83 | |
84 | impl<T> SyncSender<T> { |
85 | /// Send a message to the synchronous channel |
86 | /// |
87 | /// This will wake the event loop and deliver an `Event::Msg` to |
88 | /// it containing the provided value. If the channel is full, this |
89 | /// function will block until the event loop empties it and it can |
90 | /// deliver the message. |
91 | /// |
92 | /// Due to the blocking behavior, this method should not be used on the |
93 | /// same thread as the one running the event loop, as it could cause deadlocks. |
94 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
95 | let ret = self.try_send(t); |
96 | match ret { |
97 | Ok(()) => Ok(()), |
98 | Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()), |
99 | Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)), |
100 | } |
101 | } |
102 | |
103 | /// Send a message to the synchronous channel |
104 | /// |
105 | /// This will wake the event loop and deliver an `Event::Msg` to |
106 | /// it containing the provided value. If the channel is full, this |
107 | /// function will return an error, but the event loop will still be |
108 | /// signaled for readiness. |
109 | pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> { |
110 | let ret = self.sender.try_send(t); |
111 | if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret { |
112 | self.ping.ping(); |
113 | } |
114 | ret |
115 | } |
116 | } |
117 | |
118 | /// The receiving end of the channel |
119 | /// |
120 | /// This is the event source to be inserted into your `EventLoop`. |
121 | #[derive (Debug)] |
122 | pub struct Channel<T> { |
123 | receiver: mpsc::Receiver<T>, |
124 | source: PingSource, |
125 | } |
126 | |
127 | // This impl is safe because the Channel is only able to move around threads |
128 | // when it is not inserted into an event loop. (Otherwise it is stuck into |
129 | // a Source<_> and the internals of calloop, which are not Send). |
130 | // At this point, the Arc<Receiver> has a count of 1, and it is obviously |
131 | // safe to Send between threads. |
132 | unsafe impl<T: Send> Send for Channel<T> {} |
133 | |
134 | impl<T> Channel<T> { |
135 | /// Proxy for [`mpsc::Receiver::recv`] to manually poll events. |
136 | /// |
137 | /// *Note*: Normally you would want to use the `Channel` by inserting |
138 | /// it into an event loop instead. Use this for example to immediately |
139 | /// dispatch pending events after creation. |
140 | pub fn recv(&self) -> Result<T, mpsc::RecvError> { |
141 | self.receiver.recv() |
142 | } |
143 | |
144 | /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events. |
145 | /// |
146 | /// *Note*: Normally you would want to use the `Channel` by inserting |
147 | /// it into an event loop instead. Use this for example to immediately |
148 | /// dispatch pending events after creation. |
149 | pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { |
150 | self.receiver.try_recv() |
151 | } |
152 | } |
153 | |
154 | /// Create a new asynchronous channel |
155 | pub fn channel<T>() -> (Sender<T>, Channel<T>) { |
156 | let (sender: Sender, receiver: Receiver) = mpsc::channel(); |
157 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
158 | (Sender { sender, ping }, Channel { receiver, source }) |
159 | } |
160 | |
161 | /// Create a new synchronous, bounded channel |
162 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) { |
163 | let (sender: SyncSender, receiver: Receiver) = mpsc::sync_channel(bound); |
164 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
165 | (SyncSender { sender, ping }, Channel { receiver, source }) |
166 | } |
167 | |
168 | impl<T> EventSource for Channel<T> { |
169 | type Event = Event<T>; |
170 | type Metadata = (); |
171 | type Ret = (); |
172 | type Error = ChannelError; |
173 | |
174 | fn process_events<C>( |
175 | &mut self, |
176 | readiness: Readiness, |
177 | token: Token, |
178 | mut callback: C, |
179 | ) -> Result<PostAction, Self::Error> |
180 | where |
181 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
182 | { |
183 | let receiver = &self.receiver; |
184 | self.source |
185 | .process_events(readiness, token, |(), &mut ()| loop { |
186 | match receiver.try_recv() { |
187 | Ok(val) => callback(Event::Msg(val), &mut ()), |
188 | Err(mpsc::TryRecvError::Empty) => break, |
189 | Err(mpsc::TryRecvError::Disconnected) => { |
190 | callback(Event::Closed, &mut ()); |
191 | break; |
192 | } |
193 | } |
194 | }) |
195 | .map_err(ChannelError) |
196 | } |
197 | |
198 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
199 | self.source.register(poll, token_factory) |
200 | } |
201 | |
202 | fn reregister( |
203 | &mut self, |
204 | poll: &mut Poll, |
205 | token_factory: &mut TokenFactory, |
206 | ) -> crate::Result<()> { |
207 | self.source.reregister(poll, token_factory) |
208 | } |
209 | |
210 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
211 | self.source.unregister(poll) |
212 | } |
213 | } |
214 | |
215 | /// An error arising from processing events for a channel. |
216 | #[derive (thiserror::Error, Debug)] |
217 | #[error(transparent)] |
218 | pub struct ChannelError(PingError); |
219 | |
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// Zero-length timeout handed to every `dispatch` call in these tests.
    const NO_WAIT: Option<Duration> = Some(Duration::ZERO);

    /// Asynchronous channel: nothing, then one message, then closure.
    #[test]
    fn basic_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = channel::<()>();

        // (got_msg, got_closed)
        let mut seen = (false, false);

        let _channel_token = handle
            .insert_source(rx, move |event, &mut (), seen: &mut (bool, bool)| match event {
                Event::Msg(()) => seen.0 = true,
                Event::Closed => seen.1 = true,
            })
            .unwrap();

        // nothing is sent, nothing is received
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (false, false));

        // a message is sent
        tx.send(()).unwrap();
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, false));

        // the sender is dropped
        drop(tx);
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, true));
    }

    /// Bounded channel: fill it, overflow it, drain it, then close it.
    #[test]
    fn basic_sync_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = sync_channel::<()>(2);

        // (message_count, got_closed)
        let mut tally = (0u32, false);

        let _channel_token = handle
            .insert_source(rx, move |event, &mut (), tally: &mut (u32, bool)| match event {
                Event::Msg(()) => tally.0 += 1,
                Event::Closed => tally.1 = true,
            })
            .unwrap();

        // nothing is sent, nothing is received
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 0);
        assert!(!tally.1);

        // fill the channel; a third, non-blocking send must be refused
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        assert!(tx.try_send(()).is_err());

        // empty it
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 2);
        assert!(!tally.1);

        // send a final message and drop the sender
        tx.send(()).unwrap();
        drop(tx);

        // final read of the channel
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 3);
        assert!(tally.1);
    }
}
328 | |