1 | //! # Flume |
2 | //! |
3 | //! A blazingly fast multi-producer, multi-consumer channel. |
4 | //! |
5 | //! *"Do not communicate by sharing memory; instead, share memory by communicating."* |
6 | //! |
7 | //! ## Why Flume? |
8 | //! |
9 | //! - **Featureful**: Unbounded, bounded and rendezvous queues |
10 | //! - **Fast**: Always faster than `std::sync::mpsc` and sometimes `crossbeam-channel` |
11 | //! - **Safe**: No `unsafe` code anywhere in the codebase! |
12 | //! - **Flexible**: `Sender` and `Receiver` both implement `Send + Sync + Clone` |
13 | //! - **Familiar**: Drop-in replacement for `std::sync::mpsc` |
14 | //! - **Capable**: Additional features like MPMC support and send timeouts/deadlines |
15 | //! - **Simple**: Few dependencies, minimal codebase, fast to compile |
16 | //! - **Asynchronous**: `async` support, including mix 'n match with sync code |
17 | //! - **Ergonomic**: Powerful `select`-like interface |
18 | //! |
19 | //! ## Example |
20 | //! |
21 | //! ``` |
22 | //! let (tx, rx) = flume::unbounded(); |
23 | //! |
24 | //! tx.send(42).unwrap(); |
25 | //! assert_eq!(rx.recv().unwrap(), 42); |
26 | //! ``` |
27 | |
28 | #![deny (missing_docs)] |
29 | |
30 | #[cfg (feature = "select" )] |
31 | pub mod select; |
32 | #[cfg (feature = "async" )] |
33 | pub mod r#async; |
34 | |
35 | mod signal; |
36 | |
37 | // Reexports |
38 | #[cfg (feature = "select" )] |
39 | pub use select::Selector; |
40 | |
41 | use std::{ |
42 | collections::VecDeque, |
43 | sync::{Arc, atomic::{AtomicUsize, AtomicBool, Ordering}, Weak}, |
44 | time::{Duration, Instant}, |
45 | marker::PhantomData, |
46 | thread, |
47 | fmt, |
48 | }; |
49 | |
50 | #[cfg (feature = "spin" )] |
51 | use spin1::{Mutex as Spinlock, MutexGuard as SpinlockGuard}; |
52 | use crate::signal::{Signal, SyncSignal}; |
53 | |
/// An error that may be emitted when attempting to send a value into a channel on a sender when
/// all receivers are dropped.
#[derive(Copy, Clone, PartialEq, Eq)]
pub struct SendError<T>(pub T);

impl<T> SendError<T> {
    /// Consume the error, yielding the message that failed to send.
    pub fn into_inner(self) -> T {
        let SendError(msg) = self;
        msg
    }
}

impl<T> fmt::Debug for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Deliberately opaque: the payload may not implement `Debug`.
        write!(f, "SendError(..)")
    }
}

impl<T> fmt::Display for SendError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "sending on a closed channel")
    }
}

impl<T> std::error::Error for SendError<T> {}
77 | |
78 | /// An error that may be emitted when attempting to send a value into a channel on a sender when |
79 | /// the channel is full or all receivers are dropped. |
80 | #[derive (Copy, Clone, PartialEq, Eq)] |
81 | pub enum TrySendError<T> { |
82 | /// The channel the message is sent on has a finite capacity and was full when the send was attempted. |
83 | Full(T), |
84 | /// All channel receivers were dropped and so the message has nobody to receive it. |
85 | Disconnected(T), |
86 | } |
87 | |
88 | impl<T> TrySendError<T> { |
89 | /// Consume the error, yielding the message that failed to send. |
90 | pub fn into_inner(self) -> T { |
91 | match self { |
92 | Self::Full(msg: T) | Self::Disconnected(msg: T) => msg, |
93 | } |
94 | } |
95 | } |
96 | |
97 | impl<T> fmt::Debug for TrySendError<T> { |
98 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
99 | match *self { |
100 | TrySendError::Full(..) => "Full(..)" .fmt(f), |
101 | TrySendError::Disconnected(..) => "Disconnected(..)" .fmt(f), |
102 | } |
103 | } |
104 | } |
105 | |
106 | impl<T> fmt::Display for TrySendError<T> { |
107 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
108 | match self { |
109 | TrySendError::Full(..) => "sending on a full channel" .fmt(f), |
110 | TrySendError::Disconnected(..) => "sending on a closed channel" .fmt(f), |
111 | } |
112 | } |
113 | } |
114 | |
115 | impl<T> std::error::Error for TrySendError<T> {} |
116 | |
117 | impl<T> From<SendError<T>> for TrySendError<T> { |
118 | fn from(err: SendError<T>) -> Self { |
119 | match err { |
120 | SendError(item: T) => Self::Disconnected(item), |
121 | } |
122 | } |
123 | } |
124 | |
125 | /// An error that may be emitted when sending a value into a channel on a sender with a timeout when |
126 | /// the send operation times out or all receivers are dropped. |
127 | #[derive (Copy, Clone, PartialEq, Eq)] |
128 | pub enum SendTimeoutError<T> { |
129 | /// A timeout occurred when attempting to send the message. |
130 | Timeout(T), |
131 | /// All channel receivers were dropped and so the message has nobody to receive it. |
132 | Disconnected(T), |
133 | } |
134 | |
135 | impl<T> SendTimeoutError<T> { |
136 | /// Consume the error, yielding the message that failed to send. |
137 | pub fn into_inner(self) -> T { |
138 | match self { |
139 | Self::Timeout(msg: T) | Self::Disconnected(msg: T) => msg, |
140 | } |
141 | } |
142 | } |
143 | |
144 | impl<T> fmt::Debug for SendTimeoutError<T> { |
145 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
146 | "SendTimeoutError(..)" .fmt(f) |
147 | } |
148 | } |
149 | |
150 | impl<T> fmt::Display for SendTimeoutError<T> { |
151 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
152 | match self { |
153 | SendTimeoutError::Timeout(..) => "timed out sending on a full channel" .fmt(f), |
154 | SendTimeoutError::Disconnected(..) => "sending on a closed channel" .fmt(f), |
155 | } |
156 | } |
157 | } |
158 | |
159 | impl<T> std::error::Error for SendTimeoutError<T> {} |
160 | |
161 | impl<T> From<SendError<T>> for SendTimeoutError<T> { |
162 | fn from(err: SendError<T>) -> Self { |
163 | match err { |
164 | SendError(item: T) => Self::Disconnected(item), |
165 | } |
166 | } |
167 | } |
168 | |
// Internal catch-all error type covering every sync send mode (try / blocking /
// deadline). Public APIs map this onto `SendError`, `TrySendError` or
// `SendTimeoutError` as appropriate, treating variants that cannot occur for a
// given mode as unreachable. Each variant carries back the unsent message.
enum TrySendTimeoutError<T> {
    // The channel was full (bounded channels only).
    Full(T),
    // All receivers were dropped.
    Disconnected(T),
    // The deadline passed before space became available.
    Timeout(T),
}
174 | |
/// An error that may be emitted when attempting to wait for a value on a receiver when all senders
/// are dropped and there are no more messages in the channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvError {
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            RecvError::Disconnected => write!(f, "receiving on a closed channel"),
        }
    }
}

