1 | use futures::channel::mpsc; |
2 | use futures::executor::block_on; |
3 | use futures::future::Future; |
4 | use futures::sink::SinkExt; |
5 | use futures::stream::StreamExt; |
6 | use futures::task::{Context, Poll}; |
7 | use std::pin::Pin; |
8 | use std::sync::{Arc, Weak}; |
9 | use std::thread; |
10 | use std::time::{Duration, Instant}; |
11 | |
// Basic end-to-end check: a producer thread feeds a bounded channel while the
// test thread consumes three items and then lets go of the receiver.
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // The producer keeps sending until a send reports an error.
    let producer = thread::spawn(move || {
        while block_on(tx.send(42)).is_ok() {}
    });

    // The combinator chain takes ownership of `rx`, so the receiver is dropped
    // once this `block_on` returns. That has to happen before the join below,
    // because the producer only leaves its loop after a send has failed.
    let first_three = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(first_three);

    producer.join().unwrap()
}
23 | |
// Retiring a single sender -- via `disconnect`, by dropping it, or via
// `SinkExt::close` -- must leave the channel usable for the remaining senders.
// The same scenario is run against a bounded and an unbounded channel.
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut tx_disconnect, mut receiver) = mpsc::channel(1);
        let tx_drop = tx_disconnect.clone();
        let mut tx_close = tx_disconnect.clone();
        let mut tx_live = tx_disconnect.clone();

        // Retire three of the four senders, each in a different way.
        tx_disconnect.disconnect();
        drop(tx_drop);
        block_on(tx_close.close()).unwrap();

        // The retired handles report closed; the untouched one does not.
        assert!(tx_disconnect.is_closed());
        assert!(tx_close.is_closed());
        assert!(!tx_live.is_closed());

        // The remaining sender can still deliver a message.
        block_on(tx_live.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the last sender is gone, the stream ends.
        drop(tx_live);
        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel.
    {
        let (mut tx_disconnect, mut receiver) = mpsc::unbounded();
        let tx_drop = tx_disconnect.clone();
        let mut tx_close = tx_disconnect.clone();
        let mut tx_live = tx_disconnect.clone();

        // Retire three of the four senders, each in a different way.
        tx_disconnect.disconnect();
        drop(tx_drop);
        block_on(tx_close.close()).unwrap();

        // The retired handles report closed; the untouched one does not.
        assert!(tx_disconnect.is_closed());
        assert!(tx_close.is_closed());
        assert!(!tx_live.is_closed());

        // The remaining sender can still deliver a message.
        block_on(tx_live.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the last sender is gone, the stream ends.
        drop(tx_live);
        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }
}
70 | |
// `close_channel` on any one sender must shut the channel down for every
// sender and terminate the receiver's stream. Checked for both the bounded
// and the unbounded channel.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut bystander = closer.clone();

        // Closing through one handle affects the whole channel.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // Sending through the other handle now fails with "disconnected".
        assert!(block_on(bystander.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut bystander = closer.clone();

        // Closing through one handle affects the whole channel.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // Sending through the other handle now fails with "disconnected".
        assert!(block_on(bystander.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }
}
105 | |
// Dropping the receiver must close the channel and drop any message still
// queued in it. A `Weak` handle to the queued `Arc` is used to observe whether
// the message is still alive.
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Unbounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (tx, rx) = mpsc::unbounded();
        tx.unbounded_send(payload).expect("failed to send");

        // While the message sits in the queue, the probe can still be upgraded.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sending side observes the closure ...
        assert!(tx.is_closed());

        // ... and the queued message has been released.
        assert!(probe.upgrade().is_none());
    }

    // Bounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (mut tx, rx) = mpsc::channel(1);
        tx.try_send(payload).expect("failed to send");

        // While the message sits in the queue, the probe can still be upgraded.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sending side observes the closure ...
        assert!(tx.is_closed());

        // ... and the queued message has been released.
        assert!(probe.upgrade().is_none());
        // The sender still reports the channel as closed afterwards.
        assert!(tx.is_closed());
    }
}
147 | |
// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_try_send_as_receiver_closes() {
    // Number of throwaway channels exercised by the main loop.
    const AMT: usize = 10000;
    // To vary the timing (in the hope of provoking the racy interleaving), the
    // background task polls each test receiver a different number of times
    // before dropping it. The per-channel count is `i % MAX_COUNTDOWN`.
    const MAX_COUNTDOWN: usize = 20;
    // If a successfully sent item is still alive after the channel reports
    // disconnection, keep re-checking for up to `SPIN_TIMEOUT_S` seconds,
    // sleeping `SPIN_SLEEP_MS` milliseconds between checks, before treating it
    // as a persistent condition rather than a transient one.
    const SPIN_TIMEOUT_S: u64 = 10;
    const SPIN_SLEEP_MS: u64 = 10;

    // A receiver handed to the background task, with its polling budget.
    struct TestRx {
        rx: mpsc::Receiver<Arc<()>>,
        // How many times `rx` is polled before it is dropped.
        poll_count: usize,
    }

    // Background future: accepts `TestRx` commands, polls the current test
    // receiver while counting down, then drops it.
    struct TestTask {
        command_rx: mpsc::Receiver<TestRx>,
        test_rx: Option<mpsc::Receiver<Arc<()>>>,
        countdown: usize,
    }

    impl TestTask {
        /// Builds an idle task together with the sender used to submit work to it.
        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
            // A countdown of 0 means that no countdown is in progress.
            let task = TestTask { command_rx, test_rx: None, countdown: 0 };
            (task, command_tx)
        }
    }

    impl Future for TestTask {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = &mut *self;

            // Drive the channel under test, if there is one.
            if let Some(active) = this.test_rx.as_mut() {
                if let Poll::Ready(received) = active.poll_next_unpin(cx) {
                    // The stream ending here is not expected by the test.
                    drop(received.expect("test finished unexpectedly!"));
                }
                this.countdown -= 1;
                // Ask to be polled again right away so the countdown keeps ticking.
                cx.waker().wake_by_ref();
            }

            // Pick up the next channel to test, if one has been submitted.
            if let Poll::Ready(command) = this.command_rx.poll_next_unpin(cx) {
                match command {
                    Some(TestRx { rx, poll_count }) => {
                        this.test_rx = Some(rx);
                        this.countdown = poll_count;
                        cx.waker().wake_by_ref();
                    }
                    // The command stream has ended, so the task is finished.
                    None => return Poll::Ready(()),
                }
            }

            // Countdown exhausted -- release the receiver under test.
            if this.countdown == 0 {
                this.test_rx = None;
            }
            Poll::Pending
        }
    }

    let (task, mut cmd_tx) = TestTask::new();
    let bg = thread::spawn(move || block_on(task));

    for i in 0..AMT {
        let (mut test_tx, rx) = mpsc::channel(0);
        cmd_tx.try_send(TestRx { rx, poll_count: i % MAX_COUNTDOWN }).unwrap();

        // Weak handle to the most recent item whose send was reported successful.
        let mut last_sent: Option<Weak<()>> = None;
        let mut attempted_sends = 0;
        let mut successful_sends = 0;

        loop {
            // Create a fresh item and keep a weak handle to observe its lifetime.
            let item = Arc::new(());
            let weak = Arc::downgrade(&item);

            match test_tx.try_send(item) {
                Ok(()) => {
                    last_sent = Some(weak);
                    successful_sends += 1;
                }
                Err(err) => {
                    if err.is_full() {
                        // Channel is full: simply try again.
                    } else if err.is_disconnected() {
                        // Look for evidence of the race: the last item that
                        // was sent successfully should have been dropped. It
                        // may still be alive for a moment, so poll for its
                        // release until the spin timeout elapses.
                        if let Some(sent) = &last_sent {
                            let started = Instant::now();
                            let limit = Duration::from_secs(SPIN_TIMEOUT_S);
                            let mut spins = 0;
                            while sent.upgrade().is_some() {
                                assert!(
                                    started.elapsed() < limit,
                                    "item not dropped on iteration {} after \
                                     {} sends ({} successful). spin=({})",
                                    i,
                                    attempted_sends,
                                    successful_sends,
                                    spins
                                );
                                spins += 1;
                                thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
                            }
                        }
                        break;
                    } else {
                        panic!("unexpected error: {}", err);
                    }
                }
            }
            attempted_sends += 1;
        }
    }

    // Dropping the command sender lets the background task run to completion.
    drop(cmd_tx);
    bg.join().expect("background thread join");
}
278 | |
// After an unbounded channel has reported end-of-stream through `try_next`,
// calling `try_next` again must report end-of-stream again.
#[test]
fn unbounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::unbounded::<String>();
    // With its only sender gone, the channel is closed.
    drop(sender);

    // The error is mapped to `()` so the whole result can be compared directly.
    // First call: end of channel.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(first, Ok(None));

    // Second call, after `None` was already observed: same answer.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(second, Ok(None));
}
289 | |
// After a bounded channel has reported end-of-stream through `try_next`,
// calling `try_next` again must report end-of-stream again.
#[test]
fn bounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::channel::<String>(17);
    // With its only sender gone, the channel is closed.
    drop(sender);

    // The error is mapped to `()` so the whole result can be compared directly.
    // First call: end of channel.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(first, Ok(None));

    // Second call, after `None` was already observed: same answer.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(second, Ok(None));
}
300 | |