1 | #![cfg (feature = "bilock" )] |
2 | |
3 | use futures::executor::block_on; |
4 | use futures::future; |
5 | use futures::stream; |
6 | use futures::task::{Context, Poll}; |
7 | use futures::Future; |
8 | use futures::StreamExt; |
9 | use futures_test::task::noop_context; |
10 | use futures_util::lock::BiLock; |
11 | use std::pin::Pin; |
12 | use std::thread; |
13 | |
/// Single-threaded walk through the basic `BiLock` life cycle.
///
/// A lock is created around the value `1`. One half acquires it, checks the
/// initial value and overwrites it with `2`; while that guard is alive both
/// halves must report `Pending`. Once the guard is gone both halves must be
/// able to lock again, the other half must see the new value, and reuniting
/// the halves must hand back `2`.
#[test]
fn smoke() {
    let fut = future::lazy(|cx| {
        let (left, right) = BiLock::new(1);

        {
            // Nothing holds the lock yet, so the first poll has to succeed.
            let mut guard = match left.poll_lock(cx) {
                Poll::Pending => panic!("poll not ready"),
                Poll::Ready(guard) => guard,
            };
            assert_eq!(*guard, 1);
            *guard = 2;

            // With the guard still in scope neither half may acquire the lock.
            assert!(right.poll_lock(cx).is_pending());
            assert!(left.poll_lock(cx).is_pending());
        }

        // The guard was dropped at the end of the scope above; each of these
        // polls yields a temporary guard that is released right away.
        assert!(right.poll_lock(cx).is_ready());
        assert!(left.poll_lock(cx).is_ready());

        // The write made through `left` must be visible through `right`.
        match right.poll_lock(cx) {
            Poll::Ready(guard) => assert_eq!(*guard, 2),
            Poll::Pending => panic!("poll not ready"),
        }

        let inner = left.reunite(right).expect("bilock/smoke: reunite error");
        assert_eq!(inner, 2);

        Ok::<(), ()>(())
    });

    let outcome = block_on(fut);
    assert_eq!(outcome, Ok(()));
}
49 | |
/// Hammers one `BiLock<usize>` from two threads at once.
///
/// A spawned thread drives an `Increment` future that bumps the counter `N`
/// times through one half using `poll_lock`, while the current thread bumps
/// it `N` times through the other half using the async `lock()` API. After
/// both finish, each half must observe `2 * N`, and reuniting the halves
/// must return `2 * N` as well.
#[test]
fn concurrent() {
    const N: usize = 10000;

    /// Hand-written future that adds one to the shared counter `remaining`
    /// times and then resolves to the `BiLock` half it was given.
    struct Increment {
        remaining: usize,
        a: Option<BiLock<usize>>,
    }

    impl Future for Increment {
        type Output = BiLock<usize>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            while self.remaining > 0 {
                let half = self.a.as_mut().unwrap();
                match half.poll_lock(cx) {
                    // The guard is released at the end of this arm, before
                    // the remaining count is decremented.
                    Poll::Ready(mut guard) => *guard += 1,
                    // Lock currently unavailable: report `Pending` and pick
                    // up from the same count on the next poll.
                    Poll::Pending => return Poll::Pending,
                }
                self.remaining -= 1;
            }

            // All increments done: give the half back to the caller.
            Poll::Ready(self.a.take().unwrap())
        }
    }

    let mut cx = noop_context();
    let (first, second) = BiLock::new(0);

    let incrementer = Increment { remaining: N, a: Some(first) };
    let folder = stream::iter(0..N).fold(second, |half, _| async {
        {
            let mut guard = half.lock().await;
            *guard += 1;
        }
        // The guard is gone by now, so the half can be moved into the next step.
        half
    });

    // Run both incrementers concurrently: one on a fresh thread, one here.
    let worker = thread::spawn(move || block_on(incrementer));
    let second = block_on(folder);
    let first = worker.join().unwrap();

    // Nothing holds the lock any more, so each half must lock immediately
    // and see the combined total.
    for half in &[&first, &second] {
        match half.poll_lock(&mut cx) {
            Poll::Ready(guard) => assert_eq!(*guard, 2 * N),
            Poll::Pending => panic!("poll not ready"),
        }
    }

    let total = first.reunite(second).expect("bilock/concurrent: reunite error");
    assert_eq!(total, 2 * N);
}
105 | |