1#![cfg(feature = "bilock")]
2
3use futures::executor::block_on;
4use futures::future;
5use futures::stream;
6use futures::task::{Context, Poll};
7use futures::Future;
8use futures::StreamExt;
9use futures_test::task::noop_context;
10use futures_util::lock::BiLock;
11use std::pin::Pin;
12use std::thread;
13
14#[test]
15fn smoke() {
16 let future = future::lazy(|cx| {
17 let (a, b) = BiLock::new(1);
18
19 {
20 let mut lock = match a.poll_lock(cx) {
21 Poll::Ready(l) => l,
22 Poll::Pending => panic!("poll not ready"),
23 };
24 assert_eq!(*lock, 1);
25 *lock = 2;
26
27 assert!(b.poll_lock(cx).is_pending());
28 assert!(a.poll_lock(cx).is_pending());
29 }
30
31 assert!(b.poll_lock(cx).is_ready());
32 assert!(a.poll_lock(cx).is_ready());
33
34 {
35 let lock = match b.poll_lock(cx) {
36 Poll::Ready(l) => l,
37 Poll::Pending => panic!("poll not ready"),
38 };
39 assert_eq!(*lock, 2);
40 }
41
42 assert_eq!(a.reunite(b).expect("bilock/smoke: reunite error"), 2);
43
44 Ok::<(), ()>(())
45 });
46
47 assert_eq!(block_on(future), Ok(()));
48}
49
50#[test]
51fn concurrent() {
52 const N: usize = 10000;
53 let mut cx = noop_context();
54 let (a, b) = BiLock::new(0);
55
56 let a = Increment { a: Some(a), remaining: N };
57 let b = stream::iter(0..N).fold(b, |b, _n| async {
58 let mut g = b.lock().await;
59 *g += 1;
60 drop(g);
61 b
62 });
63
64 let t1 = thread::spawn(move || block_on(a));
65 let b = block_on(b);
66 let a = t1.join().unwrap();
67
68 match a.poll_lock(&mut cx) {
69 Poll::Ready(l) => assert_eq!(*l, 2 * N),
70 Poll::Pending => panic!("poll not ready"),
71 }
72 match b.poll_lock(&mut cx) {
73 Poll::Ready(l) => assert_eq!(*l, 2 * N),
74 Poll::Pending => panic!("poll not ready"),
75 }
76
77 assert_eq!(a.reunite(b).expect("bilock/concurrent: reunite error"), 2 * N);
78
79 struct Increment {
80 remaining: usize,
81 a: Option<BiLock<usize>>,
82 }
83
84 impl Future for Increment {
85 type Output = BiLock<usize>;
86
87 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<BiLock<usize>> {
88 loop {
89 if self.remaining == 0 {
90 return self.a.take().unwrap().into();
91 }
92
93 let a = self.a.as_mut().unwrap();
94 let mut a = match a.poll_lock(cx) {
95 Poll::Ready(l) => l,
96 Poll::Pending => return Poll::Pending,
97 };
98 *a += 1;
99 drop(a);
100 self.remaining -= 1;
101 }
102 }
103 }
104}
105