impl std::error::Error for RecvError {}
192 | |
/// An error that may be emitted when attempting to fetch a value on a receiver when there are no
/// messages in the channel. If there are no messages in the channel and all senders are dropped,
/// then `TryRecvError::Disconnected` will be returned.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TryRecvError {
    /// The channel was empty when the receive was attempted.
    Empty,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for TryRecvError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select a fixed description per failure mode, then emit it.
        let text = match *self {
            TryRecvError::Empty => "receiving on an empty channel",
            TryRecvError::Disconnected => "channel is empty and closed",
        };
        write!(f, "{}", text)
    }
}

impl std::error::Error for TryRecvError {}
214 | |
impl From<RecvError> for TryRecvError {
    // A blocking receive can only fail due to disconnection, which maps directly
    // onto the equivalent `try_recv` failure.
    fn from(err: RecvError) -> Self {
        match err {
            RecvError::Disconnected => Self::Disconnected,
        }
    }
}
222 | |
/// An error that may be emitted when attempting to wait for a value on a receiver with a timeout
/// when the receive operation times out or all senders are dropped and there are no values left
/// in the channel.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RecvTimeoutError {
    /// A timeout occurred when attempting to receive a message.
    Timeout,
    /// All senders were dropped and no messages are waiting in the channel, so no further messages can be received.
    Disconnected,
}

impl fmt::Display for RecvTimeoutError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Select a fixed description per failure mode, then emit it.
        let text = match *self {
            RecvTimeoutError::Timeout => "timed out waiting on a channel",
            RecvTimeoutError::Disconnected => "channel is empty and closed",
        };
        write!(f, "{}", text)
    }
}

impl std::error::Error for RecvTimeoutError {}
244 | |
impl From<RecvError> for RecvTimeoutError {
    // A blocking receive can only fail due to disconnection, which maps directly
    // onto the equivalent deadline-receive failure.
    fn from(err: RecvError) -> Self {
        match err {
            RecvError::Disconnected => Self::Disconnected,
        }
    }
}
252 | |
// Internal catch-all error type covering every sync receive mode (try /
// blocking / deadline). Public APIs map this onto `RecvError`, `TryRecvError`
// or `RecvTimeoutError`, treating impossible variants as unreachable.
enum TryRecvTimeoutError {
    Empty,
    Timeout,
    Disconnected,
}
258 | |
// TODO: Investigate some sort of invalidation flag for timeouts
// A `Hook` pairs an optional message slot with a signal used to wake the party
// parked on it. A hook with a slot (`Some`) is queued by blocking senders
// (slot pre-filled) and blocking receivers (slot empty); a slot-less hook
// (`None`, see `Hook::trigger`) only ever fires its signal.
#[cfg (feature = "spin" )]
struct Hook<T, S: ?Sized>(Option<Spinlock<Option<T>>>, S);

#[cfg (not(feature = "spin" ))]
struct Hook<T, S: ?Sized>(Option<Mutex<Option<T>>>, S);

#[cfg (feature = "spin" )]
impl<T, S: ?Sized + Signal> Hook<T, S> {
    // Create a hook with a message slot, optionally pre-filled. `S: Sized` is
    // required here because the signal is stored by value at construction.
    pub fn slot(msg: Option<T>, signal: S) -> Arc<Self>
    where
        S: Sized,
    {
        Arc::new(Self(Some(Spinlock::new(msg)), signal))
    }

