1 | //! An MPSC channel whose receiving end is an event source |
2 | //! |
3 | //! Create a channel using [`channel()`](channel), which returns a |
4 | //! [`Sender`] that can be cloned and sent accross threads if `T: Send`, |
5 | //! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop). |
6 | //! It will generate one event per message. |
7 | //! |
8 | //! A synchronous version of the channel is provided by [`sync_channel`], in which |
9 | //! the [`SyncSender`] will block when the channel is full. |
10 | |
11 | use std::cmp; |
12 | use std::fmt; |
13 | use std::sync::mpsc; |
14 | |
15 | use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; |
16 | |
17 | use super::ping::{make_ping, Ping, PingError, PingSource}; |
18 | |
19 | const MAX_EVENTS_CHECK: usize = 1024; |
20 | |
21 | /// The events generated by the channel event source |
22 | #[derive (Debug)] |
23 | pub enum Event<T> { |
24 | /// A message was received and is bundled here |
25 | Msg(T), |
26 | /// The channel was closed |
27 | /// |
28 | /// This means all the `Sender`s associated with this channel |
29 | /// have been dropped, no more messages will ever be received. |
30 | Closed, |
31 | } |
32 | |
33 | /// The sender end of a channel |
34 | /// |
35 | /// It can be cloned and sent accross threads (if `T` is). |
36 | #[derive (Debug)] |
37 | pub struct Sender<T> { |
38 | sender: mpsc::Sender<T>, |
39 | ping: Ping, |
40 | } |
41 | |
42 | impl<T> Clone for Sender<T> { |
43 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
44 | fn clone(&self) -> Sender<T> { |
45 | Sender { |
46 | sender: self.sender.clone(), |
47 | ping: self.ping.clone(), |
48 | } |
49 | } |
50 | } |
51 | |
52 | impl<T> Sender<T> { |
53 | /// Send a message to the channel |
54 | /// |
55 | /// This will wake the event loop and deliver an `Event::Msg` to |
56 | /// it containing the provided value. |
57 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
58 | self.sender.send(t).map(|()| self.ping.ping()) |
59 | } |
60 | } |
61 | |
62 | impl<T> Drop for Sender<T> { |
63 | fn drop(&mut self) { |
64 | // ping on drop, to notify about channel closure |
65 | self.ping.ping(); |
66 | } |
67 | } |
68 | |
69 | /// The sender end of a synchronous channel |
70 | /// |
71 | /// It can be cloned and sent accross threads (if `T` is). |
72 | #[derive (Debug)] |
73 | pub struct SyncSender<T> { |
74 | sender: mpsc::SyncSender<T>, |
75 | ping: Ping, |
76 | } |
77 | |
78 | impl<T> Clone for SyncSender<T> { |
79 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
80 | fn clone(&self) -> SyncSender<T> { |
81 | SyncSender { |
82 | sender: self.sender.clone(), |
83 | ping: self.ping.clone(), |
84 | } |
85 | } |
86 | } |
87 | |
88 | impl<T> SyncSender<T> { |
89 | /// Send a message to the synchronous channel |
90 | /// |
91 | /// This will wake the event loop and deliver an `Event::Msg` to |
92 | /// it containing the provided value. If the channel is full, this |
93 | /// function will block until the event loop empties it and it can |
94 | /// deliver the message. |
95 | /// |
96 | /// Due to the blocking behavior, this method should not be used on the |
97 | /// same thread as the one running the event loop, as it could cause deadlocks. |
98 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
99 | let ret = self.try_send(t); |
100 | match ret { |
101 | Ok(()) => Ok(()), |
102 | Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()), |
103 | Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)), |
104 | } |
105 | } |
106 | |
107 | /// Send a message to the synchronous channel |
108 | /// |
109 | /// This will wake the event loop and deliver an `Event::Msg` to |
110 | /// it containing the provided value. If the channel is full, this |
111 | /// function will return an error, but the event loop will still be |
112 | /// signaled for readiness. |
113 | pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> { |
114 | let ret = self.sender.try_send(t); |
115 | if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret { |
116 | self.ping.ping(); |
117 | } |
118 | ret |
119 | } |
120 | } |
121 | |
122 | /// The receiving end of the channel |
123 | /// |
124 | /// This is the event source to be inserted into your `EventLoop`. |
125 | #[derive (Debug)] |
126 | pub struct Channel<T> { |
127 | receiver: mpsc::Receiver<T>, |
128 | source: PingSource, |
129 | ping: Ping, |
130 | capacity: usize, |
131 | } |
132 | |
133 | // This impl is safe because the Channel is only able to move around threads |
134 | // when it is not inserted into an event loop. (Otherwise it is stuck into |
135 | // a Source<_> and the internals of calloop, which are not Send). |
136 | // At this point, the Arc<Receiver> has a count of 1, and it is obviously |
137 | // safe to Send between threads. |
138 | unsafe impl<T: Send> Send for Channel<T> {} |
139 | |
140 | impl<T> Channel<T> { |
141 | /// Proxy for [`mpsc::Receiver::recv`] to manually poll events. |
142 | /// |
143 | /// *Note*: Normally you would want to use the `Channel` by inserting |
144 | /// it into an event loop instead. Use this for example to immediately |
145 | /// dispatch pending events after creation. |
146 | pub fn recv(&self) -> Result<T, mpsc::RecvError> { |
147 | self.receiver.recv() |
148 | } |
149 | |
150 | /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events. |
151 | /// |
152 | /// *Note*: Normally you would want to use the `Channel` by inserting |
153 | /// it into an event loop instead. Use this for example to immediately |
154 | /// dispatch pending events after creation. |
155 | pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { |
156 | self.receiver.try_recv() |
157 | } |
158 | } |
159 | |
160 | /// Create a new asynchronous channel |
161 | pub fn channel<T>() -> (Sender<T>, Channel<T>) { |
162 | let (sender: Sender, receiver: Receiver) = mpsc::channel(); |
163 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
164 | ( |
165 | Sender { |
166 | sender, |
167 | ping: ping.clone(), |
168 | }, |
169 | Channel { |
170 | receiver, |
171 | ping, |
172 | source, |
173 | capacity: usize::MAX, |
174 | }, |
175 | ) |
176 | } |
177 | |
178 | /// Create a new synchronous, bounded channel |
179 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) { |
180 | let (sender: SyncSender, receiver: Receiver) = mpsc::sync_channel(bound); |
181 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
182 | ( |
183 | SyncSender { |
184 | sender, |
185 | ping: ping.clone(), |
186 | }, |
187 | Channel { |
188 | receiver, |
189 | source, |
190 | ping, |
191 | capacity: bound, |
192 | }, |
193 | ) |
194 | } |
195 | |
196 | impl<T> EventSource for Channel<T> { |
197 | type Event = Event<T>; |
198 | type Metadata = (); |
199 | type Ret = (); |
200 | type Error = ChannelError; |
201 | |
202 | fn process_events<C>( |
203 | &mut self, |
204 | readiness: Readiness, |
205 | token: Token, |
206 | mut callback: C, |
207 | ) -> Result<PostAction, Self::Error> |
208 | where |
209 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
210 | { |
211 | let receiver = &self.receiver; |
212 | let capacity = self.capacity; |
213 | let mut clear_readiness = false; |
214 | let mut disconnected = false; |
215 | |
216 | let action = self |
217 | .source |
218 | .process_events(readiness, token, |(), &mut ()| { |
219 | // Limit the number of elements we process at a time to the channel's capacity, or 1024. |
220 | let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK); |
221 | for _ in 0..max { |
222 | match receiver.try_recv() { |
223 | Ok(val) => callback(Event::Msg(val), &mut ()), |
224 | Err(mpsc::TryRecvError::Empty) => { |
225 | clear_readiness = true; |
226 | break; |
227 | } |
228 | Err(mpsc::TryRecvError::Disconnected) => { |
229 | callback(Event::Closed, &mut ()); |
230 | disconnected = true; |
231 | break; |
232 | } |
233 | } |
234 | } |
235 | }) |
236 | .map_err(ChannelError)?; |
237 | |
238 | if disconnected { |
239 | Ok(PostAction::Remove) |
240 | } else if clear_readiness { |
241 | Ok(action) |
242 | } else { |
243 | // Re-notify the ping source so we can try again. |
244 | self.ping.ping(); |
245 | Ok(PostAction::Continue) |
246 | } |
247 | } |
248 | |
249 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
250 | self.source.register(poll, token_factory) |
251 | } |
252 | |
253 | fn reregister( |
254 | &mut self, |
255 | poll: &mut Poll, |
256 | token_factory: &mut TokenFactory, |
257 | ) -> crate::Result<()> { |
258 | self.source.reregister(poll, token_factory) |
259 | } |
260 | |
261 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
262 | self.source.unregister(poll) |
263 | } |
264 | } |
265 | |
266 | /// An error arising from processing events for a channel. |
267 | #[derive (Debug)] |
268 | pub struct ChannelError(PingError); |
269 | |
270 | impl fmt::Display for ChannelError { |
271 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
272 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
273 | fmt::Display::fmt(&self.0, f) |
274 | } |
275 | } |
276 | |
277 | impl std::error::Error for ChannelError { |
278 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
279 | fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { |
280 | Some(&self.0) |
281 | } |
282 | } |
283 | |
284 | #[cfg (test)] |
285 | mod tests { |
286 | use super::*; |
287 | |
288 | #[test ] |
289 | fn basic_channel() { |
290 | let mut event_loop = crate::EventLoop::try_new().unwrap(); |
291 | |
292 | let handle = event_loop.handle(); |
293 | |
294 | let (tx, rx) = channel::<()>(); |
295 | |
296 | // (got_msg, got_closed) |
297 | let mut got = (false, false); |
298 | |
299 | let _channel_token = handle |
300 | .insert_source(rx, move |evt, &mut (), got: &mut (bool, bool)| match evt { |
301 | Event::Msg(()) => { |
302 | got.0 = true; |
303 | } |
304 | Event::Closed => { |
305 | got.1 = true; |
306 | } |
307 | }) |
308 | .unwrap(); |
309 | |
310 | // nothing is sent, nothing is received |
311 | event_loop |
312 | .dispatch(Some(::std::time::Duration::ZERO), &mut got) |
313 | .unwrap(); |
314 | |
315 | assert_eq!(got, (false, false)); |
316 | |
317 | // a message is send |
318 | tx.send(()).unwrap(); |
319 | event_loop |
320 | .dispatch(Some(::std::time::Duration::ZERO), &mut got) |
321 | .unwrap(); |
322 | |
323 | assert_eq!(got, (true, false)); |
324 | |
325 | // the sender is dropped |
326 | ::std::mem::drop(tx); |
327 | event_loop |
328 | .dispatch(Some(::std::time::Duration::ZERO), &mut got) |
329 | .unwrap(); |
330 | |
331 | assert_eq!(got, (true, true)); |
332 | } |
333 | |
334 | #[test ] |
335 | fn basic_sync_channel() { |
336 | let mut event_loop = crate::EventLoop::try_new().unwrap(); |
337 | |
338 | let handle = event_loop.handle(); |
339 | |
340 | let (tx, rx) = sync_channel::<()>(2); |
341 | |
342 | let mut received = (0, false); |
343 | |
344 | let _channel_token = handle |
345 | .insert_source( |
346 | rx, |
347 | move |evt, &mut (), received: &mut (u32, bool)| match evt { |
348 | Event::Msg(()) => { |
349 | received.0 += 1; |
350 | } |
351 | Event::Closed => { |
352 | received.1 = true; |
353 | } |
354 | }, |
355 | ) |
356 | .unwrap(); |
357 | |
358 | // nothing is sent, nothing is received |
359 | event_loop |
360 | .dispatch(Some(::std::time::Duration::ZERO), &mut received) |
361 | .unwrap(); |
362 | |
363 | assert_eq!(received.0, 0); |
364 | assert!(!received.1); |
365 | |
366 | // fill the channel |
367 | tx.send(()).unwrap(); |
368 | tx.send(()).unwrap(); |
369 | assert!(tx.try_send(()).is_err()); |
370 | |
371 | // empty it |
372 | event_loop |
373 | .dispatch(Some(::std::time::Duration::ZERO), &mut received) |
374 | .unwrap(); |
375 | |
376 | assert_eq!(received.0, 2); |
377 | assert!(!received.1); |
378 | |
379 | // send a final message and drop the sender |
380 | tx.send(()).unwrap(); |
381 | std::mem::drop(tx); |
382 | |
383 | // final read of the channel |
384 | event_loop |
385 | .dispatch(Some(::std::time::Duration::ZERO), &mut received) |
386 | .unwrap(); |
387 | |
388 | assert_eq!(received.0, 3); |
389 | assert!(received.1); |
390 | } |
391 | |
392 | #[test ] |
393 | fn test_more_than_1024() { |
394 | let mut event_loop = crate::EventLoop::try_new().unwrap(); |
395 | let handle = event_loop.handle(); |
396 | |
397 | let (tx, rx) = channel::<()>(); |
398 | let mut received = (0u32, false); |
399 | |
400 | handle |
401 | .insert_source( |
402 | rx, |
403 | move |evt, &mut (), received: &mut (u32, bool)| match evt { |
404 | Event::Msg(()) => received.0 += 1, |
405 | Event::Closed => received.1 = true, |
406 | }, |
407 | ) |
408 | .unwrap(); |
409 | |
410 | event_loop |
411 | .dispatch(Some(std::time::Duration::ZERO), &mut received) |
412 | .unwrap(); |
413 | |
414 | assert_eq!(received.0, 0); |
415 | assert!(!received.1); |
416 | |
417 | // Send 1025 elements into the channel. |
418 | for _ in 0..MAX_EVENTS_CHECK + 1 { |
419 | tx.send(()).unwrap(); |
420 | } |
421 | |
422 | event_loop |
423 | .dispatch(Some(std::time::Duration::ZERO), &mut received) |
424 | .unwrap(); |
425 | |
426 | assert_eq!(received.0, MAX_EVENTS_CHECK as u32); |
427 | assert!(!received.1); |
428 | |
429 | event_loop |
430 | .dispatch(Some(std::time::Duration::ZERO), &mut received) |
431 | .unwrap(); |
432 | |
433 | assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32); |
434 | assert!(!received.1); |
435 | } |
436 | } |
437 | |