1 | #![allow (clippy::redundant_clone)] |
2 | #![warn (rust_2018_idioms)] |
3 | #![cfg (feature = "sync" )] |
4 | |
5 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
6 | use wasm_bindgen_test::wasm_bindgen_test as test; |
7 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
8 | use wasm_bindgen_test::wasm_bindgen_test as maybe_tokio_test; |
9 | |
10 | #[cfg (not(all(target_family = "wasm" , not(target_os = "wasi" ))))] |
11 | use tokio::test as maybe_tokio_test ; |
12 | |
13 | use std::fmt; |
14 | use std::sync::Arc; |
15 | use tokio::sync::mpsc; |
16 | use tokio::sync::mpsc::error::{TryRecvError, TrySendError}; |
17 | use tokio_test::*; |
18 | |
19 | #[cfg (not(target_family = "wasm" ))] |
20 | mod support { |
21 | pub(crate) mod mpsc_stream; |
22 | } |
23 | |
24 | trait AssertSend: Send {} |
25 | impl AssertSend for mpsc::Sender<i32> {} |
26 | impl AssertSend for mpsc::Receiver<i32> {} |
27 | |
28 | #[maybe_tokio_test ] |
29 | async fn send_recv_with_buffer() { |
30 | let (tx, mut rx) = mpsc::channel::<i32>(16); |
31 | |
32 | // Using poll_ready / try_send |
33 | // let permit assert_ready_ok!(tx.reserve()); |
34 | let permit = tx.reserve().await.unwrap(); |
35 | permit.send(1); |
36 | |
37 | // Without poll_ready |
38 | tx.try_send(2).unwrap(); |
39 | |
40 | drop(tx); |
41 | |
42 | let val = rx.recv().await; |
43 | assert_eq!(val, Some(1)); |
44 | |
45 | let val = rx.recv().await; |
46 | assert_eq!(val, Some(2)); |
47 | |
48 | let val = rx.recv().await; |
49 | assert!(val.is_none()); |
50 | } |
51 | |
// With capacity 2, two reservations succeed immediately; further ones wait
// until a slot is freed, either by receiving a sent value or by dropping an
// unused permit.
#[tokio::test]
#[cfg(feature = "full")]
async fn reserve_disarm() {
    let (tx, mut rx) = mpsc::channel::<i32>(2);
    let tx1 = tx.clone();
    let tx2 = tx.clone();
    let tx3 = tx.clone();
    let tx4 = tx;

    // Both slots can be reserved right away.
    let first_permit = assert_ok!(tx1.reserve().await);
    let second_permit = assert_ok!(tx2.reserve().await);

    // No capacity left: the next two reservations must wait.
    let mut third = tokio_test::task::spawn(tx3.reserve());
    assert_pending!(third.poll());

    let mut fourth = tokio_test::task::spawn(tx4.reserve());
    assert_pending!(fourth.poll());

    // Sending through a permit does not by itself wake a waiter...
    first_permit.send(1);
    assert!(!third.is_woken());

    // ...receiving the value does, and only the first waiter is notified.
    rx.recv().await.unwrap();
    assert!(third.is_woken());
    assert!(!fourth.is_woken());

    // Dropping an unused permit notifies the next waiter as well.
    drop(second_permit);
    assert!(fourth.is_woken());

    // A brand-new reservation is still pending at this point.
    let mut again = tokio_test::task::spawn(tx1.reserve());
    assert_pending!(again.poll());
}
89 | |
// Drives the bounded channel through the stream adapter from `support` and
// checks the values arrive in order, followed by end-of-stream.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_with_buffer() {
    use tokio_stream::StreamExt;

    let (tx, stream) = support::mpsc_stream::channel_stream::<i32>(16);
    let mut stream = Box::pin(stream);

    // The sender lives only inside the task, so it is dropped after both sends.
    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value).await);
        }
    });

    assert_eq!(Some(1), stream.next().await);
    assert_eq!(Some(2), stream.next().await);
    assert_eq!(None, stream.next().await);
}
107 | |
// A spawned task sends two values on a bounded channel; the receiver sees
// them in order and then `None` once the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_with_buffer() {
    let (tx, mut rx) = mpsc::channel(16);

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value).await);
        }
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}
122 | |
// Receives four values in batches of at most three from a channel of
// capacity two, checking the running total against the buffer length.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
    let (tx, mut rx) = mpsc::channel(2);
    let mut received = Vec::<i32>::with_capacity(3);

    // A zero limit never waits: it reports zero messages immediately.
    assert_eq!(0, rx.recv_many(&mut received, 0).await);

    let sender = tokio::spawn(async move {
        for value in [1, 2, 7, 0] {
            assert_ok!(tx.send(value).await);
        }
    });

    let limit = 3;
    let mut total = 0usize;
    while total < 4 {
        total += rx.recv_many(&mut received, limit).await;
        // Every batch is appended to the same buffer.
        assert_eq!(received.len(), total);
    }

    assert_eq!(vec![1, 2, 7, 0], received);

    // After the fourth value the next call reports zero.
    assert_eq!(0, rx.recv_many(&mut received, limit).await);
    sender.await.unwrap();
}
150 | |
// A reservation that was polled and then dropped must not be woken when
// capacity frees up; the next waiter is notified instead.
#[tokio::test]
#[cfg(feature = "full")]
async fn start_send_past_cap() {
    use std::future::Future;

    // Mock task whose waker is handed to the first reservation.
    let mut waker_task = tokio_test::task::spawn(());

    let (tx1, mut rx) = mpsc::channel(1);
    let tx2 = tx1.clone();

    // Fill the single slot.
    assert_ok!(tx1.try_send(()));

    let mut reserve1 = Box::pin(tx1.reserve());
    waker_task.enter(|cx, _| assert_pending!(reserve1.as_mut().poll(cx)));

    {
        let mut reserve2 = tokio_test::task::spawn(tx2.reserve());
        assert_pending!(reserve2.poll());

        // Abandon the first reservation before any capacity is available.
        drop(reserve1);

        assert!(rx.recv().await.is_some());

        // Only the surviving waiter is notified.
        assert!(reserve2.is_woken());
        assert!(!waker_task.is_woken());
    }

    drop(tx1);
    drop(tx2);

    assert!(rx.recv().await.is_none());
}
183 | |
// Creating a bounded channel with a zero-sized buffer is expected to panic.
#[test]
#[should_panic]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn buffer_gteq_one() {
    let _ = mpsc::channel::<i32>(0);
}
190 | |
191 | #[maybe_tokio_test ] |
192 | async fn send_recv_unbounded() { |
193 | let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); |
194 | |
195 | // Using `try_send` |
196 | assert_ok!(tx.send(1)); |
197 | assert_ok!(tx.send(2)); |
198 | |
199 | assert_eq!(rx.recv().await, Some(1)); |
200 | assert_eq!(rx.recv().await, Some(2)); |
201 | |
202 | drop(tx); |
203 | |
204 | assert!(rx.recv().await.is_none()); |
205 | } |
206 | |
207 | #[maybe_tokio_test ] |
208 | async fn send_recv_many_unbounded() { |
209 | let (tx, mut rx) = mpsc::unbounded_channel::<i32>(); |
210 | |
211 | let mut buffer: Vec<i32> = Vec::new(); |
212 | |
213 | // With `limit=0` does not sleep, returns immediately |
214 | rx.recv_many(&mut buffer, 0).await; |
215 | assert_eq!(0, buffer.len()); |
216 | |
217 | assert_ok!(tx.send(7)); |
218 | assert_ok!(tx.send(13)); |
219 | assert_ok!(tx.send(100)); |
220 | assert_ok!(tx.send(1002)); |
221 | |
222 | rx.recv_many(&mut buffer, 0).await; |
223 | assert_eq!(0, buffer.len()); |
224 | |
225 | let mut count = 0; |
226 | while count < 4 { |
227 | count += rx.recv_many(&mut buffer, 1).await; |
228 | } |
229 | assert_eq!(count, 4); |
230 | assert_eq!(vec![7, 13, 100, 1002], buffer); |
231 | let final_capacity = buffer.capacity(); |
232 | assert!(final_capacity > 0); |
233 | |
234 | buffer.clear(); |
235 | |
236 | assert_ok!(tx.send(5)); |
237 | assert_ok!(tx.send(6)); |
238 | assert_ok!(tx.send(7)); |
239 | assert_ok!(tx.send(2)); |
240 | |
241 | // Re-use existing capacity |
242 | count = rx.recv_many(&mut buffer, 32).await; |
243 | |
244 | assert_eq!(final_capacity, buffer.capacity()); |
245 | assert_eq!(count, 4); |
246 | assert_eq!(vec![5, 6, 7, 2], buffer); |
247 | |
248 | drop(tx); |
249 | |
250 | // recv_many will immediately return zero if the channel |
251 | // is closed and no more messages are waiting |
252 | assert_eq!(0, rx.recv_many(&mut buffer, 4).await); |
253 | assert!(rx.recv().await.is_none()); |
254 | } |
255 | |
// Checks how `recv_many` on a bounded channel interacts with the capacity
// of the caller's buffer, and what it returns as the channel closes.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_bounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9);
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::channel(100);

    // Queue `limit` numbered strings plus one extra message.
    let mut expected: Vec<String> = (0..limit).map(|n: usize| n.to_string()).collect();
    for item in &expected {
        tx.send(item.clone()).await.unwrap();
    }
    tx.send("one more".to_string()).await.unwrap();

    // The first call takes everything except the extra message; the
    // initial capacity is sufficient, so the buffer does not grow.
    assert_eq!(limit, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // Receiving one more value forces the buffer to grow.
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    // Move the only sender into a task that sends once and then drops it.
    tokio::spawn(async move {
        tx.send("final".to_string()).await.unwrap();
    });

    // `recv_many` must deliver that last message before reporting closure.
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);

    // The channel is now closed and `recv_many` returns 0.
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
}
297 | |
// Unbounded counterpart of the bounded capacity test: checks how `recv_many`
// interacts with the caller's buffer capacity and with channel closure.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_many_unbounded_capacity() {
    let mut buffer: Vec<String> = Vec::with_capacity(9); // capacity >= 9
    let limit = buffer.capacity();
    let (tx, mut rx) = mpsc::unbounded_channel();

    // Queue `limit` numbered strings plus one extra message.
    let mut expected: Vec<String> = (0..limit).map(|n: usize| n.to_string()).collect();
    for item in &expected {
        tx.send(item.clone()).unwrap();
    }
    tx.send("one more".to_string()).unwrap();

    // The first call takes everything except the extra message; the
    // initial capacity is sufficient, so the buffer does not grow.
    assert_eq!(limit, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
    assert_eq!(limit, buffer.capacity());

    // Receiving one more value forces the buffer to grow.
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    assert!(buffer.capacity() > limit);
    expected.push("one more".to_string());
    assert_eq!(expected, buffer);

    // Move the only sender into a task that sends once and then drops it.
    tokio::spawn(async move {
        tx.send("final".to_string()).unwrap();
    });

    // `recv_many` must deliver that last message before reporting closure.
    assert_eq!(1, rx.recv_many(&mut buffer, limit).await);
    expected.push("final".to_string());
    assert_eq!(expected, buffer);

    // The channel is now closed and `recv_many` returns 0.
    assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
    assert_eq!(expected, buffer);
}
339 | |
// A spawned task sends two values on an unbounded channel; the receiver sees
// them in order and then `None` once the task has dropped the sender.
#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel();

    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value));
        }
    });

    assert_eq!(Some(1), rx.recv().await);
    assert_eq!(Some(2), rx.recv().await);
    assert_eq!(None, rx.recv().await);
}
354 | |
// Drives the unbounded channel through the stream adapter from `support` and
// checks the values arrive in order, followed by end-of-stream.
#[tokio::test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
async fn send_recv_stream_unbounded() {
    use tokio_stream::StreamExt;

    let (tx, stream) = support::mpsc_stream::unbounded_channel_stream::<i32>();
    let mut stream = Box::pin(stream);

    // The sender lives only inside the task, so it is dropped after both sends.
    tokio::spawn(async move {
        for value in [1, 2] {
            assert_ok!(tx.send(value));
        }
    });

    assert_eq!(Some(1), stream.next().await);
    assert_eq!(Some(2), stream.next().await);
    assert_eq!(None, stream.next().await);
}
373 | |
374 | #[maybe_tokio_test ] |
375 | async fn no_t_bounds_buffer() { |
376 | struct NoImpls; |
377 | |
378 | let (tx, mut rx) = mpsc::channel(100); |
379 | |
380 | // sender should be Debug even though T isn't Debug |
381 | is_debug(&tx); |
382 | // same with Receiver |
383 | is_debug(&rx); |
384 | // and sender should be Clone even though T isn't Clone |
385 | assert!(tx.clone().try_send(NoImpls).is_ok()); |
386 | |
387 | assert!(rx.recv().await.is_some()); |
388 | } |
389 | |
390 | #[maybe_tokio_test ] |
391 | async fn no_t_bounds_unbounded() { |
392 | struct NoImpls; |
393 | |
394 | let (tx, mut rx) = mpsc::unbounded_channel(); |
395 | |
396 | // sender should be Debug even though T isn't Debug |
397 | is_debug(&tx); |
398 | // same with Receiver |
399 | is_debug(&rx); |
400 | // and sender should be Clone even though T isn't Clone |
401 | assert!(tx.clone().send(NoImpls).is_ok()); |
402 | |
403 | assert!(rx.recv().await.is_some()); |
404 | } |
405 | |
// With a single-slot channel, a second reservation waits until the first
// value has been received, and `try_send` cannot jump ahead of it.
#[tokio::test]
#[cfg(feature = "full")]
async fn send_recv_buffer_limited() {
    let (tx, mut rx) = mpsc::channel::<i32>(1);

    // Take the only slot and fill it.
    let first = assert_ok!(tx.reserve().await);
    first.send(1);

    // With the slot occupied, a second reservation has to wait.
    let mut second = tokio_test::task::spawn(tx.reserve());
    assert_pending!(second.poll());

    // Receiving the value notifies the waiting reservation.
    assert!(rx.recv().await.is_some());
    assert!(second.is_woken());

    // `try_send` still fails at this point.
    assert_err!(tx.try_send(1337));

    // The pending reservation now resolves and can be used to send.
    let permit = assert_ready_ok!(second.poll());
    permit.send(2);

    assert!(rx.recv().await.is_some());
}
436 | |
437 | #[maybe_tokio_test ] |
438 | async fn recv_close_gets_none_idle() { |
439 | let (tx, mut rx) = mpsc::channel::<i32>(10); |
440 | |
441 | rx.close(); |
442 | |
443 | assert!(rx.recv().await.is_none()); |
444 | |
445 | assert_err!(tx.send(1).await); |
446 | } |
447 | |
// After `close`, a permit that was already granted can still deliver its
// value, while a reservation that was still waiting fails.
#[tokio::test]
#[cfg(feature = "full")]
async fn recv_close_gets_none_reserved() {
    let (tx1, mut rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);
    let mut waiting = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    rx.close();

    // The waiting reservation is notified and resolves to an error.
    assert!(waiting.is_woken());
    assert_ready_err!(waiting.poll());

    {
        // `recv` stays pending while the granted permit is outstanding.
        let mut recv = tokio_test::task::spawn(rx.recv());
        assert_pending!(recv.poll());

        held.send(123);
        assert!(recv.is_woken());

        let v = assert_ready!(recv.poll());
        assert_eq!(v, Some(123));
    }

    assert!(rx.recv().await.is_none());
}
476 | |
477 | #[maybe_tokio_test ] |
478 | async fn tx_close_gets_none() { |
479 | let (_, mut rx) = mpsc::channel::<i32>(10); |
480 | assert!(rx.recv().await.is_none()); |
481 | } |
482 | |
483 | #[maybe_tokio_test ] |
484 | async fn try_send_fail() { |
485 | let (tx, mut rx) = mpsc::channel(1); |
486 | |
487 | tx.try_send("hello" ).unwrap(); |
488 | |
489 | // This should fail |
490 | match assert_err!(tx.try_send("fail" )) { |
491 | TrySendError::Full(..) => {} |
492 | _ => panic!(), |
493 | } |
494 | |
495 | assert_eq!(rx.recv().await, Some("hello" )); |
496 | |
497 | assert_ok!(tx.try_send("goodbye" )); |
498 | drop(tx); |
499 | |
500 | assert_eq!(rx.recv().await, Some("goodbye" )); |
501 | assert!(rx.recv().await.is_none()); |
502 | } |
503 | |
504 | #[maybe_tokio_test ] |
505 | async fn try_send_fail_with_try_recv() { |
506 | let (tx, mut rx) = mpsc::channel(1); |
507 | |
508 | tx.try_send("hello" ).unwrap(); |
509 | |
510 | // This should fail |
511 | match assert_err!(tx.try_send("fail" )) { |
512 | TrySendError::Full(..) => {} |
513 | _ => panic!(), |
514 | } |
515 | |
516 | assert_eq!(rx.try_recv(), Ok("hello" )); |
517 | |
518 | assert_ok!(tx.try_send("goodbye" )); |
519 | drop(tx); |
520 | |
521 | assert_eq!(rx.try_recv(), Ok("goodbye" )); |
522 | assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected)); |
523 | } |
524 | |
525 | #[maybe_tokio_test ] |
526 | async fn try_reserve_fails() { |
527 | let (tx, mut rx) = mpsc::channel(1); |
528 | |
529 | let permit = tx.try_reserve().unwrap(); |
530 | |
531 | // This should fail |
532 | match assert_err!(tx.try_reserve()) { |
533 | TrySendError::Full(()) => {} |
534 | _ => panic!(), |
535 | } |
536 | |
537 | permit.send("foo" ); |
538 | |
539 | assert_eq!(rx.recv().await, Some("foo" )); |
540 | |
541 | // Dropping permit releases the slot. |
542 | let permit = tx.try_reserve().unwrap(); |
543 | drop(permit); |
544 | |
545 | let _permit = tx.try_reserve().unwrap(); |
546 | } |
547 | |
// A reserved slot must be handed back when its permit is dropped without
// sending, letting a waiting reservation complete.
#[tokio::test]
#[cfg(feature = "full")]
async fn drop_permit_releases_permit() {
    let (tx1, _rx) = mpsc::channel::<i32>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);

    let mut waiting = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    drop(held);

    assert!(waiting.is_woken());
    assert_ready_ok!(waiting.poll());
}
566 | |
567 | #[maybe_tokio_test ] |
568 | async fn dropping_rx_closes_channel() { |
569 | let (tx, rx) = mpsc::channel(100); |
570 | |
571 | let msg = Arc::new(()); |
572 | assert_ok!(tx.try_send(msg.clone())); |
573 | |
574 | drop(rx); |
575 | assert_err!(tx.reserve().await); |
576 | assert_eq!(1, Arc::strong_count(&msg)); |
577 | } |
578 | |
// After the receiver is dropped, every non-blocking send/reserve variant
// reports `Closed`, and no clone of the message is left behind.
#[test]
fn dropping_rx_closes_channel_for_try() {
    let (tx, rx) = mpsc::channel(100);

    let msg = Arc::new(());
    tx.try_send(Arc::clone(&msg)).unwrap();

    drop(rx);

    // The results are kept as temporaries so the rejected message clone is
    // dropped at the end of each statement.
    assert!(matches!(
        tx.try_send(Arc::clone(&msg)),
        Err(TrySendError::Closed(_))
    ));
    assert!(matches!(tx.try_reserve(), Err(TrySendError::Closed(_))));
    assert!(matches!(
        tx.try_reserve_owned(),
        Err(TrySendError::Closed(_))
    ));

    assert_eq!(1, Arc::strong_count(&msg));
}
600 | |
// Messages still queued when both halves go away must be dropped.
#[test]
fn unconsumed_messages_are_dropped() {
    let msg = Arc::new(());

    let (tx, rx) = mpsc::channel(100);

    tx.try_send(Arc::clone(&msg)).unwrap();

    // One handle here, one sitting in the channel.
    assert_eq!(2, Arc::strong_count(&msg));

    drop(tx);
    drop(rx);

    assert_eq!(1, Arc::strong_count(&msg));
}
615 | |
// `blocking_recv` on a plain thread receives a value sent from async code.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_recv() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let receiver_thread = std::thread::spawn(move || {
        assert_eq!(Some(10), rx.blocking_recv());
    });

    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            let _ = tx.send(10).await;
        });

    // Surface any assertion failure from the receiving thread.
    receiver_thread.join().unwrap();
}
632 | |
633 | #[tokio::test ] |
634 | #[should_panic ] |
635 | #[cfg (not(target_family = "wasm" ))] // wasm currently doesn't support unwinding |
636 | async fn blocking_recv_async() { |
637 | let (_tx, mut rx) = mpsc::channel::<()>(1); |
638 | let _ = rx.blocking_recv(); |
639 | } |
640 | |
// `blocking_send` on a plain thread delivers a value to async code.
#[test]
#[cfg(all(feature = "full", not(target_os = "wasi")))] // Wasi doesn't support threads
fn blocking_send() {
    let (tx, mut rx) = mpsc::channel::<u8>(1);

    let sender_thread = std::thread::spawn(move || {
        tx.blocking_send(10).unwrap();
    });

    tokio::runtime::Runtime::new()
        .unwrap()
        .block_on(async move {
            assert_eq!(Some(10), rx.recv().await);
        });

    // Surface any panic from the sending thread.
    sender_thread.join().unwrap();
}
657 | |
658 | #[tokio::test ] |
659 | #[should_panic ] |
660 | #[cfg (not(target_family = "wasm" ))] // wasm currently doesn't support unwinding |
661 | async fn blocking_send_async() { |
662 | let (tx, _rx) = mpsc::channel::<()>(1); |
663 | let _ = tx.blocking_send(()); |
664 | } |
665 | |
// After `close`, `recv` stays pending while a permit is outstanding and
// resolves to `None` once that permit is dropped unused.
#[tokio::test]
#[cfg(feature = "full")]
async fn ready_close_cancel_bounded() {
    let (tx, mut rx) = mpsc::channel::<()>(100);
    let _tx2 = tx.clone();

    let outstanding = assert_ok!(tx.reserve().await);

    rx.close();

    let mut pending_recv = tokio_test::task::spawn(rx.recv());
    assert_pending!(pending_recv.poll());

    drop(outstanding);

    assert!(pending_recv.is_woken());
    let val = assert_ready!(pending_recv.poll());
    assert!(val.is_none());
}
685 | |
// A reservation that was notified but never polled again must not keep a
// closed channel from finishing when it is dropped.
#[tokio::test]
#[cfg(feature = "full")]
async fn permit_available_not_acquired_close() {
    let (tx1, mut rx) = mpsc::channel::<()>(1);
    let tx2 = tx1.clone();

    let held = assert_ok!(tx1.reserve().await);

    let mut waiting = tokio_test::task::spawn(tx2.reserve());
    assert_pending!(waiting.poll());

    rx.close();

    drop(held);
    assert!(waiting.is_woken());

    // Abandon the notified reservation without polling it again.
    drop(waiting);
    assert!(rx.recv().await.is_none());
}
705 | |
// Walks a capacity-5 channel through fill/drain cycles using only the
// non-blocking `try_send` / `try_recv` calls.
#[test]
fn try_recv_bounded() {
    let (tx, mut rx) = mpsc::channel(5);

    // Fill to capacity; one more send must be rejected.
    for _ in 0..5 {
        tx.try_send("hello").unwrap();
    }
    assert!(tx.try_send("hello").is_err());

    // Drain everything; the channel is then empty but still open.
    for _ in 0..5 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    // Interleave: 4 in, 1 out, 2 in -> full again.
    for _ in 0..4 {
        tx.try_send("hello").unwrap();
    }
    assert_eq!(Ok("hello"), rx.try_recv());
    for _ in 0..2 {
        tx.try_send("hello").unwrap();
    }
    assert!(tx.try_send("hello").is_err());
    for _ in 0..5 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Empty), rx.try_recv());

    // Messages queued before the sender is dropped are still delivered;
    // only afterwards does the receiver report disconnection.
    for _ in 0..3 {
        tx.try_send("hello").unwrap();
    }
    drop(tx);
    for _ in 0..3 {
        assert_eq!(Ok("hello"), rx.try_recv());
    }
    assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
}
748 | |
// For every queue length from 0 to 99, `try_recv` on an unbounded channel
// returns the values in order, then `Empty`, then `Disconnected` once the
// sender is dropped.
#[test]
fn try_recv_unbounded() {
    for num in 0..100 {
        let (tx, mut rx) = mpsc::unbounded_channel();

        (0..num).for_each(|i| tx.send(i).unwrap());

        (0..num).for_each(|i| assert_eq!(rx.try_recv(), Ok(i)));

        assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
        drop(tx);
        assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
    }
}
767 | |
// An empty bounded channel reports `Empty` while a sender exists and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_bounded() {
    let (sender, mut receiver) = mpsc::channel::<()>(5);

    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());
    drop(sender);
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
776 | |
// An empty unbounded channel reports `Empty` while a sender exists and
// `Disconnected` after it is dropped.
#[test]
fn try_recv_close_while_empty_unbounded() {
    let (sender, mut receiver) = mpsc::unbounded_channel::<()>();

    assert_eq!(Err(TryRecvError::Empty), receiver.try_recv());
    drop(sender);
    assert_eq!(Err(TryRecvError::Disconnected), receiver.try_recv());
}
785 | |
// `send_timeout` succeeds while there is capacity, returns `Timeout` with
// the value when the channel stays full, and `Closed` once the receiver is
// dropped. The test starts with time paused.
#[tokio::test(start_paused = true)]
#[cfg(feature = "full")]
async fn recv_timeout() {
    use tokio::sync::mpsc::error::SendTimeoutError::{Closed, Timeout};
    use tokio::time::Duration;

    let (tx, rx) = mpsc::channel(5);
    let one_second = Duration::from_secs(1);

    // Five sends fit into the buffer.
    for value in [10, 20, 30, 40, 50] {
        assert_eq!(tx.send_timeout(value, one_second).await, Ok(()));
    }

    // The sixth finds the channel full and gets its value back.
    assert_eq!(tx.send_timeout(60, one_second).await, Err(Timeout(60)));

    drop(rx);
    assert_eq!(tx.send_timeout(70, one_second).await, Err(Closed(70)));
}
810 | |
// Polling `send_timeout` outside of a Tokio runtime must panic with the
// "no reactor running" message.
#[test]
#[should_panic = "there is no reactor running, must be called from the context of a Tokio 1.x runtime"]
#[cfg(not(target_family = "wasm"))] // wasm currently doesn't support unwinding
fn recv_timeout_panic() {
    use futures::future::FutureExt;
    use tokio::time::Duration;

    let (sender, _receiver) = mpsc::channel(5);
    let one_second = Duration::from_secs(1);
    let _ = sender.send_timeout(10, one_second).now_or_never();
}
821 | |
822 | // Tests that channel `capacity` changes and `max_capacity` stays the same |
823 | #[tokio::test ] |
824 | async fn test_tx_capacity() { |
825 | let (tx, _rx) = mpsc::channel::<()>(10); |
826 | // both capacities are same before |
827 | assert_eq!(tx.capacity(), 10); |
828 | assert_eq!(tx.max_capacity(), 10); |
829 | |
830 | let _permit = tx.reserve().await.unwrap(); |
831 | // after reserve, only capacity should drop by one |
832 | assert_eq!(tx.capacity(), 9); |
833 | assert_eq!(tx.max_capacity(), 10); |
834 | |
835 | tx.send(()).await.unwrap(); |
836 | // after send, capacity should drop by one again |
837 | assert_eq!(tx.capacity(), 8); |
838 | assert_eq!(tx.max_capacity(), 10); |
839 | } |
840 | |
// Compile-time helper: accepts a reference to any `Debug` type and does
// nothing at runtime.
fn is_debug<T>(_: &T)
where
    T: fmt::Debug,
{
}
842 | |