    // Lock the message slot, if this hook has one.
    fn lock(&self) -> Option<SpinlockGuard<'_, Option<T>>> {
        self.0.as_ref().map(|s| s.lock())
    }
}
279 | |
280 | #[cfg (not(feature = "spin" ))] |
281 | impl<T, S: ?Sized + Signal> Hook<T, S> { |
282 | pub fn slot(msg: Option<T>, signal: S) -> Arc<Self> |
283 | where |
284 | S: Sized, |
285 | { |
286 | Arc::new(Self(Some(Mutex::new(msg)), signal)) |
287 | } |
288 | |
289 | fn lock(&self) -> Option<MutexGuard<'_, Option<T>>> { |
290 | self.0.as_ref().map(|s: &Mutex| s.lock().unwrap()) |
291 | } |
292 | } |
293 | |
294 | impl<T, S: ?Sized + Signal> Hook<T, S> { |
295 | pub fn fire_recv(&self) -> (T, &S) { |
296 | let msg = self.lock().unwrap().take().unwrap(); |
297 | (msg, self.signal()) |
298 | } |
299 | |
300 | pub fn fire_send(&self, msg: T) -> (Option<T>, &S) { |
301 | let ret = match self.lock() { |
302 | Some(mut lock) => { |
303 | *lock = Some(msg); |
304 | None |
305 | } |
306 | None => Some(msg), |
307 | }; |
308 | (ret, self.signal()) |
309 | } |
310 | |
311 | pub fn is_empty(&self) -> bool { |
312 | self.lock().map(|s| s.is_none()).unwrap_or(true) |
313 | } |
314 | |
315 | pub fn try_take(&self) -> Option<T> { |
316 | self.lock().unwrap().take() |
317 | } |
318 | |
319 | pub fn trigger(signal: S) -> Arc<Self> |
320 | where |
321 | S: Sized, |
322 | { |
323 | Arc::new(Self(None, signal)) |
324 | } |
325 | |
326 | pub fn signal(&self) -> &S { |
327 | &self.1 |
328 | } |
329 | |
330 | pub fn fire_nothing(&self) -> bool { |
331 | self.signal().fire() |
332 | } |
333 | } |
334 | |
// Blocking wait loops, available only for sync-signal hooks. In all four
// methods the disconnect flag is deliberately loaded *before* inspecting the
// slot: a racing disconnect that lands a message concurrently is then still
// observed on the next loop iteration rather than lost.
impl<T> Hook<T, SyncSignal> {
    // Block until a message lands in this hook's slot or the channel
    // disconnects. Returns `None` on disconnection with no message.
    pub fn wait_recv(&self, abort: &AtomicBool) -> Option<T> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            let msg = self.lock().unwrap().take();
            if let Some(msg) = msg {
                break Some(msg);
            } else if disconnected {
                break None;
            } else {
                // Park until a sender (or disconnect) fires our signal.
                self.signal().wait()
            }
        }
    }

    // Err(true) if timeout
    pub fn wait_deadline_recv(&self, abort: &AtomicBool, deadline: Instant) -> Result<T, bool> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            let msg = self.lock().unwrap().take();
            if let Some(msg) = msg {
                break Ok(msg);
            } else if disconnected {
                break Err(false);
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
                // Deadline not yet reached: park for at most the remaining time.
                self.signal().wait_timeout(dur);
            } else {
                break Err(true);
            }
        }
    }

    // Block until the message in this hook's slot has been taken by a receiver,
    // or the channel disconnects (leaving the message in the slot for the
    // caller to reclaim via `try_take`).
    pub fn wait_send(&self, abort: &AtomicBool) {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            if disconnected || self.lock().unwrap().is_none() {
                break;
            }

            self.signal().wait();
        }
    }

    // Err(true) if timeout
    pub fn wait_deadline_send(&self, abort: &AtomicBool, deadline: Instant) -> Result<(), bool> {
        loop {
            let disconnected = abort.load(Ordering::SeqCst); // Check disconnect *before* msg
            if self.lock().unwrap().is_none() {
                break Ok(());
            } else if disconnected {
                break Err(false);
            } else if let Some(dur) = deadline.checked_duration_since(Instant::now()) {
                // Deadline not yet reached: park for at most the remaining time.
                self.signal().wait_timeout(dur);
            } else {
                break Err(true);
            }
        }
    }
}
394 | |
// Acquire the channel lock under the `spin` feature: spin briefly while
// yielding to the scheduler, then fall back to exponentially growing sleeps so
// that a descheduled lock holder can make progress.
#[cfg (feature = "spin" )]
#[inline ]
fn wait_lock<T>(lock: &Spinlock<T>) -> SpinlockGuard<T> {
    // Start at 2^4 ns; doubles each round, capped at 2^20 ns (~1 ms).
    let mut i = 4;
    loop {
        for _ in 0..10 {
            if let Some(guard) = lock.try_lock() {
                return guard;
            }
            thread::yield_now();
        }
        // Sleep for at most ~1 ms
        thread::sleep(Duration::from_nanos(1 << i.min(20)));
        i += 1;
    }
}

// Without `spin`, block on the standard mutex. Poisoning is treated as a bug
// (no user code runs while the lock is held), hence the `unwrap`.
#[cfg (not(feature = "spin" ))]
#[inline ]
fn wait_lock<'a, T>(lock: &'a Mutex<T>) -> MutexGuard<'a, T> {
    lock.lock().unwrap()
}

#[cfg (not(feature = "spin" ))]
use std::sync::{Mutex, MutexGuard};

// The lock guarding channel state: a spinlock when the `spin` feature is
// enabled, otherwise a standard library mutex.
#[cfg (feature = "spin" )]
type ChanLock<T> = Spinlock<T>;
#[cfg (not(feature = "spin" ))]
type ChanLock<T> = Mutex<T>;
425 | |
426 | |
// Queue of hooks (message slot + wakeup signal) for parked senders or receivers.
type SignalVec<T> = VecDeque<Arc<Hook<T, dyn signal::Signal>>>;
// All mutable channel state, kept behind a single `ChanLock`.
struct Chan<T> {
    // `Some((capacity, parked_senders))` for bounded channels, `None` if unbounded.
    sending: Option<(usize, SignalVec<T>)>,
    // Messages ready to be received.
    queue: VecDeque<T>,
    // Receivers parked waiting for a message.
    waiting: SignalVec<T>,
}
433 | |
434 | impl<T> Chan<T> { |
435 | fn pull_pending(&mut self, pull_extra: bool) { |
436 | if let Some((cap: &mut usize, sending: &mut VecDeque>>)) = &mut self.sending { |
437 | let effective_cap: usize = *cap + pull_extra as usize; |
438 | |
439 | while self.queue.len() < effective_cap { |
440 | if let Some(s: Arc>) = sending.pop_front() { |
441 | let (msg: T, signal: &dyn Signal) = s.fire_recv(); |
442 | signal.fire(); |
443 | self.queue.push_back(msg); |
444 | } else { |
445 | break; |
446 | } |
447 | } |
448 | } |
449 | } |
450 | |
451 | fn try_wake_receiver_if_pending(&mut self) { |
452 | if !self.queue.is_empty() { |
453 | while Some(false) == self.waiting.pop_front().map(|s: Arc>| s.fire_nothing()) {} |
454 | } |
455 | } |
456 | } |
457 | |
// The shared heart of a channel: every `Sender`, `Receiver` and `WeakSender`
// holds a (possibly weak) `Arc` to one of these.
struct Shared<T> {
    // All mutable channel state, behind a single lock.
    chan: ChanLock<Chan<T>>,
    // Set once when the last sender or last receiver is dropped; never unset.
    disconnected: AtomicBool,
    // Number of live `Sender`s (weak senders are not counted).
    sender_count: AtomicUsize,
    // Number of live `Receiver`s.
    receiver_count: AtomicUsize,
}

