//! An MPSC channel whose receiving end is an event source
//!
//! Create a channel using [`channel()`](channel), which returns a
//! [`Sender`] that can be cloned and sent across threads if `T: Send`,
//! and a [`Channel`] that can be inserted into an [`EventLoop`](crate::EventLoop).
//! It will generate one event per message.
//!
//! A synchronous version of the channel is provided by [`sync_channel`], in which
//! the [`SyncSender`] will block when the channel is full.
| 10 | |
| 11 | use std::cmp; |
| 12 | use std::fmt; |
| 13 | use std::sync::mpsc; |
| 14 | |
| 15 | use crate::{EventSource, Poll, PostAction, Readiness, Token, TokenFactory}; |
| 16 | |
| 17 | use super::ping::{make_ping, Ping, PingError, PingSource}; |
| 18 | |
/// Upper bound on the number of receive attempts made per wake-up of the channel source.
const MAX_EVENTS_CHECK: usize = 1024;
| 20 | |
/// Events produced by the channel event source.
#[derive(Debug)]
pub enum Event<T> {
    /// A message arrived; its payload is carried in this variant.
    Msg(T),
    /// The channel has been closed.
    ///
    /// Every `Sender` attached to this channel has been dropped, so no
    /// further message will ever be delivered.
    Closed,
}
| 32 | |
| 33 | /// The sender end of a channel |
| 34 | /// |
| 35 | /// It can be cloned and sent accross threads (if `T` is). |
| 36 | #[derive (Debug)] |
| 37 | pub struct Sender<T> { |
| 38 | sender: mpsc::Sender<T>, |
| 39 | ping: Ping, |
| 40 | } |
| 41 | |
| 42 | impl<T> Clone for Sender<T> { |
| 43 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 44 | fn clone(&self) -> Sender<T> { |
| 45 | Sender { |
| 46 | sender: self.sender.clone(), |
| 47 | ping: self.ping.clone(), |
| 48 | } |
| 49 | } |
| 50 | } |
| 51 | |
| 52 | impl<T> Sender<T> { |
| 53 | /// Send a message to the channel |
| 54 | /// |
| 55 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 56 | /// it containing the provided value. |
| 57 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
| 58 | self.sender.send(t).map(|()| self.ping.ping()) |
| 59 | } |
| 60 | } |
| 61 | |
| 62 | impl<T> Drop for Sender<T> { |
| 63 | fn drop(&mut self) { |
| 64 | // ping on drop, to notify about channel closure |
| 65 | self.ping.ping(); |
| 66 | } |
| 67 | } |
| 68 | |
| 69 | /// The sender end of a synchronous channel |
| 70 | /// |
| 71 | /// It can be cloned and sent accross threads (if `T` is). |
| 72 | #[derive (Debug)] |
| 73 | pub struct SyncSender<T> { |
| 74 | sender: mpsc::SyncSender<T>, |
| 75 | ping: Ping, |
| 76 | } |
| 77 | |
| 78 | impl<T> Clone for SyncSender<T> { |
| 79 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 80 | fn clone(&self) -> SyncSender<T> { |
| 81 | SyncSender { |
| 82 | sender: self.sender.clone(), |
| 83 | ping: self.ping.clone(), |
| 84 | } |
| 85 | } |
| 86 | } |
| 87 | |
| 88 | impl<T> SyncSender<T> { |
| 89 | /// Send a message to the synchronous channel |
| 90 | /// |
| 91 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 92 | /// it containing the provided value. If the channel is full, this |
| 93 | /// function will block until the event loop empties it and it can |
| 94 | /// deliver the message. |
| 95 | /// |
| 96 | /// Due to the blocking behavior, this method should not be used on the |
| 97 | /// same thread as the one running the event loop, as it could cause deadlocks. |
| 98 | pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> { |
| 99 | let ret = self.try_send(t); |
| 100 | match ret { |
| 101 | Ok(()) => Ok(()), |
| 102 | Err(mpsc::TrySendError::Full(t)) => self.sender.send(t).map(|()| self.ping.ping()), |
| 103 | Err(mpsc::TrySendError::Disconnected(t)) => Err(mpsc::SendError(t)), |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | /// Send a message to the synchronous channel |
| 108 | /// |
| 109 | /// This will wake the event loop and deliver an `Event::Msg` to |
| 110 | /// it containing the provided value. If the channel is full, this |
| 111 | /// function will return an error, but the event loop will still be |
| 112 | /// signaled for readiness. |
| 113 | pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> { |
| 114 | let ret = self.sender.try_send(t); |
| 115 | if let Ok(()) | Err(mpsc::TrySendError::Full(_)) = ret { |
| 116 | self.ping.ping(); |
| 117 | } |
| 118 | ret |
| 119 | } |
| 120 | } |
| 121 | |
| 122 | /// The receiving end of the channel |
| 123 | /// |
| 124 | /// This is the event source to be inserted into your `EventLoop`. |
| 125 | #[derive (Debug)] |
| 126 | pub struct Channel<T> { |
| 127 | receiver: mpsc::Receiver<T>, |
| 128 | source: PingSource, |
| 129 | ping: Ping, |
| 130 | capacity: usize, |
| 131 | } |
| 132 | |
| 133 | // This impl is safe because the Channel is only able to move around threads |
| 134 | // when it is not inserted into an event loop. (Otherwise it is stuck into |
| 135 | // a Source<_> and the internals of calloop, which are not Send). |
| 136 | // At this point, the Arc<Receiver> has a count of 1, and it is obviously |
| 137 | // safe to Send between threads. |
| 138 | unsafe impl<T: Send> Send for Channel<T> {} |
| 139 | |
| 140 | impl<T> Channel<T> { |
| 141 | /// Proxy for [`mpsc::Receiver::recv`] to manually poll events. |
| 142 | /// |
| 143 | /// *Note*: Normally you would want to use the `Channel` by inserting |
| 144 | /// it into an event loop instead. Use this for example to immediately |
| 145 | /// dispatch pending events after creation. |
| 146 | pub fn recv(&self) -> Result<T, mpsc::RecvError> { |
| 147 | self.receiver.recv() |
| 148 | } |
| 149 | |
| 150 | /// Proxy for [`mpsc::Receiver::try_recv`] to manually poll events. |
| 151 | /// |
| 152 | /// *Note*: Normally you would want to use the `Channel` by inserting |
| 153 | /// it into an event loop instead. Use this for example to immediately |
| 154 | /// dispatch pending events after creation. |
| 155 | pub fn try_recv(&self) -> Result<T, mpsc::TryRecvError> { |
| 156 | self.receiver.try_recv() |
| 157 | } |
| 158 | } |
| 159 | |
| 160 | /// Create a new asynchronous channel |
| 161 | pub fn channel<T>() -> (Sender<T>, Channel<T>) { |
| 162 | let (sender: Sender, receiver: Receiver) = mpsc::channel(); |
| 163 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
| 164 | ( |
| 165 | Sender { |
| 166 | sender, |
| 167 | ping: ping.clone(), |
| 168 | }, |
| 169 | Channel { |
| 170 | receiver, |
| 171 | ping, |
| 172 | source, |
| 173 | capacity: usize::MAX, |
| 174 | }, |
| 175 | ) |
| 176 | } |
| 177 | |
| 178 | /// Create a new synchronous, bounded channel |
| 179 | pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Channel<T>) { |
| 180 | let (sender: SyncSender, receiver: Receiver) = mpsc::sync_channel(bound); |
| 181 | let (ping: Ping, source: PingSource) = make_ping().expect(msg:"Failed to create a Ping." ); |
| 182 | ( |
| 183 | SyncSender { |
| 184 | sender, |
| 185 | ping: ping.clone(), |
| 186 | }, |
| 187 | Channel { |
| 188 | receiver, |
| 189 | source, |
| 190 | ping, |
| 191 | capacity: bound, |
| 192 | }, |
| 193 | ) |
| 194 | } |
| 195 | |
| 196 | impl<T> EventSource for Channel<T> { |
| 197 | type Event = Event<T>; |
| 198 | type Metadata = (); |
| 199 | type Ret = (); |
| 200 | type Error = ChannelError; |
| 201 | |
| 202 | fn process_events<C>( |
| 203 | &mut self, |
| 204 | readiness: Readiness, |
| 205 | token: Token, |
| 206 | mut callback: C, |
| 207 | ) -> Result<PostAction, Self::Error> |
| 208 | where |
| 209 | C: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret, |
| 210 | { |
| 211 | let receiver = &self.receiver; |
| 212 | let capacity = self.capacity; |
| 213 | let mut clear_readiness = false; |
| 214 | let mut disconnected = false; |
| 215 | |
| 216 | let action = self |
| 217 | .source |
| 218 | .process_events(readiness, token, |(), &mut ()| { |
| 219 | // Limit the number of elements we process at a time to the channel's capacity, or 1024. |
| 220 | let max = cmp::min(capacity.saturating_add(1), MAX_EVENTS_CHECK); |
| 221 | for _ in 0..max { |
| 222 | match receiver.try_recv() { |
| 223 | Ok(val) => callback(Event::Msg(val), &mut ()), |
| 224 | Err(mpsc::TryRecvError::Empty) => { |
| 225 | clear_readiness = true; |
| 226 | break; |
| 227 | } |
| 228 | Err(mpsc::TryRecvError::Disconnected) => { |
| 229 | callback(Event::Closed, &mut ()); |
| 230 | disconnected = true; |
| 231 | break; |
| 232 | } |
| 233 | } |
| 234 | } |
| 235 | }) |
| 236 | .map_err(ChannelError)?; |
| 237 | |
| 238 | if disconnected { |
| 239 | Ok(PostAction::Remove) |
| 240 | } else if clear_readiness { |
| 241 | Ok(action) |
| 242 | } else { |
| 243 | // Re-notify the ping source so we can try again. |
| 244 | self.ping.ping(); |
| 245 | Ok(PostAction::Continue) |
| 246 | } |
| 247 | } |
| 248 | |
| 249 | fn register(&mut self, poll: &mut Poll, token_factory: &mut TokenFactory) -> crate::Result<()> { |
| 250 | self.source.register(poll, token_factory) |
| 251 | } |
| 252 | |
| 253 | fn reregister( |
| 254 | &mut self, |
| 255 | poll: &mut Poll, |
| 256 | token_factory: &mut TokenFactory, |
| 257 | ) -> crate::Result<()> { |
| 258 | self.source.reregister(poll, token_factory) |
| 259 | } |
| 260 | |
| 261 | fn unregister(&mut self, poll: &mut Poll) -> crate::Result<()> { |
| 262 | self.source.unregister(poll) |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | /// An error arising from processing events for a channel. |
| 267 | #[derive (Debug)] |
| 268 | pub struct ChannelError(PingError); |
| 269 | |
| 270 | impl fmt::Display for ChannelError { |
| 271 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 272 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 273 | fmt::Display::fmt(&self.0, f) |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | impl std::error::Error for ChannelError { |
| 278 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
| 279 | fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { |
| 280 | Some(&self.0) |
| 281 | } |
| 282 | } |
| 283 | |
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use super::*;

    /// A message, then the drop of the only sender, are each observed by a dispatch.
    #[test]
    fn basic_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = channel::<()>();

        // (saw_msg, saw_closed)
        let mut seen = (false, false);

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), seen: &mut (bool, bool)| match evt {
                Event::Msg(()) => seen.0 = true,
                Event::Closed => seen.1 = true,
            })
            .unwrap();

        // Nothing was sent yet, so nothing is received.
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (false, false));

        // A message is sent.
        tx.send(()).unwrap();
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (true, false));

        // The sender is dropped.
        drop(tx);
        event_loop.dispatch(Some(Duration::ZERO), &mut seen).unwrap();
        assert_eq!(seen, (true, true));
    }

    /// A bounded channel rejects `try_send` when full and reports closure after the last send.
    #[test]
    fn basic_sync_channel() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = sync_channel::<()>(2);

        // (message count, saw_closed)
        let mut received = (0, false);

        let _channel_token = handle
            .insert_source(rx, move |evt, &mut (), state: &mut (u32, bool)| match evt {
                Event::Msg(()) => state.0 += 1,
                Event::Closed => state.1 = true,
            })
            .unwrap();

        // Nothing was sent yet, so nothing is received.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, 0);
        assert!(!received.1);

        // Fill the channel up to its bound; one more must be rejected.
        tx.send(()).unwrap();
        tx.send(()).unwrap();
        assert!(tx.try_send(()).is_err());

        // Empty it.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, 2);
        assert!(!received.1);

        // Send a final message, then drop the sender.
        tx.send(()).unwrap();
        drop(tx);

        // Final read of the channel.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, 3);
        assert!(received.1);
    }

    /// More than `MAX_EVENTS_CHECK` queued messages are delivered over two dispatches.
    #[test]
    fn test_more_than_1024() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (tx, rx) = channel::<()>();

        // (message count, saw_closed)
        let mut received = (0u32, false);

        handle
            .insert_source(rx, move |evt, &mut (), state: &mut (u32, bool)| match evt {
                Event::Msg(()) => state.0 += 1,
                Event::Closed => state.1 = true,
            })
            .unwrap();

        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, 0);
        assert!(!received.1);

        // Queue one element more than a single batch can hold (1025).
        for _ in 0..=MAX_EVENTS_CHECK {
            tx.send(()).unwrap();
        }

        // The first dispatch stops at the batch limit.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, MAX_EVENTS_CHECK as u32);
        assert!(!received.1);

        // The second dispatch picks up the remaining element.
        event_loop
            .dispatch(Some(Duration::ZERO), &mut received)
            .unwrap();
        assert_eq!(received.0, (MAX_EVENTS_CHECK + 1) as u32);
        assert!(!received.1);
    }
}
| 437 | |