| 1 | //! An MPSC channel whose receiving end is an event source |
| 2 | //! |
| 3 | //! Create a channel using [`channel()`](channel), which returns a |
| 4 | //! [`Sender`] that can be cloned and sent across threads if `T: Send`, |
| 5 | //! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop). |
| 6 | //! It will generate one event per message. |
| 7 | //! |
| 8 | //! A synchronous version of the channel is provided by [`sync_channel`], in which |
| 9 | //! the [`SyncSender`] will block when the channel is full. |
| 10 | |
| 11 | use std::sync::mpsc; |
| 12 | |
| 13 | use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; |
| 14 | |
| 15 | use super::ping::{make_ping, Ping, PingError, PingSource}; |
| 16 | |
/// What a [`Channel`] hands to its callback when it is processed.
#[derive(Debug)]
pub enum Event<T> {
    /// One message taken from the channel, carried by value.
    Msg(T),
    /// The channel has been closed.
    ///
    /// Every `Sender` associated with this channel has been dropped,
    /// so no further message will ever be received.
    Closed,
}
| 28 | |
| 29 | /// The sender end of a channel |
| 30 | /// |
| 31 | /// It can be cloned and sent accross threads (if `T` is). |
| 32 | #[derive (Debug)] |
| 33 | pub struct Sender<T> { |
| 34 | sender: mpsc::Sender<T>, |
| 35 | ping: Ping, |
| 36 | } |
| 37 | |
| 38 | impl<T> Clone for Sender<T> { |
| 39 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 40 | fn clone(&self) -> Sender<T> { |
| 41 | Sender { |
| 42 | sender: self.sender.clone(), |
| 43 | ping: self.ping.clone(), |
| 44 | } |
| 45 | } |
| 46 | } |
| 47 | |
| 48 | impl<T> Sender<T> { |
| 49 | /// Send a message to the channel |
| 50 | /// |
| 51 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 52 | /// it containing the provided value. |
| 53 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
| 54 | self.sender.send(t).map(|()| self.ping.ping()) |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | impl<T> Drop for Sender<T> { |
| 59 | fn drop(&mut self) { |
| 60 | // ping on drop, to notify about channel closure |
| 61 | self.ping.ping(); |
| 62 | } |
| 63 | } |
| 64 | |
| 65 | /// The sender end of a synchronous channel |
| 66 | /// |
| 67 | /// It can be cloned and sent accross threads (if `T` is). |
| 68 | #[derive (Debug)] |
| 69 | pub struct SyncSender<T> { |
| 70 | sender: mpsc::SyncSender<T>, |
| 71 | ping: Ping, |
| 72 | } |
| 73 | |
| 74 | impl<T> Clone for SyncSender<T> { |
| 75 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 76 | fn clone(&self) -> SyncSender<T> { |
| 77 | SyncSender { |
| 78 | sender: self.sender.clone(), |
| 79 | ping: self.ping.clone(), |
| 80 | } |
| 81 | } |
| 82 | } |
| 83 | |
| 84 | impl<T> SyncSender<T> { |
| 85 | /// Send a message to the synchronous channel |
| 86 | /// |
| 87 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 88 | /// it containing the provided value. If the channel is full, this |
| 89 | /// function will block until the event loop empties it and it can |
| 90 | /// deliver the message. |
| 91 | /// |
| 92 | /// Due to the blocking behavior, this method should not be used on the |
| 93 | /// same thread as the one running the event loop, as it could cause deadlocks. |
| 94 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
| 95 | let ret = self.try_send(t); |
| 96 | match ret { |
| 97 | Ok(()) => Ok(()), |
| 98 | Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()), |
| 99 | Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)), |
| 100 | } |
| 101 | } |
| 102 | |
| 103 | /// Send a message to the synchronous channel |
| 104 | /// |
| 105 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 106 | /// it containing the provided value. If the channel is full, this |
| 107 | /// function will return an error, but the event loop will still be |
| 108 | /// signaled for readiness. |
| 109 | pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> { |
| 110 | let ret = self.sender.try_send(t); |
| 111 | if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret { |
| 112 | self.ping.ping(); |
| 113 | } |
| 114 | ret |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | /// The receiving end of the channel |
| 119 | /// |
| 120 | /// This is the event source to be inserted into your `EventLoop`. |
| 121 | #[derive (Debug)] |
| 122 | pub struct Channel<T> { |
| 123 | receiver: mpsc::Receiver<T>, |
| 124 | source: PingSource, |
| 125 | } |
| 126 | |
| 127 | // This impl is safe because the Channel is only able to move around threads |
| 128 | // when it is not inserted into an event loop. (Otherwise it is stuck into |
| 129 | // a Source<_> and the internals of calloop, which are not Send). |
| 130 | // At this point, the Arc<Receiver> has a count of 1, and it is obviously |
| 131 | // safe to Send between threads. |
| 132 | unsafe impl<T: Send> Send for Channel<T> {} |
| 133 | |
| 134 | impl<T> Channel<T> { |
| 135 | /// Proxy for [`mpsc::Receiver::recv`] to manually poll events. |
| 136 | /// |
| 137 | /// *Note*: Normally you would want to use the `Channel` by inserting |
| 138 | /// it into an event loop instead. Use this for example to immediately |
| 139 | /// dispatch pending events after creation. |
| 140 | pub fn recv(&self) -> Result<T, mpsc::RecvError> { |
| 141 | self.receiver.recv() |
| 142 | } |
| 143 | |
| 144 | /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events. |
| 145 | /// |
| 146 | /// *Note*: Normally you would want to use the `Channel` by inserting |
| 147 | /// it into an event loop instead. Use this for example to immediately |
| 148 | /// dispatch pending events after creation. |
| 149 | pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { |
| 150 | self.receiver.try_recv() |
| 151 | } |
| 152 | } |
| 153 | |
| 154 | /// Create a new asynchronous channel |
| 155 | pub fn channel<T>() -> (Sender<T>, Channel<T>) { |
| 156 | let (sender: Sender, receiver: Receiver) = mpsc::channel(); |
| 157 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
| 158 | (Sender { sender, ping }, Channel { receiver, source }) |
| 159 | } |
| 160 | |
| 161 | /// Create a new synchronous, bounded channel |
| 162 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) { |
| 163 | let (sender: SyncSender, receiver: Receiver) = mpsc::sync_channel(bound); |
| 164 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
| 165 | (SyncSender { sender, ping }, Channel { receiver, source }) |
| 166 | } |
| 167 | |
| 168 | impl<T> EventSource for Channel<T> { |
| 169 | type Event = Event<T>; |
| 170 | type Metadata = (); |
| 171 | type Ret = (); |
| 172 | type Error = ChannelError; |
| 173 | |
| 174 | fn process_events<C>( |
| 175 | &mut self, |
| 176 | readiness: Readiness, |
| 177 | token: Token, |
| 178 | mut callback: C, |
| 179 | ) -> Result<PostAction, Self::Error> |
| 180 | where |
| 181 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
| 182 | { |
| 183 | let receiver = &self.receiver; |
| 184 | self.source |
| 185 | .process_events(readiness, token, |(), &mut ()| loop { |
| 186 | match receiver.try_recv() { |
| 187 | Ok(val) => callback(Event::Msg(val), &mut ()), |
| 188 | Err(mpsc::TryRecvError::Empty) => break, |
| 189 | Err(mpsc::TryRecvError::Disconnected) => { |
| 190 | callback(Event::Closed, &mut ()); |
| 191 | break; |
| 192 | } |
| 193 | } |
| 194 | }) |
| 195 | .map_err(ChannelError) |
| 196 | } |
| 197 | |
| 198 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
| 199 | self.source.register(poll, token_factory) |
| 200 | } |
| 201 | |
| 202 | fn reregister( |
| 203 | &mut self, |
| 204 | poll: &mut Poll, |
| 205 | token_factory: &mut TokenFactory, |
| 206 | ) -> crate::Result<()> { |
| 207 | self.source.reregister(poll, token_factory) |
| 208 | } |
| 209 | |
| 210 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
| 211 | self.source.unregister(poll) |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | /// An error arising from processing events for a channel. |
| 216 | #[derive (thiserror::Error, Debug)] |
| 217 | #[error(transparent)] |
| 218 | pub struct ChannelError(PingError); |
| 219 | |
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    // Zero timeout: every dispatch below only handles what is already pending.
    const NO_WAIT: Option<Duration> = Some(Duration::ZERO);

    /// An asynchronous channel delivers one message, then its closure.
    #[test]
    fn basic_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();
        let (tx, rx) = channel::<()>();

        // (got_msg, got_closed)
        let mut seen = (false, false);

        let _channel_token = loop_handle
            .insert_source(rx, move |event, &mut (), flags: &mut (bool, bool)| {
                match event {
                    Event::Msg(()) => flags.0 = true,
                    Event::Closed => flags.1 = true,
                }
            })
            .unwrap();

        // nothing is sent, nothing is received
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (false, false));

        // a message is sent
        tx.send(()).unwrap();
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, false));

        // the sender is dropped
        drop(tx);
        event_loop.dispatch(NO_WAIT, &mut seen).unwrap();
        assert_eq!(seen, (true, true));
    }

    /// A bounded channel of capacity 2 fills up, drains, then closes.
    #[test]
    fn basic_sync_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let loop_handle = event_loop.handle();
        let (tx, rx) = sync_channel::<()>(2);

        // (number of messages, got_closed)
        let mut tally = (0, false);

        let _channel_token = loop_handle
            .insert_source(rx, move |event, &mut (), counts: &mut (u32, bool)| {
                match event {
                    Event::Msg(()) => counts.0 += 1,
                    Event::Closed => counts.1 = true,
                }
            })
            .unwrap();

        // nothing is sent, nothing is received
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 0);
        assert!(!tally.1);

        // fill the channel
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        assert!(tx.try_send(()).is_err());

        // empty it
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 2);
        assert!(!tally.1);

        // send a final message and drop the sender
        tx.send(()).unwrap();
        drop(tx);

        // final read of the channel
        event_loop.dispatch(NO_WAIT, &mut tally).unwrap();
        assert_eq!(tally.0, 3);
        assert!(tally.1);
    }
}
| 328 | |