impl<T> Shared<T> {
    // Create channel state with the given capacity (`None` = unbounded).
    // Counts start at 1 for the initial sender/receiver pair.
    fn new(cap: Option<usize>) -> Self {
        Self {
            chan: ChanLock::new(Chan {
                sending: cap.map(|cap| (cap, VecDeque::new())),
                queue: VecDeque::new(),
                waiting: VecDeque::new(),
            }),
            disconnected: AtomicBool::new(false),
            sender_count: AtomicUsize::new(1),
            receiver_count: AtomicUsize::new(1),
        }
    }

    // Core send machinery, generic over the signal type so sync and async
    // senders share one implementation. `make_signal` wraps the message in a
    // hook if the send must park; `do_block` then performs the actual blocking.
    // NOTE: the points at which `chan` (the lock guard) is dropped before
    // firing signals are deliberate — do not reorder.
    fn send<S: Signal, R: From<Result<(), TrySendTimeoutError<T>>>>(
        &self,
        msg: T,
        should_block: bool,
        make_signal: impl FnOnce(T) -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);

        if self.is_disconnected() {
            Err(TrySendTimeoutError::Disconnected(msg)).into()
        } else if !chan.waiting.is_empty() {
            // At least one receiver is parked: try handing the message over directly.
            let mut msg = Some(msg);

            loop {
                let slot = chan.waiting.pop_front();
                match slot.as_ref().map(|r| r.fire_send(msg.take().unwrap())) {
                    // No more waiting receivers and msg in queue, so break out of the loop
                    None if msg.is_none() => break,
                    // No more waiting receivers, so add msg to queue and break out of the loop
                    None => {
                        chan.queue.push_back(msg.unwrap());
                        break;
                    }
                    Some((Some(m), signal)) => {
                        if signal.fire() {
                            // Was async and a stream, so didn't acquire the message. Wake another
                            // receiver, and do not yet push the message.
                            msg.replace(m);
                            continue;
                        } else {
                            // Was async and not a stream, so it did acquire the message. Push the
                            // message to the queue for it to be received.
                            chan.queue.push_back(m);
                            drop(chan);
                            break;
                        }
                    },
                    Some((None, signal)) => {
                        drop(chan);
                        signal.fire();
                        break; // Was sync, so it has acquired the message
                    },
                }
            }

            Ok(()).into()
        } else if chan.sending.as_ref().map(|(cap, _)| chan.queue.len() < *cap).unwrap_or(true) {
            // Unbounded, or bounded with spare capacity: enqueue immediately.
            chan.queue.push_back(msg);
            Ok(()).into()
        } else if should_block { // Only bounded from here on
            // Park this sender: its message lives in the hook's slot until a
            // receiver pulls it (or disconnect/timeout hands it back).
            let hook = make_signal(msg);
            chan.sending.as_mut().unwrap().1.push_back(hook.clone());
            drop(chan);

            do_block(hook)
        } else {
            Err(TrySendTimeoutError::Full(msg)).into()
        }
    }

    // Synchronous send. `block`: `None` = non-blocking try, `Some(None)` =
    // block forever, `Some(Some(deadline))` = block until the deadline.
    fn send_sync(
        &self,
        msg: T,
        block: Option<Option<Instant>>,
    ) -> Result<(), TrySendTimeoutError<T>> {
        self.send(
            // msg
            msg,
            // should_block
            block.is_some(),
            // make_signal
            |msg| Hook::slot(Some(msg), SyncSignal::default()),
            // do_block
            |hook| if let Some(deadline) = block.unwrap() {
                hook.wait_deadline_send(&self.disconnected, deadline)
                    .or_else(|timed_out| {
                        if timed_out { // Remove our signal
                            let hook: Arc<Hook<T, dyn signal::Signal>> = hook.clone();
                            wait_lock(&self.chan).sending
                                .as_mut()
                                .unwrap().1
                                // Compare signals by address to find our own hook.
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                        }
                        // If the message is still in the slot, the send failed;
                        // otherwise a receiver raced us and took it: success.
                        hook.try_take().map(|msg| if self.is_disconnected() {
                            Err(TrySendTimeoutError::Disconnected(msg))
                        } else {
                            Err(TrySendTimeoutError::Timeout(msg))
                        })
                        .unwrap_or(Ok(()))
                    })
            } else {
                hook.wait_send(&self.disconnected);

                // An untaken message after waking means we were disconnected.
                match hook.try_take() {
                    Some(msg) => Err(TrySendTimeoutError::Disconnected(msg)),
                    None => Ok(()),
                }
            },
        )
    }

    // Core receive machinery, generic over the signal type so sync and async
    // receivers share one implementation; mirrors `send` above.
    fn recv<S: Signal, R: From<Result<T, TryRecvTimeoutError>>>(
        &self,
        should_block: bool,
        make_signal: impl FnOnce() -> Arc<Hook<T, S>>,
        do_block: impl FnOnce(Arc<Hook<T, S>>) -> R,
    ) -> R {
        let mut chan = wait_lock(&self.chan);
        // Pull one extra since we are about to pop a message ourselves.
        chan.pull_pending(true);

        if let Some(msg) = chan.queue.pop_front() {
            drop(chan);
            Ok(msg).into()
        } else if self.is_disconnected() {
            drop(chan);
            Err(TryRecvTimeoutError::Disconnected).into()
        } else if should_block {
            // Park this receiver with an empty slot for a sender to fill.
            let hook = make_signal();
            chan.waiting.push_back(hook.clone());
            drop(chan);

            do_block(hook)
        } else {
            drop(chan);
            Err(TryRecvTimeoutError::Empty).into()
        }
    }

    // Synchronous receive. `block`: `None` = non-blocking try, `Some(None)` =
    // block forever, `Some(Some(deadline))` = block until the deadline.
    fn recv_sync(&self, block: Option<Option<Instant>>) -> Result<T, TryRecvTimeoutError> {
        self.recv(
            // should_block
            block.is_some(),
            // make_signal
            || Hook::slot(None, SyncSignal::default()),
            // do_block
            |hook| if let Some(deadline) = block.unwrap() {
                hook.wait_deadline_recv(&self.disconnected, deadline)
                    .or_else(|timed_out| {
                        if timed_out { // Remove our signal
                            let hook: Arc<Hook<T, dyn Signal>> = hook.clone();
                            wait_lock(&self.chan).waiting
                                // Compare signals by address to find our own hook.
                                .retain(|s| s.signal().as_ptr() != hook.signal().as_ptr());
                        }
                        // A sender may have filled our slot while we raced to
                        // deregister; prefer that message, then the queue.
                        match hook.try_take() {
                            Some(msg) => Ok(msg),
                            None => {
                                let disconnected = self.is_disconnected(); // Check disconnect *before* msg
                                if let Some(msg) = wait_lock(&self.chan).queue.pop_front() {
                                    Ok(msg)
                                } else if disconnected {
                                    Err(TryRecvTimeoutError::Disconnected)
                                } else {
                                    Err(TryRecvTimeoutError::Timeout)
                                }
                            },
                        }
                    })
            } else {
                hook.wait_recv(&self.disconnected)
                    // On disconnect, drain any message left in the queue first.
                    .or_else(|| wait_lock(&self.chan).queue.pop_front())
                    .ok_or(TryRecvTimeoutError::Disconnected)
            },
        )
    }

    /// Disconnect anything listening on this channel (this will not prevent receivers receiving
    /// msgs that have already been sent)
    fn disconnect_all(&self) {
        self.disconnected.store(true, Ordering::Relaxed);

        let mut chan = wait_lock(&self.chan);
        chan.pull_pending(false);
        // Wake every parked sender and receiver so they observe the disconnect.
        if let Some((_, sending)) = chan.sending.as_ref() {
            sending.iter().for_each(|hook| {
                hook.signal().fire();
            })
        }
        chan.waiting.iter().for_each(|hook| {
            hook.signal().fire();
        });
    }

    // Whether the channel has been disconnected (all senders or all receivers dropped).
    fn is_disconnected(&self) -> bool {
        self.disconnected.load(Ordering::SeqCst)
    }

    // Whether no messages are queued (parked senders are flushed first via `len`).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    // Whether a bounded channel is at capacity; unbounded channels are never full.
    fn is_full(&self) -> bool {
        self.capacity().map(|cap| cap == self.len()).unwrap_or(false)
    }

    // Number of queued messages, after flushing parked senders into the queue.
    fn len(&self) -> usize {
        let mut chan = wait_lock(&self.chan);
        chan.pull_pending(false);
        chan.queue.len()
    }

    // The channel's capacity, or `None` if unbounded.
    fn capacity(&self) -> Option<usize> {
        wait_lock(&self.chan).sending.as_ref().map(|(cap, _)| *cap)
    }

    // Number of live senders.
    fn sender_count(&self) -> usize {
        self.sender_count.load(Ordering::Relaxed)
    }

    // Number of live receivers.
    fn receiver_count(&self) -> usize {
        self.receiver_count.load(Ordering::Relaxed)
    }
}
692 | |
/// A transmitting end of a channel.
pub struct Sender<T> {
    // State shared with every other sender/receiver of this channel.
    shared: Arc<Shared<T>>,
}
697 | |
698 | impl<T> Sender<T> { |
699 | /// Attempt to send a value into the channel. If the channel is bounded and full, or all |
700 | /// receivers have been dropped, an error is returned. If the channel associated with this |
701 | /// sender is unbounded, this method has the same behaviour as [`Sender::send`]. |
702 | pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> { |
703 | self.shared.send_sync(msg, None).map_err(|err| match err { |
704 | TrySendTimeoutError::Full(msg) => TrySendError::Full(msg), |
705 | TrySendTimeoutError::Disconnected(msg) => TrySendError::Disconnected(msg), |
706 | _ => unreachable!(), |
707 | }) |
708 | } |
709 | |
710 | /// Send a value into the channel, returning an error if all receivers have been dropped. |
711 | /// If the channel is bounded and is full, this method will block until space is available |
712 | /// or all receivers have been dropped. If the channel is unbounded, this method will not |
713 | /// block. |
714 | pub fn send(&self, msg: T) -> Result<(), SendError<T>> { |
715 | self.shared.send_sync(msg, Some(None)).map_err(|err| match err { |
716 | TrySendTimeoutError::Disconnected(msg) => SendError(msg), |
717 | _ => unreachable!(), |
718 | }) |
719 | } |
720 | |
721 | /// Send a value into the channel, returning an error if all receivers have been dropped |
722 | /// or the deadline has passed. If the channel is bounded and is full, this method will |
723 | /// block until space is available, the deadline is reached, or all receivers have been |
724 | /// dropped. |
725 | pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> { |
726 | self.shared.send_sync(msg, Some(Some(deadline))).map_err(|err| match err { |
727 | TrySendTimeoutError::Disconnected(msg) => SendTimeoutError::Disconnected(msg), |
728 | TrySendTimeoutError::Timeout(msg) => SendTimeoutError::Timeout(msg), |
729 | _ => unreachable!(), |
730 | }) |
731 | } |
732 | |
733 | /// Send a value into the channel, returning an error if all receivers have been dropped |
734 | /// or the timeout has expired. If the channel is bounded and is full, this method will |
735 | /// block until space is available, the timeout has expired, or all receivers have been |
736 | /// dropped. |
737 | pub fn send_timeout(&self, msg: T, dur: Duration) -> Result<(), SendTimeoutError<T>> { |
738 | self.send_deadline(msg, Instant::now().checked_add(dur).unwrap()) |
739 | } |
740 | |
741 | /// Returns true if all receivers for this channel have been dropped. |
742 | pub fn is_disconnected(&self) -> bool { |
743 | self.shared.is_disconnected() |
744 | } |
745 | |
746 | /// Returns true if the channel is empty. |
747 | /// Note: Zero-capacity channels are always empty. |
748 | pub fn is_empty(&self) -> bool { |
749 | self.shared.is_empty() |
750 | } |
751 | |
752 | /// Returns true if the channel is full. |
753 | /// Note: Zero-capacity channels are always full. |
754 | pub fn is_full(&self) -> bool { |
755 | self.shared.is_full() |
756 | } |
757 | |
758 | /// Returns the number of messages in the channel |
759 | pub fn len(&self) -> usize { |
760 | self.shared.len() |
761 | } |
762 | |
763 | /// If the channel is bounded, returns its capacity. |
764 | pub fn capacity(&self) -> Option<usize> { |
765 | self.shared.capacity() |
766 | } |
767 | |
768 | /// Get the number of senders that currently exist, including this one. |
769 | pub fn sender_count(&self) -> usize { |
770 | self.shared.sender_count() |
771 | } |
772 | |
773 | /// Get the number of receivers that currently exist. |
774 | /// |
775 | /// Note that this method makes no guarantees that a subsequent send will succeed; it's |
776 | /// possible that between `receiver_count()` being called and a `send()`, all open receivers |
777 | /// could drop. |
778 | pub fn receiver_count(&self) -> usize { |
779 | self.shared.receiver_count() |
780 | } |
781 | |
782 | /// Creates a [`WeakSender`] that does not keep the channel open. |
783 | /// |
784 | /// The channel is closed once all `Sender`s are dropped, even if there |
785 | /// are still active `WeakSender`s. |
786 | pub fn downgrade(&self) -> WeakSender<T> { |
787 | WeakSender { |
788 | shared: Arc::downgrade(&self.shared), |
789 | } |
790 | } |
791 | |
792 | /// Returns whether the senders are belong to the same channel. |
793 | pub fn same_channel(&self, other: &Sender<T>) -> bool { |
794 | Arc::ptr_eq(&self.shared, &other.shared) |
795 | } |
796 | } |
797 | |
798 | impl<T> Clone for Sender<T> { |
799 | /// Clone this sender. [`Sender`] acts as a handle to the ending a channel. Remaining channel |
800 | /// contents will only be cleaned up when all senders and the receiver have been dropped. |
801 | fn clone(&self) -> Self { |
802 | self.shared.sender_count.fetch_add(val:1, order:Ordering::Relaxed); |
803 | Self { shared: self.shared.clone() } |
804 | } |
805 | } |
806 | |
807 | impl<T> fmt::Debug for Sender<T> { |
808 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
809 | f.debug_struct(name:"Sender" ).finish() |
810 | } |
811 | } |
812 | |
813 | impl<T> Drop for Sender<T> { |
814 | fn drop(&mut self) { |
815 | // Notify receivers that all senders have been dropped if the number of senders drops to 0. |
816 | if self.shared.sender_count.fetch_sub(val:1, order:Ordering::Relaxed) == 1 { |
817 | self.shared.disconnect_all(); |
818 | } |
819 | } |
820 | } |
821 | |
/// A sender that does not prevent the channel from being closed.
///
/// Weak senders do not count towards the number of active senders on the channel. As soon as
/// all normal [`Sender`]s are dropped, the channel is closed, even if there is still a
/// `WeakSender`.
///
/// To send messages, a `WeakSender` must first be upgraded to a `Sender` using the [`upgrade`]
/// method.
pub struct WeakSender<T> {
    // Weak handle to the shared channel state; does not keep the channel alive.
    shared: Weak<Shared<T>>,
}
833 | |
834 | impl<T> WeakSender<T> { |
835 | /// Tries to upgrade the `WeakSender` to a [`Sender`], in order to send messages. |
836 | /// |
837 | /// Returns `None` if the channel was closed already. Note that a `Some` return value |
838 | /// does not guarantee that the channel is still open. |
839 | pub fn upgrade(&self) -> Option<Sender<T>> { |
840 | self.shared |
841 | .upgrade() |
842 | // check that there are still live senders |
843 | .filter(|shared| { |
844 | shared |
845 | .sender_count |
846 | .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |count| { |
847 | if count == 0 { |
848 | // all senders are closed already -> don't increase the sender count |
849 | None |
850 | } else { |
851 | // there is still at least one active sender |
852 | Some(count + 1) |
853 | } |
854 | }) |
855 | .is_ok() |
856 | }) |
857 | .map(|shared| Sender { shared }) |
858 | } |
859 | } |
860 | |
/// The receiving end of a channel.
///
/// Note: Cloning the receiver *does not* turn this channel into a broadcast channel.
/// Each message will only be received by a single receiver. This is useful for
/// implementing work stealing for concurrent programs.
pub struct Receiver<T> {
    // State shared with every other sender/receiver of this channel.
    shared: Arc<Shared<T>>,
}
869 | |
870 | impl<T> Receiver<T> { |
871 | /// Attempt to fetch an incoming value from the channel associated with this receiver, |
872 | /// returning an error if the channel is empty or if all senders have been dropped. |
873 | pub fn try_recv(&self) -> Result<T, TryRecvError> { |
874 | self.shared.recv_sync(None).map_err(|err| match err { |
875 | TryRecvTimeoutError::Disconnected => TryRecvError::Disconnected, |
876 | TryRecvTimeoutError::Empty => TryRecvError::Empty, |
877 | _ => unreachable!(), |
878 | }) |
879 | } |
880 | |
881 | /// Wait for an incoming value from the channel associated with this receiver, returning an |
882 | /// error if all senders have been dropped. |
883 | pub fn recv(&self) -> Result<T, RecvError> { |
884 | self.shared.recv_sync(Some(None)).map_err(|err| match err { |
885 | TryRecvTimeoutError::Disconnected => RecvError::Disconnected, |
886 | _ => unreachable!(), |
887 | }) |
888 | } |
889 | |
890 | /// Wait for an incoming value from the channel associated with this receiver, returning an |
891 | /// error if all senders have been dropped or the deadline has passed. |
892 | pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> { |
893 | self.shared.recv_sync(Some(Some(deadline))).map_err(|err| match err { |
894 | TryRecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, |
895 | TryRecvTimeoutError::Timeout => RecvTimeoutError::Timeout, |
896 | _ => unreachable!(), |
897 | }) |
898 | } |
899 | |
900 | /// Wait for an incoming value from the channel associated with this receiver, returning an |
901 | /// error if all senders have been dropped or the timeout has expired. |
902 | pub fn recv_timeout(&self, dur: Duration) -> Result<T, RecvTimeoutError> { |
903 | self.recv_deadline(Instant::now().checked_add(dur).unwrap()) |
904 | } |
905 | |
906 | /// Create a blocking iterator over the values received on the channel that finishes iteration |
907 | /// when all senders have been dropped. |
908 | /// |
909 | /// You can also create a self-owned iterator with [`Receiver::into_iter`]. |
910 | pub fn iter(&self) -> Iter<T> { |
911 | Iter { receiver: &self } |
912 | } |
913 | |
914 | /// A non-blocking iterator over the values received on the channel that finishes iteration |
915 | /// when all senders have been dropped or the channel is empty. |
916 | pub fn try_iter(&self) -> TryIter<T> { |
917 | TryIter { receiver: &self } |
918 | } |
919 | |
920 | /// Take all msgs currently sitting in the channel and produce an iterator over them. Unlike |
921 | /// `try_iter`, the iterator will not attempt to fetch any more values from the channel once |
922 | /// the function has been called. |
923 | pub fn drain(&self) -> Drain<T> { |
924 | let mut chan = wait_lock(&self.shared.chan); |
925 | chan.pull_pending(false); |
926 | let queue = std::mem::take(&mut chan.queue); |
927 | |
928 | Drain { queue, _phantom: PhantomData } |
929 | } |
930 | |
    /// Returns true if all senders for this channel have been dropped.
    ///
    /// Note: this is a momentary snapshot — the last sender may be dropped
    /// concurrently right after this call returns `false`.
    pub fn is_disconnected(&self) -> bool {
        self.shared.is_disconnected()
    }
935 | |
    /// Returns true if the channel is empty.
    /// Note: Zero-capacity channels are always empty.
    ///
    /// The result is a momentary snapshot and may already be stale when used,
    /// since other threads may send or receive concurrently.
    pub fn is_empty(&self) -> bool {
        self.shared.is_empty()
    }
941 | |
    /// Returns true if the channel is full.
    /// Note: Zero-capacity channels are always full.
    ///
    /// The result is a momentary snapshot and may already be stale when used,
    /// since other threads may send or receive concurrently.
    pub fn is_full(&self) -> bool {
        self.shared.is_full()
    }
947 | |
    /// Returns the number of messages in the channel.
    ///
    /// The count is a momentary snapshot; it may change before the caller
    /// acts on it if other threads send or receive concurrently.
    pub fn len(&self) -> usize {
        self.shared.len()
    }
952 | |
    /// If the channel is bounded, returns its capacity.
    ///
    /// Returns `None` if the channel was created with [`unbounded`].
    pub fn capacity(&self) -> Option<usize> {
        self.shared.capacity()
    }
957 | |
    /// Get the number of senders that currently exist.
    ///
    /// Note: senders may be cloned or dropped concurrently, so the count can
    /// be stale by the time it is observed.
    pub fn sender_count(&self) -> usize {
        self.shared.sender_count()
    }
962 | |
    /// Get the number of receivers that currently exist, including this one.
    ///
    /// Note: receivers may be cloned or dropped concurrently, so the count
    /// can be stale by the time it is observed.
    pub fn receiver_count(&self) -> usize {
        self.shared.receiver_count()
    }
967 | |
    /// Returns true if the two receivers belong to the same channel
    /// (i.e. they share the same underlying channel state).
    pub fn same_channel(&self, other: &Receiver<T>) -> bool {
        Arc::ptr_eq(&self.shared, &other.shared)
    }
972 | } |
973 | |
974 | impl<T> Clone for Receiver<T> { |
975 | /// Clone this receiver. [`Receiver`] acts as a handle to the ending a channel. Remaining |
976 | /// channel contents will only be cleaned up when all senders and the receiver have been |
977 | /// dropped. |
978 | /// |
979 | /// Note: Cloning the receiver *does not* turn this channel into a broadcast channel. |
980 | /// Each message will only be received by a single receiver. This is useful for |
981 | /// implementing work stealing for concurrent programs. |
982 | fn clone(&self) -> Self { |
983 | self.shared.receiver_count.fetch_add(val:1, order:Ordering::Relaxed); |
984 | Self { shared: self.shared.clone() } |
985 | } |
986 | } |
987 | |
988 | impl<T> fmt::Debug for Receiver<T> { |
989 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
990 | f.debug_struct(name:"Receiver" ).finish() |
991 | } |
992 | } |
993 | |
994 | impl<T> Drop for Receiver<T> { |
995 | fn drop(&mut self) { |
996 | // Notify senders that all receivers have been dropped if the number of receivers drops |
997 | // to 0. |
998 | if self.shared.receiver_count.fetch_sub(val:1, order:Ordering::Relaxed) == 1 { |
999 | self.shared.disconnect_all(); |
1000 | } |
1001 | } |
1002 | } |
1003 | |
/// This exists as a shorthand for [`Receiver::iter`].
impl<'a, T> IntoIterator for &'a Receiver<T> {
    type Item = T;
    type IntoIter = Iter<'a, T>;

