1//! An MPSC channel whose receiving end is an event source
2//!
3//! Create a channel using [`channel()`](channel), which returns a
4//! [`Sender`] that can be cloned and sent accross threads if `T: Send`,
5//! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop).
6//! It will generate one event per message.
7//!
8//! A synchronous version of the channel is provided by [`sync_channel`], in which
9//! the [`SyncSender`] will block when the channel is full.
10
11use std::sync::mpsc;
12
13use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory};
14
15use super::ping::{make_ping, Ping, PingError, PingSource};
16
17/// The events generated by the channel event source
18#[derive(Debug)]
19pub enum Event<T> {
20 /// A message was received and is bundled here
21 Msg(T),
22 /// The channel was closed
23 ///
24 /// This means all the `Sender`s associated with this channel
25 /// have been dropped, no more messages will ever be received.
26 Closed,
27}
28
29/// The sender end of a channel
30///
31/// It can be cloned and sent accross threads (if `T` is).
32#[derive(Debug)]
33pub struct Sender<T> {
34 sender: mpsc::Sender<T>,
35 ping: Ping,
36}
37
38impl<T> Clone for Sender<T> {
39 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
40 fn clone(&self) -> Sender<T> {
41 Sender {
42 sender: self.sender.clone(),
43 ping: self.ping.clone(),
44 }
45 }
46}
47
48impl<T> Sender<T> {
49 /// Send a message to the channel
50 ///
51 /// This will wake the event loop and deliver an `Event::Msg` to
52 /// it containing the provided value.
53 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
54 self.sender.send(t).map(|()| self.ping.ping())
55 }
56}
57
58impl<T> Drop for Sender<T> {
59 fn drop(&mut self) {
60 // ping on drop, to notify about channel closure
61 self.ping.ping();
62 }
63}
64
65/// The sender end of a synchronous channel
66///
67/// It can be cloned and sent accross threads (if `T` is).
68#[derive(Debug)]
69pub struct SyncSender<T> {
70 sender: mpsc::SyncSender<T>,
71 ping: Ping,
72}
73
74impl<T> Clone for SyncSender<T> {
75 #[cfg_attr(feature = "nightly_coverage", coverage(off))]
76 fn clone(&self) -> SyncSender<T> {
77 SyncSender {
78 sender: self.sender.clone(),
79 ping: self.ping.clone(),
80 }
81 }
82}
83
84impl<T> SyncSender<T> {
85 /// Send a message to the synchronous channel
86 ///
87 /// This will wake the event loop and deliver an `Event::Msg` to
88 /// it containing the provided value. If the channel is full, this
89 /// function will block until the event loop empties it and it can
90 /// deliver the message.
91 ///
92 /// Due to the blocking behavior, this method should not be used on the
93 /// same thread as the one running the event loop, as it could cause deadlocks.
94 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
95 let ret = self.try_send(t);
96 match ret {
97 Ok(()) => Ok(()),
98 Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()),
99 Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)),
100 }
101 }
102
103 /// Send a message to the synchronous channel
104 ///
105 /// This will wake the event loop and deliver an `Event::Msg` to
106 /// it containing the provided value. If the channel is full, this
107 /// function will return an error, but the event loop will still be
108 /// signaled for readiness.
109 pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
110 let ret = self.sender.try_send(t);
111 if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret {
112 self.ping.ping();
113 }
114 ret
115 }
116}
117
118/// The receiving end of the channel
119///
120/// This is the event source to be inserted into your `EventLoop`.
121#[derive(Debug)]
122pub struct Channel<T> {
123 receiver: mpsc::Receiver<T>,
124 source: PingSource,
125}
126
127// This impl is safe because the Channel is only able to move around threads
128// when it is not inserted into an event loop. (Otherwise it is stuck into
129// a Source<_> and the internals of calloop, which are not Send).
130// At this point, the Arc<Receiver> has a count of 1, and it is obviously
131// safe to Send between threads.
132unsafe impl<T: Send> Send for Channel<T> {}
133
134impl<T> Channel<T> {
135 /// Proxy for [`mpsc::Receiver::recv`] to manually poll events.
136 ///
137 /// *Note*: Normally you would want to use the `Channel` by inserting
138 /// it into an event loop instead. Use this for example to immediately
139 /// dispatch pending events after creation.
140 pub fn recv(&self) -> Result<T, mpsc::RecvError> {
141 self.receiver.recv()
142 }
143
144 /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events.
145 ///
146 /// *Note*: Normally you would want to use the `Channel` by inserting
147 /// it into an event loop instead. Use this for example to immediately
148 /// dispatch pending events after creation.
149 pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
150 self.receiver.try_recv()
151 }
152}
153
154/// Create a new asynchronous channel
155pub fn channel<T>() -> (Sender<T>, Channel<T>) {
156 let (sender: Sender, receiver: Receiver) = mpsc::channel();
157 let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping.");
158 (Sender { sender, ping }, Channel { receiver, source })
159}
160
161/// Create a new synchronous, bounded channel
162pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) {
163 let (sender: SyncSender, receiver: Receiver) = mpsc::sync_channel(bound);
164 let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping.");
165 (SyncSender { sender, ping }, Channel { receiver, source })
166}
167
168impl<T> EventSource for Channel<T> {
169 type Event = Event<T>;
170 type Metadata = ();
171 type Ret = ();
172 type Error = ChannelError;
173
174 fn process_events<C>(
175 &mut self,
176 readiness: Readiness,
177 token: Token,
178 mut callback: C,
179 ) -> Result<PostAction, Self::Error>
180 where
181 C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
182 {
183 let receiver = &self.receiver;
184 self.source
185 .process_events(readiness, token, |(), &mut ()| loop {
186 match receiver.try_recv() {
187 Ok(val) => callback(Event::Msg(val), &mut ()),
188 Err(mpsc::TryRecvError::Empty) => break,
189 Err(mpsc::TryRecvError::Disconnected) => {
190 callback(Event::Closed, &mut ());
191 break;
192 }
193 }
194 })
195 .map_err(ChannelError)
196 }
197
198 fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> {
199 self.source.register(poll, token_factory)
200 }
201
202 fn reregister(
203 &mut self,
204 poll: &mut Poll,
205 token_factory: &mut TokenFactory,
206 ) -> crate::Result<()> {
207 self.source.reregister(poll, token_factory)
208 }
209
210 fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> {
211 self.source.unregister(poll)
212 }
213}
214
215/// An error arising from processing events for a channel.
216#[derive(thiserror::Error, Debug)]
217#[error(transparent)]
218pub struct ChannelError(PingError);
219
220#[cfg(test)]
221mod tests {
222 use super::*;
223
224 #[test]
225 fn basic_channel() {
226 let mut event_loop = crate::EventLoop::try_new().unwrap();
227
228 let handle = event_loop.handle();
229
230 let (tx, rx) = channel::<()>();
231
232 // (got_msg, got_closed)
233 let mut got = (false, false);
234
235 let _channel_token = handle
236 .insert_source(rx, move |evt, &mut (), got: &mut (bool, bool)| match evt {
237 Event::Msg(()) => {
238 got.0 = true;
239 }
240 Event::Closed => {
241 got.1 = true;
242 }
243 })
244 .unwrap();
245
246 // nothing is sent, nothing is received
247 event_loop
248 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
249 .unwrap();
250
251 assert_eq!(got, (false, false));
252
253 // a message is send
254 tx.send(()).unwrap();
255 event_loop
256 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
257 .unwrap();
258
259 assert_eq!(got, (true, false));
260
261 // the sender is dropped
262 ::std::mem::drop(tx);
263 event_loop
264 .dispatch(Some(::std::time::Duration::ZERO), &mut got)
265 .unwrap();
266
267 assert_eq!(got, (true, true));
268 }
269
270 #[test]
271 fn basic_sync_channel() {
272 let mut event_loop = crate::EventLoop::try_new().unwrap();
273
274 let handle = event_loop.handle();
275
276 let (tx, rx) = sync_channel::<()>(2);
277
278 let mut received = (0, false);
279
280 let _channel_token = handle
281 .insert_source(
282 rx,
283 move |evt, &mut (), received: &mut (u32, bool)| match evt {
284 Event::Msg(()) => {
285 received.0 += 1;
286 }
287 Event::Closed => {
288 received.1 = true;
289 }
290 },
291 )
292 .unwrap();
293
294 // nothing is sent, nothing is received
295 event_loop
296 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
297 .unwrap();
298
299 assert_eq!(received.0, 0);
300 assert!(!received.1);
301
302 // fill the channel
303 tx.send(()).unwrap();
304 tx.send(()).unwrap();
305 assert!(tx.try_send(()).is_err());
306
307 // empty it
308 event_loop
309 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
310 .unwrap();
311
312 assert_eq!(received.0, 2);
313 assert!(!received.1);
314
315 // send a final message and drop the sender
316 tx.send(()).unwrap();
317 std::mem::drop(tx);
318
319 // final read of the channel
320 event_loop
321 .dispatch(Some(::std::time::Duration::ZERO), &mut received)
322 .unwrap();
323
324 assert_eq!(received.0, 3);
325 assert!(received.1);
326 }
327}
328