1 | use futures::channel::{mpsc, oneshot}; |
2 | use futures::executor::{block_on, block_on_stream}; |
3 | use futures::future::{poll_fn, FutureExt}; |
4 | use futures::pin_mut; |
5 | use futures::sink::{Sink, SinkExt}; |
6 | use futures::stream::{Stream, StreamExt}; |
7 | use futures::task::{Context, Poll}; |
8 | use futures_test::task::{new_count_waker, noop_context}; |
9 | use std::sync::atomic::{AtomicUsize, Ordering}; |
10 | use std::sync::{Arc, Mutex}; |
11 | use std::thread; |
12 | |
// Compile-time assertion that both channel endpoints are `Send`, i.e. can be
// moved across threads — the thread-based tests below depend on this.
trait AssertSend: Send {}
impl AssertSend for mpsc::Sender<i32> {}
impl AssertSend for mpsc::Receiver<i32> {}
16 | |
#[test]
fn send_recv() {
    // A single value sent through a bounded channel is delivered, and the
    // stream terminates once the sender has been dropped.
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    block_on(tx.send(1)).unwrap();
    drop(tx);
    let received: Vec<i32> = block_on(rx.collect());
    assert_eq!(received, [1]);
}
26 | |
#[test]
fn send_recv_no_buffer() {
    // Zero-capacity channel: after `start_send` accepts one item, `poll_ready`
    // stays Pending until the receiver takes that item, and further
    // `start_send` attempts are rejected as full.
    // Run on a task context
    block_on(poll_fn(move |cx| {
        let (tx, rx) = mpsc::channel::<i32>(0);
        pin_mut!(tx, rx);

        // A fresh sender is already flushed and ready for one item.
        assert!(tx.as_mut().poll_flush(cx).is_ready());
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send first message
        assert!(tx.as_mut().start_send(1).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // poll_ready said Pending, so no room in buffer, therefore new sends
        // should get rejected with is_full.
        assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        // Send second message
        assert!(tx.as_mut().poll_ready(cx).is_ready());
        assert!(tx.as_mut().start_send(2).is_ok());
        assert!(tx.as_mut().poll_ready(cx).is_pending());

        // Take the value
        assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
        assert!(tx.as_mut().poll_ready(cx).is_ready());

        Poll::Ready(())
    }));
}
62 | |
#[test]
fn send_shared_recv() {
    // A cloned sender feeds the same receiver as the original sender.
    let (tx1, rx) = mpsc::channel::<i32>(16);
    let tx2 = tx1.clone();
    let mut rx = block_on_stream(rx);

    for (mut tx, value) in vec![(tx1, 1), (tx2, 2)] {
        block_on(tx.send(value)).unwrap();
        assert_eq!(rx.next(), Some(value));
    }
}
75 | |
#[test]
fn send_recv_threads() {
    // A value sent from a spawned thread reaches the receiving thread.
    let (mut tx, rx) = mpsc::channel::<i32>(16);

    let producer = thread::spawn(move || {
        block_on(tx.send(1)).unwrap();
    });

    let received: Vec<i32> = block_on(rx.take(1).collect());
    assert_eq!(received, [1]);

    producer.join().unwrap();
}
89 | |
#[test]
fn send_recv_threads_no_capacity() {
    // Even with capacity 0, values sent from another thread arrive in order.
    let (mut tx, rx) = mpsc::channel::<i32>(0);

    let producer = thread::spawn(move || {
        block_on(tx.send(1)).unwrap();
        block_on(tx.send(2)).unwrap();
    });

    let received: Vec<i32> = block_on(rx.collect());
    assert_eq!(received, [1, 2]);

    producer.join().unwrap();
}
104 | |
#[test]
fn recv_close_gets_none() {
    // After the receiver is closed, polling it yields `None` and the sender
    // sees a disconnection error from `poll_ready`.
    let (mut tx, mut rx) = mpsc::channel::<i32>(10);

    // Run on a task context.
    block_on(poll_fn(move |cx| {
        rx.close();

        assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
        match tx.poll_ready(cx) {
            Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
            _ => panic!(),
        }

        Poll::Ready(())
    }));
}
122 | |
#[test]
fn tx_close_gets_none() {
    // With the only sender already dropped, the receiver terminates at once.
    let (_, mut rx) = mpsc::channel::<i32>(10);

    // Poll on a task context.
    block_on(poll_fn(move |cx| {
        let next = rx.poll_next_unpin(cx);
        assert_eq!(next, Poll::Ready(None));
        Poll::Ready(())
    }));
}
133 | |
134 | // #[test] |
135 | // fn spawn_sends_items() { |
136 | // let core = local_executor::Core::new(); |
137 | // let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1)))); |
138 | // let rx = mpsc::spawn(stream, &core, 1); |
139 | // assert_eq!(core.run(rx.take(4).collect()).unwrap(), |
140 | // [0, 1, 2, 3]); |
141 | // } |
142 | |
143 | // #[test] |
144 | // fn spawn_kill_dead_stream() { |
145 | // use std::thread; |
146 | // use std::time::Duration; |
147 | // use futures::future::Either; |
148 | // use futures::sync::oneshot; |
149 | // |
150 | // // a stream which never returns anything (maybe a remote end isn't |
151 | // // responding), but dropping it leads to observable side effects |
152 | // // (like closing connections, releasing limited resources, ...) |
153 | // #[derive(Debug)] |
154 | // struct Dead { |
155 | // // when dropped you should get Err(oneshot::Canceled) on the |
156 | // // receiving end |
157 | // done: oneshot::Sender<()>, |
158 | // } |
159 | // impl Stream for Dead { |
160 | // type Item = (); |
161 | // type Error = (); |
162 | // |
163 | // fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |
164 | // Ok(Poll::Pending) |
165 | // } |
166 | // } |
167 | // |
168 | // // need to implement a timeout for the test, as it would hang |
169 | // // forever right now |
170 | // let (timeout_tx, timeout_rx) = oneshot::channel(); |
171 | // thread::spawn(move || { |
172 | // thread::sleep(Duration::from_millis(1000)); |
173 | // let _ = timeout_tx.send(()); |
174 | // }); |
175 | // |
176 | // let core = local_executor::Core::new(); |
177 | // let (done_tx, done_rx) = oneshot::channel(); |
178 | // let stream = Dead{done: done_tx}; |
179 | // let rx = mpsc::spawn(stream, &core, 1); |
180 | // let res = core.run( |
181 | // Ok::<_, ()>(()) |
182 | // .into_future() |
183 | // .then(move |_| { |
184 | // // now drop the spawned stream: maybe some timeout exceeded, |
185 | // // or some connection on this end was closed by the remote |
186 | // // end. |
187 | // drop(rx); |
188 | // // and wait for the spawned stream to release its resources |
189 | // done_rx |
190 | // }) |
191 | // .select2(timeout_rx) |
192 | // ); |
193 | // match res { |
194 | // Err(Either::A((oneshot::Canceled, _))) => (), |
195 | // _ => { |
196 | // panic!("dead stream wasn't canceled"); |
197 | // }, |
198 | // } |
199 | // } |
200 | |
#[test]
fn stress_shared_unbounded() {
    const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
    const NTHREADS: u32 = 8;
    // Many producer threads hammer one unbounded channel; the consumer thread
    // must observe exactly AMT * NTHREADS items, all equal to 1.
    let (tx, rx) = mpsc::unbounded::<i32>();

    let t = thread::spawn(move || {
        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len(), (AMT * NTHREADS) as usize);
        for item in result {
            assert_eq!(item, 1);
        }
    });

    for _ in 0..NTHREADS {
        let tx = tx.clone();

        thread::spawn(move || {
            for _ in 0..AMT {
                tx.unbounded_send(1).unwrap();
            }
        });
    }

    // Drop the original sender so the consumer's stream can terminate once
    // all producer clones are done.
    drop(tx);

    // Was `t.join().ok().unwrap()`: the `.ok()` discarded the panic payload
    // and diverged from every other test's `t.join().unwrap()`.
    t.join().unwrap();
}
229 | |
#[test]
fn stress_shared_bounded_hard() {
    const AMT: u32 = if cfg!(miri) { 100 } else { 10000 };
    const NTHREADS: u32 = 8;
    // Zero-capacity channel hammered by several producer threads; the
    // consumer must still receive every message.
    let (tx, rx) = mpsc::channel::<i32>(0);

    let consumer = thread::spawn(move || {
        let items: Vec<i32> = block_on(rx.collect());
        assert_eq!(items.len(), (AMT * NTHREADS) as usize);
        assert!(items.iter().all(|&item| item == 1));
    });

    for _ in 0..NTHREADS {
        let mut tx = tx.clone();
        thread::spawn(move || {
            for _ in 0..AMT {
                block_on(tx.send(1)).unwrap();
            }
        });
    }

    // Drop the original handle so the consumer's stream can terminate.
    drop(tx);

    consumer.join().unwrap();
}
258 | |
#[allow(clippy::same_item_push)]
#[test]
fn stress_receiver_multi_task_bounded_hard() {
    const AMT: usize = if cfg!(miri) { 100 } else { 10_000 };
    const NTHREADS: u32 = 2;

    // Several threads share a single receiver behind a mutex, alternating
    // between blocking reads and one-shot polls; every sent item must be
    // counted exactly once.
    let (mut tx, rx) = mpsc::channel::<usize>(0);
    let rx = Arc::new(Mutex::new(Some(rx)));
    let n = Arc::new(AtomicUsize::new(0));

    let mut th = vec![];

    for _ in 0..NTHREADS {
        let rx = rx.clone();
        let n = n.clone();

        let t = thread::spawn(move || {
            let mut i = 0;

            loop {
                i += 1;
                let mut rx_opt = rx.lock().unwrap();
                if let Some(rx) = &mut *rx_opt {
                    if i % 5 == 0 {
                        // Every fifth iteration: block until an item (or
                        // end-of-stream) arrives.
                        let item = block_on(rx.next());

                        if item.is_none() {
                            // Stream finished: take the receiver out so the
                            // other threads stop too.
                            *rx_opt = None;
                            break;
                        }

                        n.fetch_add(1, Ordering::Relaxed);
                    } else {
                        // Just poll once without blocking; Pending is simply
                        // retried on a later iteration.
                        // (Removed a redundant `let n = n.clone();` shadow
                        // here — `n` is already owned by this closure.)
                        match rx.poll_next_unpin(&mut noop_context()) {
                            Poll::Ready(Some(_)) => {
                                n.fetch_add(1, Ordering::Relaxed);
                            }
                            Poll::Ready(None) => {
                                *rx_opt = None;
                                break;
                            }
                            Poll::Pending => {}
                        }
                    }
                } else {
                    // Another thread already drained the stream.
                    break;
                }
            }
        });

        th.push(t);
    }

    for i in 0..AMT {
        block_on(tx.send(i)).unwrap();
    }
    drop(tx);

    for t in th {
        t.join().unwrap();
    }

    assert_eq!(AMT, n.load(Ordering::Relaxed));
}
325 | |
/// Stress test that receiver properly receives all the messages
/// after sender dropped.
#[test]
fn stress_drop_sender() {
    const ITER: usize = if cfg!(miri) { 100 } else { 10000 };

    // Build a stream whose sender lives on (and is dropped by) another thread.
    fn list() -> impl Stream<Item = i32> {
        let (tx, rx) = mpsc::channel(1);
        thread::spawn(move || block_on(send_one_two_three(tx)));
        rx
    }

    for _ in 0..ITER {
        let collected: Vec<i32> = block_on(list().collect());
        assert_eq!(collected, [1, 2, 3]);
    }
}
345 | |
346 | async fn send_one_two_three(mut tx: mpsc::Sender<i32>) { |
347 | for i in 1..=3 { |
348 | tx.send(i).await.unwrap(); |
349 | } |
350 | } |
351 | |
352 | /// Stress test that after receiver dropped, |
353 | /// no messages are lost. |
354 | fn stress_close_receiver_iter() { |
355 | let (tx, rx) = mpsc::unbounded(); |
356 | let mut rx = block_on_stream(rx); |
357 | let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel(); |
358 | let th = thread::spawn(move || { |
359 | for i in 1.. { |
360 | if tx.unbounded_send(i).is_err() { |
361 | unwritten_tx.send(i).expect("unwritten_tx" ); |
362 | return; |
363 | } |
364 | } |
365 | }); |
366 | |
367 | // Read one message to make sure thread effectively started |
368 | assert_eq!(Some(1), rx.next()); |
369 | |
370 | rx.close(); |
371 | |
372 | for i in 2.. { |
373 | match rx.next() { |
374 | Some(r) => assert!(i == r), |
375 | None => { |
376 | let unwritten = unwritten_rx.recv().expect("unwritten_rx" ); |
377 | assert_eq!(unwritten, i); |
378 | th.join().unwrap(); |
379 | return; |
380 | } |
381 | } |
382 | } |
383 | } |
384 | |
#[test]
fn stress_close_receiver() {
    const ITER: usize = if cfg!(miri) { 50 } else { 10000 };

    // Repeat the close-race scenario many times to shake out ordering bugs.
    (0..ITER).for_each(|_| stress_close_receiver_iter());
}
393 | |
394 | async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) { |
395 | for i in (1..=count).rev() { |
396 | sender.send(i).await.unwrap(); |
397 | } |
398 | } |
399 | |
/// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
// `#[allow (…)]` → `#[allow(…)]`: rustfmt-conventional attribute spacing.
#[allow(clippy::same_item_push)]
#[test]
fn stress_poll_ready() {
    const AMT: u32 = if cfg!(miri) { 100 } else { 1000 };
    const NTHREADS: u32 = 8;

    /// Run a stress test using the specified channel capacity.
    fn stress(capacity: usize) {
        let (tx, rx) = mpsc::channel(capacity);
        let mut threads = Vec::new();
        for _ in 0..NTHREADS {
            let sender = tx.clone();
            threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
        }
        // Drop the original sender so `rx.collect()` can finish.
        drop(tx);

        let result: Vec<_> = block_on(rx.collect());
        assert_eq!(result.len() as u32, AMT * NTHREADS);

        for thread in threads {
            thread.join().unwrap();
        }
    }

    // Exercise zero-capacity through moderately buffered channels.
    stress(0);
    stress(1);
    stress(8);
    stress(16);
}
430 | |
#[test]
fn try_send_1() {
    const N: usize = if cfg!(miri) { 100 } else { 3000 };
    // Spin on `try_send` against a zero-capacity channel from another thread;
    // the receiver must still observe every value, in order.
    let (mut tx, rx) = mpsc::channel(0);

    let producer = thread::spawn(move || {
        for i in 0..N {
            // Retry until the channel has room for this value.
            while tx.try_send(i).is_err() {}
        }
    });

    let result: Vec<usize> = block_on(rx.collect());
    for (expected, actual) in result.into_iter().enumerate() {
        assert_eq!(expected, actual);
    }

    producer.join().unwrap();
}
453 | |
#[test]
fn try_send_2() {
    // On a zero-capacity channel a `try_send` occupies the only slot, so the
    // next send must wait (Pending) until the receiver drains the first item.
    let (mut tx, rx) = mpsc::channel(0);
    let mut rx = block_on_stream(rx);

    tx.try_send("hello").unwrap();

    // Signals that the spawned thread has completed its `poll_ready` check
    // (dropping `readytx` completes `readyrx` with Canceled).
    let (readytx, readyrx) = oneshot::channel::<()>();

    let th = thread::spawn(move || {
        // The slot is still occupied, so the sender is not ready yet.
        block_on(poll_fn(|cx| {
            assert!(tx.poll_ready(cx).is_pending());
            Poll::Ready(())
        }));

        drop(readytx);
        // Blocks until the receiver takes "hello", then delivers "goodbye".
        block_on(tx.send("goodbye")).unwrap();
    });

    // Wait for the Pending check above before draining the channel.
    let _ = block_on(readyrx);
    assert_eq!(rx.next(), Some("hello"));
    assert_eq!(rx.next(), Some("goodbye"));
    assert_eq!(rx.next(), None);

    th.join().unwrap();
}
480 | |
#[test]
fn try_send_fail() {
    // A zero-capacity channel accepts exactly one message via `try_send`
    // before reporting full.
    let (mut tx, rx) = mpsc::channel(0);
    let mut rx = block_on_stream(rx);

    tx.try_send("hello").unwrap();

    // This should fail
    assert!(tx.try_send("fail").is_err());

    assert_eq!(rx.next(), Some("hello"));

    // Consuming the first item makes room for another `try_send`.
    tx.try_send("goodbye").unwrap();
    drop(tx);

    assert_eq!(rx.next(), Some("goodbye"));
    assert_eq!(rx.next(), None);
}
499 | |
#[test]
fn try_send_recv() {
    // A `channel(1)` accepts two `try_send`s before reporting full, and the
    // full/empty cycle repeats once items are drained.
    let (mut tx, mut rx) = mpsc::channel(1);
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap();
    tx.try_send("hello").unwrap_err(); // should be full
    rx.try_next().unwrap();
    rx.try_next().unwrap();
    rx.try_next().unwrap_err(); // should be empty
    tx.try_send("hello").unwrap();
    rx.try_next().unwrap();
    rx.try_next().unwrap_err(); // should be empty
}
513 | |
#[test]
fn same_receiver() {
    // `same_receiver` identifies sender handles feeding the same channel.
    let (mut a1, _) = mpsc::channel::<i32>(1);
    let a2 = a1.clone();

    let (mut b1, _) = mpsc::channel::<i32>(1);
    let b2 = b1.clone();

    assert!(a1.same_receiver(&a2));
    assert!(b1.same_receiver(&b2));
    assert!(!a1.same_receiver(&b1));

    // Per the asserts below: after `disconnect` a handle no longer compares
    // equal to its clone, while `close_channel` leaves the association intact.
    a1.disconnect();
    b1.close_channel();

    assert!(!a1.same_receiver(&a2));
    assert!(b1.same_receiver(&b2));
}
532 | |
#[test]
fn is_connected_to() {
    // Each sender reports a connection only to its own receiver.
    let (sender_a, receiver_a) = mpsc::channel::<i32>(1);
    let (sender_b, receiver_b) = mpsc::channel::<i32>(1);

    assert!(sender_a.is_connected_to(&receiver_a));
    assert!(sender_b.is_connected_to(&receiver_b));
    assert!(!sender_a.is_connected_to(&receiver_b));
    assert!(!sender_b.is_connected_to(&receiver_a));
}
543 | |
#[test]
fn hash_receiver() {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;

    // Helper: hash identifying the receiver a sender handle is attached to.
    fn receiver_hash(tx: &mpsc::Sender<i32>) -> u64 {
        let mut hasher = DefaultHasher::new();
        tx.hash_receiver(&mut hasher);
        hasher.finish()
    }

    let (mut txa1, _) = mpsc::channel::<i32>(1);
    let txa2 = txa1.clone();

    let (mut txb1, _) = mpsc::channel::<i32>(1);
    let txb2 = txb1.clone();

    // Clones of the same sender hash identically; distinct channels differ.
    assert_eq!(receiver_hash(&txa1), receiver_hash(&txa2));
    assert_eq!(receiver_hash(&txb1), receiver_hash(&txb2));
    assert!(receiver_hash(&txa1) != receiver_hash(&txb1));

    txa1.disconnect();
    txb1.close_channel();

    // Per the asserts: a disconnected handle no longer hashes like its clone,
    // while closing the channel leaves both handles' hashes in agreement.
    assert!(receiver_hash(&txa1) != receiver_hash(&txa2));
    assert_eq!(receiver_hash(&txb1), receiver_hash(&txb2));
}
592 | |
#[test]
fn send_backpressure() {
    // Filling the channel parks a send; consuming an item wakes it exactly once.
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx, mut rx) = mpsc::channel(1);
    block_on(tx.send(1)).unwrap();

    // The buffer is full, so this send cannot complete yet.
    let mut send_fut = tx.send(2);
    assert_eq!(send_fut.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    // Receiving frees a slot and must wake the parked sender.
    assert_eq!(block_on(rx.next()), Some(1));
    assert_eq!(counter, 1);
    assert_eq!(send_fut.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    assert_eq!(block_on(rx.next()), Some(2));
}
613 | |
#[test]
fn send_backpressure_multi_senders() {
    // Same as `send_backpressure`, but the parked send comes from a clone of
    // the sender that filled the buffer.
    let (waker, counter) = new_count_waker();
    let mut cx = Context::from_waker(&waker);

    let (mut tx1, mut rx) = mpsc::channel(1);
    let mut tx2 = tx1.clone();
    block_on(tx1.send(1)).unwrap();

    // The clone's send must also see backpressure from the full buffer.
    let mut send_fut = tx2.send(2);
    assert_eq!(send_fut.poll_unpin(&mut cx), Poll::Pending);
    assert_eq!(counter, 0);

    // Draining one item wakes the parked clone exactly once.
    assert_eq!(block_on(rx.next()), Some(1));
    assert_eq!(counter, 1);
    assert_eq!(send_fut.poll_unpin(&mut cx), Poll::Ready(Ok(())));

    assert_eq!(block_on(rx.next()), Some(2));
}
635 | |
/// Test that empty channel has zero length and that non-empty channel has length equal to number
/// of enqueued items
#[test]
fn unbounded_len() {
    let (tx, mut rx) = mpsc::unbounded();

    // Freshly created channel is empty.
    assert_eq!(tx.len(), 0);
    assert!(tx.is_empty());

    // Length tracks the number of queued items as we enqueue...
    for value in 1..=2 {
        tx.unbounded_send(value).unwrap();
        assert_eq!(tx.len(), value as usize);
        assert!(!tx.is_empty());
    }

    // ...and as we dequeue.
    assert_eq!(block_on(rx.next()), Some(1));
    assert_eq!(tx.len(), 1);
    assert!(!tx.is_empty());

    assert_eq!(block_on(rx.next()), Some(2));
    assert_eq!(tx.len(), 0);
    assert!(tx.is_empty());
}
658 | |