1use futures::channel::oneshot;
2use futures::executor::{block_on, LocalPool};
3use futures::future::{self, FutureExt, LocalFutureObj, TryFutureExt};
4use futures::task::LocalSpawn;
5use std::cell::{Cell, RefCell};
6use std::panic::AssertUnwindSafe;
7use std::rc::Rc;
8use std::task::Poll;
9use std::thread;
10
/// Test helper that records how many times it has been cloned.
///
/// Every clone shares the same counter cell, so the total is observable from
/// any instance.
struct CountClone(Rc<Cell<i32>>);

impl Clone for CountClone {
    /// Increments the shared counter, then returns a new handle to that same counter.
    fn clone(&self) -> Self {
        let Self(counter) = self;
        counter.set(counter.get() + 1);
        // `Rc::clone` only bumps the reference count; the counter cell itself is shared.
        Self(Rc::clone(counter))
    }
}
19
20fn send_shared_oneshot_and_wait_on_multiple_threads(threads_number: u32) {
21 let (tx, rx) = oneshot::channel::<i32>();
22 let f = rx.shared();
23 let join_handles = (0..threads_number)
24 .map(|_| {
25 let cloned_future = f.clone();
26 thread::spawn(move || {
27 assert_eq!(block_on(cloned_future).unwrap(), 6);
28 })
29 })
30 .collect::<Vec<_>>();
31
32 tx.send(6).unwrap();
33
34 assert_eq!(block_on(f).unwrap(), 6);
35 for join_handle in join_handles {
36 join_handle.join().unwrap();
37 }
38}
39
/// Runs the shared-oneshot scenario with a single spawned waiter thread.
#[test]
fn one_thread() {
    let waiters = 1;
    send_shared_oneshot_and_wait_on_multiple_threads(waiters);
}
44
/// Runs the shared-oneshot scenario with two spawned waiter threads.
#[test]
fn two_threads() {
    let waiters = 2;
    send_shared_oneshot_and_wait_on_multiple_threads(waiters);
}
49
/// Runs the shared-oneshot scenario with one thousand spawned waiter threads.
#[test]
fn many_threads() {
    let waiters = 1000;
    send_shared_oneshot_and_wait_on_multiple_threads(waiters);
}
54
/// Dropping one clone of a shared future on one thread must not prevent
/// another clone, polled on a different thread, from resolving later.
#[test]
fn drop_on_one_task_ok() {
    let (value_tx, value_rx) = oneshot::channel::<u32>();
    let first = value_rx.shared();
    let second = first.clone();

    let (cancel_tx, cancel_rx) = oneshot::channel::<u32>();

    // Thread 1 races the first clone against the cancel channel and drops
    // whatever the race yields, including the still-pending shared clone.
    let racer = thread::spawn(move || {
        let shared_side = first.map_err(|_| ());
        let cancel_side = cancel_rx.map_err(|_| ());
        drop(block_on(future::try_select(shared_side, cancel_side)));
    });

    let (forward_tx, forward_rx) = oneshot::channel::<u32>();

    // Thread 2 waits on the second clone and forwards its value to `forward_rx`.
    let forwarder = thread::spawn(move || {
        let forwarded = second.map_ok(|value| forward_tx.send(value).unwrap()).map_err(|_| ());
        let _ = block_on(forwarded);
    });

    // Resolving the cancel channel ends the race, so thread 1 drops its clone.
    cancel_tx.send(11).unwrap();
    racer.join().unwrap();

    // Completing the underlying channel should resolve the second clone and,
    // through it, the forwarding channel.
    value_tx.send(42).unwrap();
    let forwarded_value = block_on(forward_rx).unwrap();
    assert_eq!(forwarded_value, 42);
    forwarder.join().unwrap();
}
82
/// A clone of the shared future is dropped from inside the inner future's own
/// poll; the outer poll must still complete with the produced value.
#[test]
fn drop_in_poll() {
    let holder = Rc::new(RefCell::new(None));
    let holder_in_poll = Rc::clone(&holder);

    let shared = future::lazy(move |_| {
        // Emptying the slot drops the type-erased clone stored in it below.
        drop(holder_in_poll.replace(None));
        1
    })
    .shared();

    // Park a type-erased clone in the slot so the inner future can drop it while running.
    let erased_clone = LocalFutureObj::new(Box::new(shared.clone()));
    holder.replace(Some(erased_clone));

    let produced = block_on(shared);
    assert_eq!(produced, 1);
}
99
/// `peek` must report nothing until some handle has actually polled the shared
/// future to completion; afterwards the value is visible through other clones.
#[test]
fn peek() {
    let mut pool = LocalPool::new();
    let spawner = pool.spawner();

    let (sender, receiver) = oneshot::channel::<i32>();
    let first = receiver.shared();
    let second = first.clone();

    // Checks both handles twice to show that repeated peeks do not change the outcome.
    let assert_nothing_peekable = || {
        for _ in 0..2 {
            assert!(first.peek().is_none());
            assert!(second.peek().is_none());
        }
    };

    // Nothing has been sent yet.
    assert_nothing_peekable();

    // Completing the underlying future has no effect, because the value has not been `poll`ed in.
    sender.send(42).unwrap();
    assert_nothing_peekable();

    // Once the Shared has been polled, the value is peekable on the clone.
    let poll_task = LocalFutureObj::new(Box::new(first.map(|_| ())));
    spawner.spawn_local_obj(poll_task).unwrap();
    pool.run();
    for _ in 0..2 {
        let peeked = second.peek().unwrap();
        assert_eq!(*peeked, Ok(42));
    }
}
129
/// Exercises `Shared::downgrade` / `WeakShared::upgrade` before and after the
/// strong handles have been driven to completion.
#[test]
fn downgrade() {
    let (sender, receiver) = oneshot::channel::<i32>();
    let strong = receiver.shared();

    // A live `Shared` exists, so a weak handle can be created...
    let weak = strong.downgrade().unwrap();
    // ...and upgraded straight back into a second strong handle.
    let mut upgraded = weak.upgrade().unwrap();

    sender.send(42).unwrap();
    let first_value = block_on(strong).unwrap();
    assert_eq!(first_value, 42);

    // `upgraded` has not been polled to completion yet, so both directions
    // are still expected to succeed.
    assert!(upgraded.downgrade().is_some());
    assert!(weak.upgrade().is_some());

    let second_value = block_on(&mut upgraded).unwrap();
    assert_eq!(second_value, 42);

    // Every `Shared` has now been exhausted: neither upgrading the existing
    // weak handle nor creating a new one is expected to work.
    assert!(weak.upgrade().is_none());
    assert!(upgraded.downgrade().is_none());
}
153
/// Checks that `ptr_eq` and `ptr_hash` agree for handles to the same underlying
/// future, and that `ptr_eq` is false for completed or unrelated handles.
#[test]
fn ptr_eq() {
    use future::FusedFuture;
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    let (sender, receiver) = oneshot::channel::<i32>();
    let original = receiver.shared();
    let mut sibling = original.clone();

    // Both handles wrap the same underlying future, so they compare equal
    // in either direction (the relation is symmetric).
    assert!(original.ptr_eq(&sibling));
    assert!(sibling.ptr_eq(&original));

    // Handles that are `ptr_eq` must also produce the same pointer hash.
    let mut original_hasher = DefaultHasher::new();
    original.ptr_hash(&mut original_hasher);
    let original_hash = original_hasher.finish();

    let mut sibling_hasher = DefaultHasher::new();
    sibling.ptr_hash(&mut sibling_hasher);
    assert_eq!(original_hash, sibling_hasher.finish());

    sender.send(42).unwrap();
    assert_eq!(block_on(&mut sibling).unwrap(), 42);

    // A handle that has run to completion no longer compares equal.
    assert!(sibling.is_terminated());
    assert!(!original.ptr_eq(&sibling));

    // The still-pending handle keeps working with freshly made clones.
    let late_clone = original.clone();
    assert!(original.ptr_eq(&late_clone));

    let mut late_hasher = DefaultHasher::new();
    late_clone.ptr_hash(&mut late_hasher);
    assert_eq!(original_hash, late_hasher.finish());

    // A handle built from a different channel never compares equal.
    let (_unused_sender, unrelated_rx) = oneshot::channel::<i32>();
    let unrelated = unrelated_rx.shared();
    assert!(!original.ptr_eq(&unrelated));
}
199
/// With only one `Shared` handle in existence, resolving it is expected to
/// yield the output without cloning it (the clone counter stays at zero).
#[test]
fn dont_clone_in_single_owner_shared_future() {
    let payload = CountClone(Rc::new(Cell::new(0)));
    let (sender, receiver) = oneshot::channel();

    let sole_owner = receiver.shared();

    // `.ok()` discards the returned payload on failure before unwrapping.
    sender.send(payload).ok().unwrap();

    let CountClone(clone_count) = block_on(sole_owner).unwrap();
    assert_eq!(clone_count.get(), 0);
}
211
/// Each poll through a temporary clone is expected to clone the output once,
/// while the final poll through the last remaining handle adds no clone.
#[test]
fn dont_do_unnecessary_clones_on_output() {
    let payload = CountClone(Rc::new(Cell::new(0)));
    let (sender, receiver) = oneshot::channel();

    let shared = receiver.shared();

    // `.ok()` discards the returned payload on failure before unwrapping.
    sender.send(payload).ok().unwrap();

    // Two polls via clones: the counter should read 1, then 2.
    for expected_clones in 1..=2 {
        let output = block_on(shared.clone()).unwrap();
        assert_eq!(output.0.get(), expected_clones);
    }

    // Consuming the original handle must leave the counter at 2.
    let last_output = block_on(shared).unwrap();
    assert_eq!(last_output.0.get(), 2);
}
225
/// A shared future that wakes itself and returns `Pending` must still let the
/// sibling future in a `join` run, otherwise the join could never finish.
#[test]
fn shared_future_that_wakes_itself_until_pending_is_returned() {
    let gate = Cell::new(false);

    // Until the gate opens, this future immediately re-wakes itself and stays pending.
    let self_waking = future::poll_fn(|cx| {
        if !gate.get() {
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        Poll::Ready(())
    })
    .shared();

    let open_gate = async { gate.set(true) };

    // The join future can only complete if the second future gets a chance to run after the
    // first has returned pending.
    let outcome = block_on(future::join(self_waking, open_gate));
    assert_eq!(outcome, ((), ()));
}
243
/// After the inner future has panicked during one poll, polling another clone
/// is expected to panic with the message named in `should_panic`.
#[test]
#[should_panic(expected = "inner future panicked during poll")]
fn panic_while_poll() {
    let poisoned = future::poll_fn::<i8, _>(|_cx| panic!("test")).shared();

    // First poll: the inner panic is caught here so the test can continue.
    let first_poller = poisoned.clone();
    let first_attempt = std::panic::catch_unwind(AssertUnwindSafe(move || {
        block_on(first_poller);
    }));
    first_attempt.unwrap_err();

    // Second poll: this is the panic the `should_panic` attribute expects.
    block_on(poisoned);
}
257
/// Polling a fresh shared future from a destructor that runs during unwinding
/// must work; the test then fails with the original `test_marker` panic.
#[test]
#[should_panic(expected = "test_marker")]
fn poll_while_panic() {
    /// Guard whose destructor drives a shared future to completion.
    struct PollOnDrop;

    impl Drop for PollOnDrop {
        fn drop(&mut self) {
            let ready = futures::future::ready(1).shared();
            let via_clone = block_on(ready.clone());
            assert_eq!(via_clone, 1);
            let via_original = block_on(ready);
            assert_eq!(via_original, 1);
        }
    }

    // The guard is dropped while the panic below unwinds this frame.
    let _guard = PollOnDrop;
    panic!("test_marker");
}
274