1// Take a look at the license at the top of the repository in the LICENSE file.
2
3use std::{
4 collections::VecDeque,
5 fmt, mem, ptr,
6 sync::{mpsc, Arc, Condvar, Mutex},
7};
8
9use crate::{
10 thread_guard::ThreadGuard, translate::*, ControlFlow, MainContext, Priority, Source, SourceId,
11};
12
13enum ChannelSourceState {
14 NotAttached,
15 Attached(*mut ffi::GSource),
16 Destroyed,
17}
18
19unsafe impl Send for ChannelSourceState {}
20unsafe impl Sync for ChannelSourceState {}
21
22struct ChannelInner<T> {
23 queue: VecDeque<T>,
24 source: ChannelSourceState,
25 num_senders: usize,
26}
27
28impl<T> ChannelInner<T> {
29 fn receiver_disconnected(&self) -> bool {
30 match self.source {
31 ChannelSourceState::Destroyed => true,
32 // Receiver exists but is already destroyed
33 ChannelSourceState::Attached(source)
34 if unsafe { ffi::g_source_is_destroyed(source) } != ffi::GFALSE =>
35 {
36 true
37 }
38 // Not attached yet so the Receiver still exists
39 ChannelSourceState::NotAttached => false,
40 // Receiver still running
41 ChannelSourceState::Attached(_) => false,
42 }
43 }
44
45 #[doc(alias = "g_source_set_ready_time")]
46 fn set_ready_time(&mut self, ready_time: i64) {
47 if let ChannelSourceState::Attached(source) = self.source {
48 unsafe {
49 ffi::g_source_set_ready_time(source, ready_time);
50 }
51 }
52 }
53}
54
55struct ChannelBound {
56 bound: usize,
57 cond: Condvar,
58}
59
60struct Channel<T>(Arc<(Mutex<ChannelInner<T>>, Option<ChannelBound>)>);
61
62impl<T> Clone for Channel<T> {
63 fn clone(&self) -> Channel<T> {
64 Channel(self.0.clone())
65 }
66}
67
68impl<T> Channel<T> {
69 fn new(bound: Option<usize>) -> Channel<T> {
70 Channel(Arc::new((
71 Mutex::new(ChannelInner {
72 queue: VecDeque::new(),
73 source: ChannelSourceState::NotAttached,
74 num_senders: 0,
75 }),
76 bound.map(|bound| ChannelBound {
77 bound,
78 cond: Condvar::new(),
79 }),
80 )))
81 }
82
83 fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
84 let mut inner = (self.0).0.lock().unwrap();
85
86 // If we have a bounded channel then we need to wait here until enough free space is
87 // available or the receiver disappears
88 //
89 // A special case here is a bound of 0: the queue must be empty for accepting
90 // new data and then we will again wait later for the data to be actually taken
91 // out
92 if let Some(ChannelBound { bound, ref cond }) = (self.0).1 {
93 while inner.queue.len() >= bound
94 && !inner.queue.is_empty()
95 && !inner.receiver_disconnected()
96 {
97 inner = cond.wait(inner).unwrap();
98 }
99 }
100
101 // Error out directly if the receiver is disconnected
102 if inner.receiver_disconnected() {
103 return Err(mpsc::SendError(t));
104 }
105
106 // Store the item on our queue
107 inner.queue.push_back(t);
108
109 // and then wake up the GSource
110 inner.set_ready_time(0);
111
112 // If we have a bound of 0 we need to wait until the receiver actually
113 // handled the data
114 if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 {
115 while !inner.queue.is_empty() && !inner.receiver_disconnected() {
116 inner = cond.wait(inner).unwrap();
117 }
118
119 // If the receiver was destroyed in the meantime take out the item and report an error
120 if inner.receiver_disconnected() {
121 // If the item is not in the queue anymore then the receiver just handled it before
122 // getting disconnected and all is good
123 if let Some(t) = inner.queue.pop_front() {
124 return Err(mpsc::SendError(t));
125 }
126 }
127 }
128
129 Ok(())
130 }
131
132 fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
133 let mut inner = (self.0).0.lock().unwrap();
134
135 let ChannelBound { bound, ref cond } = (self.0)
136 .1
137 .as_ref()
138 .expect("called try_send() on an unbounded channel");
139
140 // Check if the queue is full and handle the special case of a 0 bound
141 if inner.queue.len() >= *bound && !inner.queue.is_empty() {
142 return Err(mpsc::TrySendError::Full(t));
143 }
144
145 // Error out directly if the receiver is disconnected
146 if inner.receiver_disconnected() {
147 return Err(mpsc::TrySendError::Disconnected(t));
148 }
149
150 // Store the item on our queue
151 inner.queue.push_back(t);
152
153 // and then wake up the GSource
154 inner.set_ready_time(0);
155
156 // If we have a bound of 0 we need to wait until the receiver actually
157 // handled the data
158 if *bound == 0 {
159 while !inner.queue.is_empty() && !inner.receiver_disconnected() {
160 inner = cond.wait(inner).unwrap();
161 }
162
163 // If the receiver was destroyed in the meantime take out the item and report an error
164 if inner.receiver_disconnected() {
165 // If the item is not in the queue anymore then the receiver just handled it before
166 // getting disconnected and all is good
167 if let Some(t) = inner.queue.pop_front() {
168 return Err(mpsc::TrySendError::Disconnected(t));
169 }
170 }
171 }
172
173 Ok(())
174 }
175
176 // SAFETY: Must be called from the main context the channel was attached to.
177 unsafe fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
178 let mut inner = (self.0).0.lock().unwrap();
179
180 // Pop item if we have any
181 if let Some(item) = inner.queue.pop_front() {
182 // Wake up a sender that is currently waiting, if any
183 if let Some(ChannelBound { ref cond, .. }) = (self.0).1 {
184 cond.notify_one();
185 }
186 return Ok(item);
187 }
188
189 // If there are no senders left we are disconnected or otherwise empty. That's the case if
190 // the only remaining strong reference is the one of the receiver
191 if inner.num_senders == 0 {
192 Err(mpsc::TryRecvError::Disconnected)
193 } else {
194 Err(mpsc::TryRecvError::Empty)
195 }
196 }
197}
198
199#[repr(C)]
200struct ChannelSource<T, F: FnMut(T) -> ControlFlow + 'static> {
201 source: ffi::GSource,
202 source_funcs: Box<ffi::GSourceFuncs>,
203 channel: Channel<T>,
204 callback: ThreadGuard<F>,
205}
206
207unsafe extern "C" fn dispatch<T, F: FnMut(T) -> ControlFlow + 'static>(
208 source: *mut ffi::GSource,
209 callback: ffi::GSourceFunc,
210 _user_data: ffi::gpointer,
211) -> ffi::gboolean {
212 let source = &mut *(source as *mut ChannelSource<T, F>);
213 debug_assert!(callback.is_none());
214
215 // Set ready-time to -1 so that we won't get called again before a new item is added
216 // to the channel queue.
217 ffi::g_source_set_ready_time(&mut source.source, -1);
218
219 // Get a reference to the callback. This will panic if we're called from a different
220 // thread than where the source was attached to the main context.
221 let callback = source.callback.get_mut();
222
223 // Now iterate over all items that we currently have in the channel until it is
224 // empty again. If all senders are disconnected at some point we remove the GSource
225 // from the main context it was attached to as it will never ever be called again.
226 loop {
227 match source.channel.try_recv() {
228 Err(mpsc::TryRecvError::Empty) => break,
229 Err(mpsc::TryRecvError::Disconnected) => return ffi::G_SOURCE_REMOVE,
230 Ok(item) => {
231 if callback(item).is_break() {
232 return ffi::G_SOURCE_REMOVE;
233 }
234 }
235 }
236 }
237
238 ffi::G_SOURCE_CONTINUE
239}
240
241#[cfg(feature = "v2_64")]
242unsafe extern "C" fn dispose<T, F: FnMut(T) -> ControlFlow + 'static>(source: *mut ffi::GSource) {
243 let source = &mut *(source as *mut ChannelSource<T, F>);
244
245 // Set the source inside the channel to None so that all senders know that there
246 // is no receiver left and wake up the condition variable if any
247 let mut inner = (source.channel.0).0.lock().unwrap();
248 inner.source = ChannelSourceState::Destroyed;
249 if let Some(ChannelBound { ref cond, .. }) = (source.channel.0).1 {
250 cond.notify_all();
251 }
252}
253
254unsafe extern "C" fn finalize<T, F: FnMut(T) -> ControlFlow + 'static>(source: *mut ffi::GSource) {
255 let source = &mut *(source as *mut ChannelSource<T, F>);
256
257 // Drop all memory we own by taking it out of the Options
258
259 #[cfg(not(feature = "v2_64"))]
260 {
261 // FIXME: This is the same as would otherwise be done in the dispose() function but
262 // unfortunately it doesn't exist in older version of GLib. Doing it only here can
263 // cause a channel sender to get a reference to the source with reference count 0
264 // if it happens just before the mutex is taken below.
265 //
266 // This is exactly the pattern why g_source_set_dispose_function() was added.
267 //
268 // Set the source inside the channel to None so that all senders know that there
269 // is no receiver left and wake up the condition variable if any
270 let mut inner = (source.channel.0).0.lock().unwrap();
271 inner.source = ChannelSourceState::Destroyed;
272 if let Some(ChannelBound { ref cond, .. }) = (source.channel.0).1 {
273 cond.notify_all();
274 }
275 }
276 ptr::drop_in_place(&mut source.channel);
277 ptr::drop_in_place(&mut source.source_funcs);
278
279 // Take the callback out of the source. This will panic if the value is dropped
280 // from a different thread than where the callback was created so try to drop it
281 // from the main context if we're on another thread and the main context still exists.
282 //
283 // This can only really happen if the caller to `attach()` gets the `Source` from the returned
284 // `SourceId` and sends it to another thread or otherwise retrieves it from the main context,
285 // but better safe than sorry.
286 if source.callback.is_owner() {
287 ptr::drop_in_place(&mut source.callback);
288 } else {
289 let callback = ptr::read(&source.callback);
290 let context =
291 ffi::g_source_get_context(source as *mut ChannelSource<T, F> as *mut ffi::GSource);
292 if !context.is_null() {
293 let context = MainContext::from_glib_none(context);
294 context.invoke(move || {
295 drop(callback);
296 });
297 }
298 }
299}
300
301// rustdoc-stripper-ignore-next
302/// A `Sender` that can be used to send items to the corresponding main context receiver.
303///
304/// This `Sender` behaves the same as `std::sync::mpsc::Sender`.
305///
306/// See [`MainContext::channel()`] for how to create such a `Sender`.
307///
308/// [`MainContext::channel()`]: struct.MainContext.html#method.channel
309pub struct Sender<T>(Channel<T>);
310
311// It's safe to send the Sender to other threads for attaching it as
312// long as the items to be sent can also be sent between threads.
313unsafe impl<T: Send> Send for Sender<T> {}
314
315impl<T> fmt::Debug for Sender<T> {
316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
317 f.debug_struct(name:"Sender").finish()
318 }
319}
320
321impl<T> Clone for Sender<T> {
322 fn clone(&self) -> Sender<T> {
323 Self::new(&self.0)
324 }
325}
326
327impl<T> Sender<T> {
328 fn new(channel: &Channel<T>) -> Self {
329 let mut inner: MutexGuard<'_, ChannelInner<…>> = (channel.0).0.lock().unwrap();
330 inner.num_senders += 1;
331 Self(channel.clone())
332 }
333
334 // rustdoc-stripper-ignore-next
335 /// Sends a value to the channel.
336 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
337 self.0.send(t)
338 }
339}
340
341impl<T> Drop for Sender<T> {
342 fn drop(&mut self) {
343 // Decrease the number of senders and wake up the channel if this
344 // was the last sender that was dropped.
345 let mut inner: MutexGuard<'_, ChannelInner<…>> = ((self.0).0).0.lock().unwrap();
346 inner.num_senders -= 1;
347 if inner.num_senders == 0 {
348 inner.set_ready_time(0);
349 }
350 }
351}
352
353// rustdoc-stripper-ignore-next
354/// A `SyncSender` that can be used to send items to the corresponding main context receiver.
355///
356/// This `SyncSender` behaves the same as `std::sync::mpsc::SyncSender`.
357///
358/// See [`MainContext::sync_channel()`] for how to create such a `SyncSender`.
359///
360/// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel
361pub struct SyncSender<T>(Channel<T>);
362
363// It's safe to send the SyncSender to other threads for attaching it as
364// long as the items to be sent can also be sent between threads.
365unsafe impl<T: Send> Send for SyncSender<T> {}
366
367impl<T> fmt::Debug for SyncSender<T> {
368 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
369 f.debug_struct(name:"SyncSender").finish()
370 }
371}
372
373impl<T> Clone for SyncSender<T> {
374 fn clone(&self) -> SyncSender<T> {
375 Self::new(&self.0)
376 }
377}
378
379impl<T> SyncSender<T> {
380 fn new(channel: &Channel<T>) -> Self {
381 let mut inner: MutexGuard<'_, ChannelInner<…>> = (channel.0).0.lock().unwrap();
382 inner.num_senders += 1;
383 Self(channel.clone())
384 }
385
386 // rustdoc-stripper-ignore-next
387 /// Sends a value to the channel and blocks if the channel is full.
388 pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
389 self.0.send(t)
390 }
391
392 // rustdoc-stripper-ignore-next
393 /// Sends a value to the channel.
394 pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
395 self.0.try_send(t)
396 }
397}
398
399impl<T> Drop for SyncSender<T> {
400 fn drop(&mut self) {
401 // Decrease the number of senders and wake up the channel if this
402 // was the last sender that was dropped.
403 let mut inner: MutexGuard<'_, ChannelInner<…>> = ((self.0).0).0.lock().unwrap();
404 inner.num_senders -= 1;
405 if inner.num_senders == 0 {
406 inner.set_ready_time(0);
407 }
408 }
409}
410
411// rustdoc-stripper-ignore-next
412/// A `Receiver` that can be attached to a main context to receive items from its corresponding
413/// `Sender` or `SyncSender`.
414///
415/// See [`MainContext::channel()`] or [`MainContext::sync_channel()`] for how to create
416/// such a `Receiver`.
417///
418/// [`MainContext::channel()`]: struct.MainContext.html#method.channel
419/// [`MainContext::sync_channel()`]: struct.MainContext.html#method.sync_channel
420pub struct Receiver<T>(Option<Channel<T>>, Priority);
421
422impl<T> fmt::Debug for Receiver<T> {
423 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424 f.debug_struct(name:"Receiver").finish()
425 }
426}
427
428// It's safe to send the Receiver to other threads for attaching it as
429// long as the items to be sent can also be sent between threads.
430unsafe impl<T: Send> Send for Receiver<T> {}
431
432impl<T> Drop for Receiver<T> {
433 fn drop(&mut self) {
434 // If the receiver was never attached to a main context we need to let all the senders know
435 if let Some(channel: Channel) = self.0.take() {
436 let mut inner: MutexGuard<'_, ChannelInner<…>> = (channel.0).0.lock().unwrap();
437 inner.source = ChannelSourceState::Destroyed;
438 if let Some(ChannelBound { ref cond: &Condvar, .. }) = (channel.0).1 {
439 cond.notify_all();
440 }
441 }
442 }
443}
444
445impl<T> Receiver<T> {
446 // rustdoc-stripper-ignore-next
447 /// Attaches the receiver to the given `context` and calls `func` whenever an item is
448 /// available on the channel.
449 ///
450 /// Passing `None` for the context will attach it to the thread default main context.
451 ///
452 /// # Panics
453 ///
454 /// This function panics if called from a thread that is not the owner of the provided
455 /// `context`, or, if `None` is provided, of the thread default main context.
456 pub fn attach<F: FnMut(T) -> ControlFlow + 'static>(
457 mut self,
458 context: Option<&MainContext>,
459 func: F,
460 ) -> SourceId {
461 unsafe {
462 let channel = self.0.take().expect("Receiver without channel");
463
464 let source_funcs = Box::new(ffi::GSourceFuncs {
465 check: None,
466 prepare: None,
467 dispatch: Some(dispatch::<T, F>),
468 finalize: Some(finalize::<T, F>),
469 closure_callback: None,
470 closure_marshal: None,
471 });
472
473 let source = ffi::g_source_new(
474 mut_override(&*source_funcs),
475 mem::size_of::<ChannelSource<T, F>>() as u32,
476 ) as *mut ChannelSource<T, F>;
477
478 #[cfg(feature = "v2_64")]
479 {
480 ffi::g_source_set_dispose_function(
481 source as *mut ffi::GSource,
482 Some(dispose::<T, F>),
483 );
484 }
485
486 // Set up the GSource
487 {
488 let source = &mut *source;
489 let mut inner = (channel.0).0.lock().unwrap();
490
491 ffi::g_source_set_priority(mut_override(&source.source), self.1.into_glib());
492
493 // We're immediately ready if the queue is not empty or if no sender is left at this point
494 ffi::g_source_set_ready_time(
495 mut_override(&source.source),
496 if !inner.queue.is_empty() || inner.num_senders == 0 {
497 0
498 } else {
499 -1
500 },
501 );
502 inner.source = ChannelSourceState::Attached(&mut source.source);
503 }
504
505 // Store all our data inside our part of the GSource
506 {
507 let source = &mut *source;
508 ptr::write(ptr::addr_of_mut!(source.channel), channel);
509 ptr::write(ptr::addr_of_mut!(source.callback), ThreadGuard::new(func));
510 ptr::write(ptr::addr_of_mut!(source.source_funcs), source_funcs);
511 }
512
513 let source = Source::from_glib_full(mut_override(&(*source).source));
514 let context = match context {
515 Some(context) => context.clone(),
516 None => MainContext::ref_thread_default(),
517 };
518
519 let _acquire = context
520 .acquire()
521 .expect("main context already acquired by another thread");
522 source.attach(Some(&context))
523 }
524 }
525}
526
527impl MainContext {
528 // rustdoc-stripper-ignore-next
529 /// Creates a channel for a main context.
530 ///
531 /// The `Receiver` has to be attached to a main context at a later time, together with a
532 /// closure that will be called for every item sent to a `Sender`.
533 ///
534 /// The `Sender` can be cloned and both the `Sender` and `Receiver` can be sent to different
535 /// threads as long as the item type implements the `Send` trait.
536 ///
537 /// When the last `Sender` is dropped the channel is removed from the main context. If the
538 /// `Receiver` is dropped and not attached to a main context all sending to the `Sender`
539 /// will fail.
540 ///
541 /// The returned `Sender` behaves the same as `std::sync::mpsc::Sender`.
542 #[deprecated = "Use an async channel, from async-channel for example, on the main context using spawn_future_local() instead"]
543 pub fn channel<T>(priority: Priority) -> (Sender<T>, Receiver<T>) {
544 let channel = Channel::new(None);
545 let receiver = Receiver(Some(channel.clone()), priority);
546 let sender = Sender::new(&channel);
547
548 (sender, receiver)
549 }
550
551 // rustdoc-stripper-ignore-next
552 /// Creates a synchronous channel for a main context with a given bound on the capacity of the
553 /// channel.
554 ///
555 /// The `Receiver` has to be attached to a main context at a later time, together with a
556 /// closure that will be called for every item sent to a `SyncSender`.
557 ///
558 /// The `SyncSender` can be cloned and both the `SyncSender` and `Receiver` can be sent to different
559 /// threads as long as the item type implements the `Send` trait.
560 ///
561 /// When the last `SyncSender` is dropped the channel is removed from the main context. If the
562 /// `Receiver` is dropped and not attached to a main context all sending to the `SyncSender`
563 /// will fail.
564 ///
565 /// The returned `SyncSender` behaves the same as `std::sync::mpsc::SyncSender`.
566 #[deprecated = "Use an async channel, from async-channel for example, on the main context using spawn_future_local() instead"]
567 pub fn sync_channel<T>(priority: Priority, bound: usize) -> (SyncSender<T>, Receiver<T>) {
568 let channel = Channel::new(Some(bound));
569 let receiver = Receiver(Some(channel.clone()), priority);
570 let sender = SyncSender::new(&channel);
571
572 (sender, receiver)
573 }
574}
575
576#[cfg(test)]
577#[allow(deprecated)]
578mod tests {
579 use std::{
580 cell::RefCell,
581 rc::Rc,
582 sync::atomic::{AtomicBool, Ordering},
583 thread, time,
584 };
585
586 use super::*;
587 use crate::MainLoop;
588
589 #[test]
590 fn test_channel() {
591 let c = MainContext::new();
592 let l = MainLoop::new(Some(&c), false);
593
594 let _guard = c.acquire().unwrap();
595
596 let (sender, receiver) = MainContext::channel(Priority::default());
597
598 let sum = Rc::new(RefCell::new(0));
599 let sum_clone = sum.clone();
600 let l_clone = l.clone();
601 receiver.attach(Some(&c), move |item| {
602 *sum_clone.borrow_mut() += item;
603 if *sum_clone.borrow() == 6 {
604 l_clone.quit();
605 ControlFlow::Break
606 } else {
607 ControlFlow::Continue
608 }
609 });
610
611 sender.send(1).unwrap();
612 sender.send(2).unwrap();
613 sender.send(3).unwrap();
614
615 l.run();
616
617 assert_eq!(*sum.borrow(), 6);
618 }
619
620 #[test]
621 fn test_drop_sender() {
622 let c = MainContext::new();
623 let l = MainLoop::new(Some(&c), false);
624
625 let _guard = c.acquire().unwrap();
626
627 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
628
629 struct Helper(MainLoop);
630 impl Drop for Helper {
631 fn drop(&mut self) {
632 self.0.quit();
633 }
634 }
635
636 let helper = Helper(l.clone());
637 receiver.attach(Some(&c), move |_| {
638 let _helper = &helper;
639 ControlFlow::Continue
640 });
641
642 drop(sender);
643
644 l.run();
645 }
646
647 #[test]
648 fn test_drop_receiver() {
649 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
650
651 drop(receiver);
652 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
653 }
654
655 #[test]
656 fn test_remove_receiver() {
657 let c = MainContext::new();
658
659 let _guard = c.acquire().unwrap();
660
661 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
662
663 let source_id = receiver.attach(Some(&c), move |_| ControlFlow::Continue);
664
665 let source = c.find_source_by_id(&source_id).unwrap();
666 source.destroy();
667
668 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
669 }
670
671 #[test]
672 fn test_remove_receiver_and_drop_source() {
673 let c = MainContext::new();
674
675 let _guard = c.acquire().unwrap();
676
677 let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
678
679 struct Helper(Arc<AtomicBool>);
680 impl Drop for Helper {
681 fn drop(&mut self) {
682 self.0.store(true, Ordering::Relaxed);
683 }
684 }
685
686 let dropped = Arc::new(AtomicBool::new(false));
687 let helper = Helper(dropped.clone());
688 let source_id = receiver.attach(Some(&c), move |_| {
689 let _helper = &helper;
690 ControlFlow::Continue
691 });
692
693 let source = c.find_source_by_id(&source_id).unwrap();
694 source.destroy();
695
696 // This should drop the closure
697 drop(source);
698
699 assert!(dropped.load(Ordering::Relaxed));
700 assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
701 }
702
703 #[test]
704 fn test_sync_channel() {
705 let c = MainContext::new();
706 let l = MainLoop::new(Some(&c), false);
707
708 let _guard = c.acquire().unwrap();
709
710 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
711
712 let sum = Rc::new(RefCell::new(0));
713 let sum_clone = sum.clone();
714 let l_clone = l.clone();
715 receiver.attach(Some(&c), move |item| {
716 *sum_clone.borrow_mut() += item;
717 if *sum_clone.borrow() == 6 {
718 l_clone.quit();
719 ControlFlow::Break
720 } else {
721 ControlFlow::Continue
722 }
723 });
724
725 let (wait_sender, wait_receiver) = mpsc::channel();
726
727 let thread = thread::spawn(move || {
728 // The first two must succeed
729 sender.try_send(1).unwrap();
730 sender.try_send(2).unwrap();
731
732 // This fill up the channel
733 assert!(sender.try_send(3).is_err());
734 wait_sender.send(()).unwrap();
735
736 // This will block
737 sender.send(3).unwrap();
738 });
739
740 // Wait until the channel is full, and then another
741 // 50ms to make sure the sender is blocked now and
742 // can wake up properly once an item was consumed
743 assert!(wait_receiver.recv().is_ok());
744 thread::sleep(time::Duration::from_millis(50));
745 l.run();
746
747 thread.join().unwrap();
748
749 assert_eq!(*sum.borrow(), 6);
750 }
751
752 #[test]
753 fn test_sync_channel_drop_wakeup() {
754 let c = MainContext::new();
755 let l = MainLoop::new(Some(&c), false);
756
757 let _guard = c.acquire().unwrap();
758
759 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 3);
760
761 let sum = Rc::new(RefCell::new(0));
762 let sum_clone = sum.clone();
763 let l_clone = l.clone();
764 receiver.attach(Some(&c), move |item| {
765 *sum_clone.borrow_mut() += item;
766 if *sum_clone.borrow() == 6 {
767 l_clone.quit();
768 ControlFlow::Break
769 } else {
770 ControlFlow::Continue
771 }
772 });
773
774 let (wait_sender, wait_receiver) = mpsc::channel();
775
776 let thread = thread::spawn(move || {
777 // The first three must succeed
778 sender.try_send(1).unwrap();
779 sender.try_send(2).unwrap();
780 sender.try_send(3).unwrap();
781
782 wait_sender.send(()).unwrap();
783 for i in 4.. {
784 // This will block at some point until the
785 // receiver is removed from the main context
786 if sender.send(i).is_err() {
787 break;
788 }
789 }
790 });
791
792 // Wait until the channel is full, and then another
793 // 50ms to make sure the sender is blocked now and
794 // can wake up properly once an item was consumed
795 assert!(wait_receiver.recv().is_ok());
796 thread::sleep(time::Duration::from_millis(50));
797 l.run();
798
799 thread.join().unwrap();
800
801 assert_eq!(*sum.borrow(), 6);
802 }
803
804 #[test]
805 fn test_sync_channel_drop_receiver_wakeup() {
806 let c = MainContext::new();
807
808 let _guard = c.acquire().unwrap();
809
810 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
811
812 let (wait_sender, wait_receiver) = mpsc::channel();
813
814 let thread = thread::spawn(move || {
815 // The first two must succeed
816 sender.try_send(1).unwrap();
817 sender.try_send(2).unwrap();
818 wait_sender.send(()).unwrap();
819
820 // This will block and then error out because the receiver is destroyed
821 assert!(sender.send(3).is_err());
822 });
823
824 // Wait until the channel is full, and then another
825 // 50ms to make sure the sender is blocked now and
826 // can wake up properly once an item was consumed
827 assert!(wait_receiver.recv().is_ok());
828 thread::sleep(time::Duration::from_millis(50));
829 drop(receiver);
830 thread.join().unwrap();
831 }
832
833 #[test]
834 fn test_sync_channel_rendezvous() {
835 let c = MainContext::new();
836 let l = MainLoop::new(Some(&c), false);
837
838 let _guard = c.acquire().unwrap();
839
840 let (sender, receiver) = MainContext::sync_channel(Priority::default(), 0);
841
842 let (wait_sender, wait_receiver) = mpsc::channel();
843
844 let thread = thread::spawn(move || {
845 wait_sender.send(()).unwrap();
846 sender.send(1).unwrap();
847 wait_sender.send(()).unwrap();
848 sender.send(2).unwrap();
849 wait_sender.send(()).unwrap();
850 sender.send(3).unwrap();
851 wait_sender.send(()).unwrap();
852 });
853
854 // Wait until the thread is started, then wait another 50ms and
855 // during that time it must not have proceeded yet to send the
856 // second item because we did not yet receive the first item.
857 assert!(wait_receiver.recv().is_ok());
858 assert_eq!(
859 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
860 Err(mpsc::RecvTimeoutError::Timeout)
861 );
862
863 let sum = Rc::new(RefCell::new(0));
864 let sum_clone = sum.clone();
865 let l_clone = l.clone();
866 receiver.attach(Some(&c), move |item| {
867 // We consumed one item so there should be one item on
868 // the other receiver now.
869 assert!(wait_receiver.recv().is_ok());
870 *sum_clone.borrow_mut() += item;
871 if *sum_clone.borrow() == 6 {
872 // But as we didn't consume the next one yet, there must be no
873 // other item available yet
874 assert_eq!(
875 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
876 Err(mpsc::RecvTimeoutError::Disconnected)
877 );
878 l_clone.quit();
879 ControlFlow::Break
880 } else {
881 // But as we didn't consume the next one yet, there must be no
882 // other item available yet
883 assert_eq!(
884 wait_receiver.recv_timeout(time::Duration::from_millis(50)),
885 Err(mpsc::RecvTimeoutError::Timeout)
886 );
887 ControlFlow::Continue
888 }
889 });
890 l.run();
891
892 thread.join().unwrap();
893
894 assert_eq!(*sum.borrow(), 6);
895 }
896}
897