1use futures::channel::mpsc;
2use futures::executor::block_on;
3use futures::future::Future;
4use futures::sink::SinkExt;
5use futures::stream::StreamExt;
6use futures::task::{Context, Poll};
7use std::pin::Pin;
8use std::sync::{Arc, Weak};
9use std::thread;
10use std::time::{Duration, Instant};
11
/// Basic end-to-end check: a producer thread keeps sending into a bounded
/// channel while the test consumes three items and then lets go of the
/// receiver, which must make the producer's next send fail so it can exit.
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // Producer: send the same value over and over until a send is rejected.
    let producer = thread::spawn(move || loop {
        if block_on(tx.send(42)).is_err() {
            break;
        }
    });

    // `take(3)` consumes `rx`, and the resulting future is moved into
    // `block_on`, so the receiver is gone by the time this statement finishes.
    // That has to happen before the join below, otherwise the producer would
    // never see a failed send and the join would hang.
    let consume_three = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(consume_three);

    producer.join().unwrap()
}
23
/// Retiring individual senders (via `disconnect`, `drop`, or `Sink::close`)
/// must only affect that sender; the channel itself stays usable until the
/// very last sender is gone. Checked for both the bounded and the unbounded
/// channel.
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut first, mut rx) = mpsc::channel(1);
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four senders, each in a different way. None of
        // these is supposed to close the channel for the remaining sender.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        // The retired senders that we can still observe report closed; the
        // untouched one does not.
        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving sender still delivers.
        block_on(last.send(5)).unwrap();
        let received = block_on(rx.next());
        assert_eq!(received, Some(5));

        // Once the final sender goes away the stream terminates.
        drop(last);
        let end_of_stream = block_on(rx.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel: exactly the same scenario.
    {
        let (mut first, mut rx) = mpsc::unbounded();
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four senders, each in a different way. None of
        // these is supposed to close the channel for the remaining sender.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        // The retired senders that we can still observe report closed; the
        // untouched one does not.
        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving sender still delivers.
        block_on(last.send(5)).unwrap();
        let received = block_on(rx.next());
        assert_eq!(received, Some(5));

        // Once the final sender goes away the stream terminates.
        drop(last);
        let end_of_stream = block_on(rx.next());
        assert_eq!(end_of_stream, None);
    }
}
70
/// `close_channel` on any one sender must shut the channel down for every
/// sender: all clones report closed, further sends fail as disconnected, and
/// the receiver sees end-of-stream. Checked for both channel flavours.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut rx) = mpsc::channel(1);
        let mut sibling = closer.clone();

        // Closing through one handle affects the whole channel.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // A send through the other handle is rejected as "disconnected".
        let send_result = block_on(sibling.send(5));
        let send_error = send_result.unwrap_err();
        assert!(send_error.is_disconnected());

        // Nothing was queued, so the receiver immediately hits the end.
        assert_eq!(block_on(rx.next()), None);
    }

    // Unbounded channel.
    {
        let (closer, mut rx) = mpsc::unbounded();
        let mut sibling = closer.clone();

        // Closing through one handle affects the whole channel.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // A send through the other handle is rejected as "disconnected".
        let send_result = block_on(sibling.send(5));
        let send_error = send_result.unwrap_err();
        assert!(send_error.is_disconnected());

        // Nothing was queued, so the receiver immediately hits the end.
        assert_eq!(block_on(rx.next()), None);
    }
}
105
/// Dropping the receiver must both close the channel (as seen by the sender)
/// and drop any message still sitting in the queue. An `Arc` payload with a
/// `Weak` probe is used to observe whether the queued message is still alive.
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Unbounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (tx, rx) = mpsc::unbounded();
        tx.unbounded_send(payload).expect("failed to send");

        // The only strong reference now lives inside the queued message.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sender observes the closure...
        assert!(tx.is_closed());

        // ...and the queued message has been destroyed along with the receiver.
        assert!(probe.upgrade().is_none());
    }

    // Bounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (mut tx, rx) = mpsc::channel(1);
        tx.try_send(payload).expect("failed to send");

        // The only strong reference now lives inside the queued message.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sender observes the closure...
        assert!(tx.is_closed());

        // ...and the queued message has been destroyed along with the receiver.
        assert!(probe.upgrade().is_none());
        // The closed state is still reported after the drain check.
        assert!(tx.is_closed());
    }
}
147
// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
//
// A background `TestTask` is handed freshly created test receivers over a
// command channel, polls each one a configurable number of times and then
// drops it. Meanwhile the main thread hammers the matching sender with
// `try_send()`. Once the sender reports a disconnect, the last item that
// `try_send()` accepted must end up dropped (observed through a `Weak`).
#[cfg_attr(miri, ignore)] // Miri is too slow
#[test]
fn stress_try_send_as_receiver_closes() {
    // Number of test channels that are created and torn down.
    const AMT: usize = 10000;
    // To provide variable timing characteristics (in the hopes of
    // reproducing the collision that leads to a race), we busy-re-poll
    // the test MPSC receiver a variable number of times before actually
    // dropping it. The countdown used for iteration `i` is
    // `i % MAX_COUNTDOWN`, so it cycles through `0..MAX_COUNTDOWN`; a
    // countdown of 0 drops the receiver without polling it at all.
    const MAX_COUNTDOWN: usize = 20;
    // When we detect that a successfully sent item is still allocated
    // after a disconnect, we spin for up to `SPIN_TIMEOUT_S` seconds,
    // sleeping `SPIN_SLEEP_MS` milliseconds between checks, to confirm
    // that it is a persistent condition and not a concurrency illusion.
    const SPIN_TIMEOUT_S: u64 = 10;
    const SPIN_SLEEP_MS: u64 = 10;

    // A command for the background task: a receiver to exercise and how
    // long to keep it around.
    struct TestRx {
        rx: mpsc::Receiver<Arc<()>>,
        // The number of times to query `rx` before dropping it.
        poll_count: usize,
    }

    // Background future that exercises the receivers it is sent.
    struct TestTask {
        // Incoming commands; the task completes when this stream ends.
        command_rx: mpsc::Receiver<TestRx>,
        // The receiver currently under test, if any.
        test_rx: Option<mpsc::Receiver<Arc<()>>>,
        // Remaining polls before `test_rx` is dropped.
        countdown: usize,
    }

    impl TestTask {
        /// Create a new TestTask together with the sender used to feed it
        /// commands.
        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
            (
                TestTask {
                    command_rx,
                    test_rx: None,
                    countdown: 0, // 0 means no countdown is in progress.
                },
                command_tx,
            )
        }
    }

    impl Future for TestTask {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Poll the test channel, if one is present.
            if let Some(rx) = &mut self.test_rx {
                if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
                    // The sender is only expected to go away after it has
                    // seen this receiver disappear, so end-of-stream here
                    // is a test failure.
                    let _ = v.expect("test finished unexpectedly!");
                }
                // Cannot underflow: every poll that leaves `countdown` at 0
                // also clears `test_rx` (see the end of this function), so a
                // receiver is only present here while `countdown >= 1`.
                self.countdown -= 1;
                // Busy-poll until the countdown is finished.
                cx.waker().wake_by_ref();
            }
            // Accept any newly submitted MPSC channels for testing.
            match self.command_rx.poll_next_unpin(cx) {
                Poll::Ready(Some(TestRx { rx, poll_count })) => {
                    self.test_rx = Some(rx);
                    self.countdown = poll_count;
                    // Make sure we get polled again to start the countdown.
                    cx.waker().wake_by_ref();
                }
                // Command sender dropped: the test is over.
                Poll::Ready(None) => return Poll::Ready(()),
                Poll::Pending => {}
            }
            if self.countdown == 0 {
                // Countdown complete -- drop the Receiver.
                self.test_rx = None;
            }
            Poll::Pending
        }
    }

    let (f, mut cmd_tx) = TestTask::new();
    let bg = thread::spawn(move || block_on(f));
    for i in 0..AMT {
        let (mut test_tx, rx) = mpsc::channel(0);
        let poll_count = i % MAX_COUNTDOWN;
        cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
        // Weak handle to the most recent item that `try_send()` accepted.
        let mut prev_weak: Option<Weak<()>> = None;
        let mut attempted_sends = 0;
        let mut successful_sends = 0;
        loop {
            // Create a test item.
            let item = Arc::new(());
            let weak = Arc::downgrade(&item);
            match test_tx.try_send(item) {
                Ok(_) => {
                    prev_weak = Some(weak);
                    successful_sends += 1;
                }
                // No room right now; simply try again with a fresh item.
                Err(ref e) if e.is_full() => {}
                Err(ref e) if e.is_disconnected() => {
                    // Test for evidence of the race condition: the last
                    // accepted item must not outlive the disconnect.
                    if let Some(prev_weak) = prev_weak {
                        // The previously sent item may still be allocated.
                        // There appears to be some aspect of the
                        // concurrency that can legitimately cause the Arc
                        // to be momentarily valid, so spin for up to
                        // `SPIN_TIMEOUT_S` seconds waiting for the
                        // previously sent item to be dropped.
                        let t0 = Instant::now();
                        let mut spins = 0;
                        while prev_weak.upgrade().is_some() {
                            assert!(
                                t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
                                "item not dropped on iteration {} after \
                                 {} sends ({} successful). spin=({})",
                                i,
                                attempted_sends,
                                successful_sends,
                                spins
                            );
                            spins += 1;
                            thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
                        }
                    }
                    break;
                }
                Err(ref e) => panic!("unexpected error: {}", e),
            }
            attempted_sends += 1;
        }
    }
    // Dropping the command sender lets the background task finish.
    drop(cmd_tx);
    bg.join().expect("background thread join");
}
278
/// After an unbounded channel has reported end-of-stream through `try_next`,
/// calling `try_next` again must keep reporting end-of-stream.
#[test]
fn unbounded_try_next_after_none() {
    let (tx, mut rx) = mpsc::unbounded::<String>();
    // With its only sender gone, the channel is closed.
    drop(tx);
    // First pass receives the end of the channel; second pass checks that
    // `try_next` can be called again after `None` was already returned.
    for _ in 0..2 {
        let outcome = rx.try_next().map_err(|_| ());
        assert_eq!(Ok(None), outcome);
    }
}
289
/// After a bounded channel has reported end-of-stream through `try_next`,
/// calling `try_next` again must keep reporting end-of-stream.
#[test]
fn bounded_try_next_after_none() {
    let (tx, mut rx) = mpsc::channel::<String>(17);
    // With its only sender gone, the channel is closed.
    drop(tx);
    // First pass receives the end of the channel; second pass checks that
    // `try_next` can be called again after `None` was already returned.
    for _ in 0..2 {
        let outcome = rx.try_next().map_err(|_| ());
        assert_eq!(Ok(None), outcome);
    }
}
300