    /// Create a blocking iterator borrowing this receiver; equivalent to
    /// calling [`Receiver::iter`].
    fn into_iter(self) -> Self::IntoIter {
        Iter { receiver: self }
    }
}
1013 | |
impl<T> IntoIterator for Receiver<T> {
    type Item = T;
    type IntoIter = IntoIter<T>;

    /// Creates a self-owned but semantically equivalent alternative to [`Receiver::iter`].
    ///
    /// The returned iterator blocks on each `next` call and ends once all
    /// senders have been dropped.
    fn into_iter(self) -> Self::IntoIter {
        IntoIter { receiver: self }
    }
}
1023 | |
/// An iterator over the msgs received from a channel.
///
/// Created by [`Receiver::iter`]; each `next` call blocks until a message
/// arrives and yields `None` once all senders have been dropped.
pub struct Iter<'a, T> {
    receiver: &'a Receiver<T>,
}
1028 | |
impl<'a, T> Iterator for Iter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // A `RecvError::Disconnected` becomes `None`, ending iteration.
        self.receiver.recv().ok()
    }
}
1036 | |
/// A non-blocking iterator over the msgs received from a channel.
///
/// Created by [`Receiver::try_iter`]; yields `None` as soon as the channel is
/// empty or all senders have been dropped.
pub struct TryIter<'a, T> {
    receiver: &'a Receiver<T>,
}
1041 | |
impl<'a, T> Iterator for TryIter<'a, T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // Both `TryRecvError::Empty` and `::Disconnected` become `None`.
        self.receiver.try_recv().ok()
    }
}
1049 | |
/// A fixed-size iterator over the msgs drained from a channel.
#[derive (Debug)]
pub struct Drain<'a, T> {
    // The snapshot of messages taken from the channel at the time of the
    // `drain` call; iteration never touches the channel again.
    queue: VecDeque<T>,
    /// A phantom field used to constrain the lifetime of this iterator. We do this because the
    /// implementation may change and we don't want to unintentionally constrain it. Removing this
    /// lifetime later is a possibility.
    _phantom: PhantomData<&'a ()>,
}
1059 | |
1060 | impl<'a, T> Iterator for Drain<'a, T> { |
1061 | type Item = T; |
1062 | |
1063 | fn next(&mut self) -> Option<Self::Item> { |
1064 | self.queue.pop_front() |
1065 | } |
1066 | } |
1067 | |
impl<'a, T> ExactSizeIterator for Drain<'a, T> {
    /// The number of messages remaining in the drained snapshot.
    fn len(&self) -> usize {
        self.queue.len()
    }
}
1073 | |
/// An owned iterator over the msgs received from a channel.
///
/// Created by [`Receiver::into_iter`]; behaves like [`Iter`] but owns the
/// receiver instead of borrowing it.
pub struct IntoIter<T> {
    receiver: Receiver<T>,
}
1078 | |
impl<T> Iterator for IntoIter<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // Blocks until a message arrives; disconnection ends iteration.
        self.receiver.recv().ok()
    }
}
1086 | |
1087 | /// Create a channel with no maximum capacity. |
1088 | /// |
1089 | /// Create an unbounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in |
1090 | /// one end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and |
1091 | /// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] |
1092 | /// may be cloned. |
1093 | /// |
1094 | /// # Examples |
1095 | /// ``` |
1096 | /// let (tx, rx) = flume::unbounded(); |
1097 | /// |
1098 | /// tx.send(42).unwrap(); |
1099 | /// assert_eq!(rx.recv().unwrap(), 42); |
1100 | /// ``` |
1101 | pub fn unbounded<T>() -> (Sender<T>, Receiver<T>) { |
1102 | let shared: Arc> = Arc::new(data:Shared::new(cap:None)); |
1103 | ( |
1104 | Sender { shared: shared.clone() }, |
1105 | Receiver { shared }, |
1106 | ) |
1107 | } |
1108 | |
1109 | /// Create a channel with a maximum capacity. |
1110 | /// |
1111 | /// Create a bounded channel with a [`Sender`] and [`Receiver`] connected to each end respectively. Values sent in one |
1112 | /// end of the channel will be received on the other end. The channel is thread-safe, and both [`Sender`] and |
1113 | /// [`Receiver`] may be sent to or shared between threads as necessary. In addition, both [`Sender`] and [`Receiver`] |
1114 | /// may be cloned. |
1115 | /// |
1116 | /// Unlike an [`unbounded`] channel, if there is no space left for new messages, calls to |
1117 | /// [`Sender::send`] will block (unblocking once a receiver has made space). If blocking behaviour |
1118 | /// is not desired, [`Sender::try_send`] may be used. |
1119 | /// |
1120 | /// Like `std::sync::mpsc`, `flume` supports 'rendezvous' channels. A bounded queue with a maximum capacity of zero |
1121 | /// will block senders until a receiver is available to take the value. You can imagine a rendezvous channel as a |
1122 | /// ['Glienicke Bridge'](https://en.wikipedia.org/wiki/Glienicke_Bridge)-style location at which senders and receivers |
1123 | /// perform a handshake and transfer ownership of a value. |
1124 | /// |
1125 | /// # Examples |
1126 | /// ``` |
1127 | /// let (tx, rx) = flume::bounded(32); |
1128 | /// |
1129 | /// for i in 1..33 { |
1130 | /// tx.send(i).unwrap(); |
1131 | /// } |
1132 | /// assert!(tx.try_send(33).is_err()); |
1133 | /// |
1134 | /// assert_eq!(rx.try_iter().sum::<u32>(), (1..33).sum()); |
1135 | /// ``` |
1136 | pub fn bounded<T>(cap: usize) -> (Sender<T>, Receiver<T>) { |
1137 | let shared: Arc> = Arc::new(data:Shared::new(cap:Some(cap))); |
1138 | ( |
1139 | Sender { shared: shared.clone() }, |
1140 | Receiver { shared }, |
1141 | ) |
1142 | } |
1143 | |