1#![allow(clippy::redundant_clone)]
2#![warn(rust_2018_idioms)]
3#![cfg(feature = "sync")]
4
5#[cfg(all(target_family = "wasm", not(target_os = "wasi")))]
6use wasm_bindgen_test::wasm_bindgen_test as test;
7
8use std::sync::atomic::AtomicUsize;
9use std::sync::atomic::Ordering::{Acquire, Release};
10use tokio::sync::mpsc::{self, channel, unbounded_channel};
11use tokio::sync::oneshot;
12
13#[tokio::test]
14async fn weak_sender() {
15 let (tx, mut rx) = channel(11);
16
17 let tx_weak = tokio::spawn(async move {
18 let tx_weak = tx.clone().downgrade();
19
20 for i in 0..10 {
21 if tx.send(i).await.is_err() {
22 return None;
23 }
24 }
25
26 let tx2 = tx_weak
27 .upgrade()
28 .expect("expected to be able to upgrade tx_weak");
29 let _ = tx2.send(20).await;
30 let tx_weak = tx2.downgrade();
31
32 Some(tx_weak)
33 })
34 .await
35 .unwrap();
36
37 for i in 0..12 {
38 let recvd = rx.recv().await;
39
40 match recvd {
41 Some(msg) => {
42 if i == 10 {
43 assert_eq!(msg, 20);
44 }
45 }
46 None => {
47 assert_eq!(i, 11);
48 break;
49 }
50 }
51 }
52
53 let tx_weak = tx_weak.unwrap();
54 let upgraded = tx_weak.upgrade();
55 assert!(upgraded.is_none());
56}
57
58#[tokio::test]
59async fn actor_weak_sender() {
60 pub struct MyActor {
61 receiver: mpsc::Receiver<ActorMessage>,
62 sender: mpsc::WeakSender<ActorMessage>,
63 next_id: u32,
64 pub received_self_msg: bool,
65 }
66
67 enum ActorMessage {
68 GetUniqueId { respond_to: oneshot::Sender<u32> },
69 SelfMessage {},
70 }
71
72 impl MyActor {
73 fn new(
74 receiver: mpsc::Receiver<ActorMessage>,
75 sender: mpsc::WeakSender<ActorMessage>,
76 ) -> Self {
77 MyActor {
78 receiver,
79 sender,
80 next_id: 0,
81 received_self_msg: false,
82 }
83 }
84
85 fn handle_message(&mut self, msg: ActorMessage) {
86 match msg {
87 ActorMessage::GetUniqueId { respond_to } => {
88 self.next_id += 1;
89
90 // The `let _ =` ignores any errors when sending.
91 //
92 // This can happen if the `select!` macro is used
93 // to cancel waiting for the response.
94 let _ = respond_to.send(self.next_id);
95 }
96 ActorMessage::SelfMessage { .. } => {
97 self.received_self_msg = true;
98 }
99 }
100 }
101
102 async fn send_message_to_self(&mut self) {
103 let msg = ActorMessage::SelfMessage {};
104
105 let sender = self.sender.clone();
106
107 // cannot move self.sender here
108 if let Some(sender) = sender.upgrade() {
109 let _ = sender.send(msg).await;
110 self.sender = sender.downgrade();
111 }
112 }
113
114 async fn run(&mut self) {
115 let mut i = 0;
116 while let Some(msg) = self.receiver.recv().await {
117 self.handle_message(msg);
118
119 if i == 0 {
120 self.send_message_to_self().await;
121 }
122
123 i += 1
124 }
125
126 assert!(self.received_self_msg);
127 }
128 }
129
130 #[derive(Clone)]
131 pub struct MyActorHandle {
132 sender: mpsc::Sender<ActorMessage>,
133 }
134
135 impl MyActorHandle {
136 pub fn new() -> (Self, MyActor) {
137 let (sender, receiver) = mpsc::channel(8);
138 let actor = MyActor::new(receiver, sender.clone().downgrade());
139
140 (Self { sender }, actor)
141 }
142
143 pub async fn get_unique_id(&self) -> u32 {
144 let (send, recv) = oneshot::channel();
145 let msg = ActorMessage::GetUniqueId { respond_to: send };
146
147 // Ignore send errors. If this send fails, so does the
148 // recv.await below. There's no reason to check the
149 // failure twice.
150 let _ = self.sender.send(msg).await;
151 recv.await.expect("Actor task has been killed")
152 }
153 }
154
155 let (handle, mut actor) = MyActorHandle::new();
156
157 let actor_handle = tokio::spawn(async move { actor.run().await });
158
159 let _ = tokio::spawn(async move {
160 let _ = handle.get_unique_id().await;
161 drop(handle);
162 })
163 .await;
164
165 let _ = actor_handle.await;
166}
167
/// Running total of `Msg` values that have been destroyed.
static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0);

/// Payload-free message whose destructor bumps `NUM_DROPPED`.
#[derive(Debug)]
struct Msg;

impl Drop for Msg {
    /// Records the destruction of this message in `NUM_DROPPED`.
    fn drop(&mut self) {
        // Only the increment matters; the previous count is not needed.
        let _previous = NUM_DROPPED.fetch_add(1, Release);
    }
}
178
179// Tests that no pending messages are put onto the channel after `Rx` was
180// dropped.
181//
182// Note: After the introduction of `WeakSender`, which internally
183// used `Arc` and doesn't call a drop of the channel after the last strong
184// `Sender` was dropped while more than one `WeakSender` remains, we want to
185// ensure that no messages are kept in the channel, which were sent after
186// the receiver was dropped.
187#[tokio::test]
188async fn test_msgs_dropped_on_rx_drop() {
189 let (tx, mut rx) = mpsc::channel(3);
190
191 tx.send(Msg {}).await.unwrap();
192 tx.send(Msg {}).await.unwrap();
193
194 // This msg will be pending and should be dropped when `rx` is dropped
195 let sent_fut = tx.send(Msg {});
196
197 let _ = rx.recv().await.unwrap();
198 let _ = rx.recv().await.unwrap();
199
200 sent_fut.await.unwrap();
201
202 drop(rx);
203
204 assert_eq!(NUM_DROPPED.load(Acquire), 3);
205
206 // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
207 assert!(tx.send(Msg {}).await.is_err());
208
209 assert_eq!(NUM_DROPPED.load(Acquire), 4);
210}
211
// Tests that upgrading a `WeakSender` succeeds while a strong `Sender` is
// still alive.
#[test]
fn downgrade_upgrade_sender_success() {
    let (strong, _rx) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();

    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
219
// Tests that upgrading a `WeakSender` fails once the only strong `Sender`
// has been dropped.
#[test]
fn downgrade_upgrade_sender_failure() {
    let (strong, _rx) = mpsc::channel::<i32>(1);
    let weak = strong.downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
228
// Tests that a `WeakSender` created from a short-lived clone cannot be
// upgraded after the `Sender` that existed at the time of the `downgrade`
// call has been dropped as well.
#[test]
fn downgrade_drop_upgrade() {
    let (strong, _rx) = mpsc::channel::<i32>(1);

    let weak = {
        // The clone only lives for the duration of this block.
        let short_lived = strong.clone();
        short_lived.downgrade()
    };

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
240
241// Tests that we can upgrade a weak sender with an outstanding permit
242// but no other strong senders.
243#[tokio::test]
244async fn downgrade_get_permit_upgrade_no_senders() {
245 let (tx, _rx) = mpsc::channel::<i32>(1);
246 let weak_tx = tx.downgrade();
247 let _permit = tx.reserve_owned().await.unwrap();
248 assert!(weak_tx.upgrade().is_some());
249}
250
251// Tests that you can downgrade and upgrade a sender with an outstanding permit
252// but no other senders left.
253#[tokio::test]
254async fn downgrade_upgrade_get_permit_no_senders() {
255 let (tx, _rx) = mpsc::channel::<i32>(1);
256 let tx2 = tx.clone();
257 let _permit = tx.reserve_owned().await.unwrap();
258 let weak_tx = tx2.downgrade();
259 drop(tx2);
260 assert!(weak_tx.upgrade().is_some());
261}
262
// Tests that `downgrade` leaves the channel's `tx_count` untouched: after the
// single strong sender is dropped, none of the weak senders can be upgraded.
#[test]
fn test_tx_count_weak_sender() {
    let (strong, _rx) = mpsc::channel::<i32>(1);
    let weak_senders = [strong.downgrade(), strong.downgrade()];

    drop(strong);

    // `all` stops at the first weak sender that still upgrades.
    assert!(weak_senders.iter().all(|weak| weak.upgrade().is_none()));
}
273
274#[tokio::test]
275async fn weak_unbounded_sender() {
276 let (tx, mut rx) = unbounded_channel();
277
278 let tx_weak = tokio::spawn(async move {
279 let tx_weak = tx.clone().downgrade();
280
281 for i in 0..10 {
282 if tx.send(i).is_err() {
283 return None;
284 }
285 }
286
287 let tx2 = tx_weak
288 .upgrade()
289 .expect("expected to be able to upgrade tx_weak");
290 let _ = tx2.send(20);
291 let tx_weak = tx2.downgrade();
292
293 Some(tx_weak)
294 })
295 .await
296 .unwrap();
297
298 for i in 0..12 {
299 let recvd = rx.recv().await;
300
301 match recvd {
302 Some(msg) => {
303 if i == 10 {
304 assert_eq!(msg, 20);
305 }
306 }
307 None => {
308 assert_eq!(i, 11);
309 break;
310 }
311 }
312 }
313
314 let tx_weak = tx_weak.unwrap();
315 let upgraded = tx_weak.upgrade();
316 assert!(upgraded.is_none());
317}
318
319#[tokio::test]
320async fn actor_weak_unbounded_sender() {
321 pub struct MyActor {
322 receiver: mpsc::UnboundedReceiver<ActorMessage>,
323 sender: mpsc::WeakUnboundedSender<ActorMessage>,
324 next_id: u32,
325 pub received_self_msg: bool,
326 }
327
328 enum ActorMessage {
329 GetUniqueId { respond_to: oneshot::Sender<u32> },
330 SelfMessage {},
331 }
332
333 impl MyActor {
334 fn new(
335 receiver: mpsc::UnboundedReceiver<ActorMessage>,
336 sender: mpsc::WeakUnboundedSender<ActorMessage>,
337 ) -> Self {
338 MyActor {
339 receiver,
340 sender,
341 next_id: 0,
342 received_self_msg: false,
343 }
344 }
345
346 fn handle_message(&mut self, msg: ActorMessage) {
347 match msg {
348 ActorMessage::GetUniqueId { respond_to } => {
349 self.next_id += 1;
350
351 // The `let _ =` ignores any errors when sending.
352 //
353 // This can happen if the `select!` macro is used
354 // to cancel waiting for the response.
355 let _ = respond_to.send(self.next_id);
356 }
357 ActorMessage::SelfMessage { .. } => {
358 self.received_self_msg = true;
359 }
360 }
361 }
362
363 async fn send_message_to_self(&mut self) {
364 let msg = ActorMessage::SelfMessage {};
365
366 let sender = self.sender.clone();
367
368 // cannot move self.sender here
369 if let Some(sender) = sender.upgrade() {
370 let _ = sender.send(msg);
371 self.sender = sender.downgrade();
372 }
373 }
374
375 async fn run(&mut self) {
376 let mut i = 0;
377 while let Some(msg) = self.receiver.recv().await {
378 self.handle_message(msg);
379
380 if i == 0 {
381 self.send_message_to_self().await;
382 }
383
384 i += 1
385 }
386
387 assert!(self.received_self_msg);
388 }
389 }
390
391 #[derive(Clone)]
392 pub struct MyActorHandle {
393 sender: mpsc::UnboundedSender<ActorMessage>,
394 }
395
396 impl MyActorHandle {
397 pub fn new() -> (Self, MyActor) {
398 let (sender, receiver) = mpsc::unbounded_channel();
399 let actor = MyActor::new(receiver, sender.clone().downgrade());
400
401 (Self { sender }, actor)
402 }
403
404 pub async fn get_unique_id(&self) -> u32 {
405 let (send, recv) = oneshot::channel();
406 let msg = ActorMessage::GetUniqueId { respond_to: send };
407
408 // Ignore send errors. If this send fails, so does the
409 // recv.await below. There's no reason to check the
410 // failure twice.
411 let _ = self.sender.send(msg);
412 recv.await.expect("Actor task has been killed")
413 }
414 }
415
416 let (handle, mut actor) = MyActorHandle::new();
417
418 let actor_handle = tokio::spawn(async move { actor.run().await });
419
420 let _ = tokio::spawn(async move {
421 let _ = handle.get_unique_id().await;
422 drop(handle);
423 })
424 .await;
425
426 let _ = actor_handle.await;
427}
428
/// Running total of `MsgUnbounded` values that have been destroyed.
static NUM_DROPPED_UNBOUNDED: AtomicUsize = AtomicUsize::new(0);

/// Payload-free message whose destructor bumps `NUM_DROPPED_UNBOUNDED`.
#[derive(Debug)]
struct MsgUnbounded;

impl Drop for MsgUnbounded {
    /// Records the destruction of this message in `NUM_DROPPED_UNBOUNDED`.
    fn drop(&mut self) {
        // Only the increment matters; the previous count is not needed.
        let _previous = NUM_DROPPED_UNBOUNDED.fetch_add(1, Release);
    }
}
439
440// Tests that no pending messages are put onto the channel after `Rx` was
441// dropped.
442//
443// Note: After the introduction of `UnboundedWeakSender`, which internally
444// used `Arc` and doesn't call a drop of the channel after the last strong
445// `UnboundedSender` was dropped while more than one `UnboundedWeakSender`
446// remains, we want to ensure that no messages are kept in the channel, which
447// were sent after the receiver was dropped.
448#[tokio::test]
449async fn test_msgs_dropped_on_unbounded_rx_drop() {
450 let (tx, mut rx) = mpsc::unbounded_channel();
451
452 tx.send(MsgUnbounded {}).unwrap();
453 tx.send(MsgUnbounded {}).unwrap();
454
455 // This msg will be pending and should be dropped when `rx` is dropped
456 let sent = tx.send(MsgUnbounded {});
457
458 let _ = rx.recv().await.unwrap();
459 let _ = rx.recv().await.unwrap();
460
461 sent.unwrap();
462
463 drop(rx);
464
465 assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 3);
466
467 // This msg will not be put onto `Tx` list anymore, since `Rx` is closed.
468 assert!(tx.send(MsgUnbounded {}).is_err());
469
470 assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 4);
471}
472
// Tests that upgrading a `WeakUnboundedSender` succeeds while a strong
// `UnboundedSender` is still alive.
#[test]
fn downgrade_upgrade_unbounded_sender_success() {
    let (strong, _rx) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();

    let upgraded = weak.upgrade();
    assert!(upgraded.is_some());
}
481
// Tests that upgrading a `WeakUnboundedSender` fails once the only strong
// `UnboundedSender` has been dropped.
#[test]
fn downgrade_upgrade_unbounded_sender_failure() {
    let (strong, _rx) = mpsc::unbounded_channel::<i32>();
    let weak = strong.downgrade();

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
491
// Tests that a `WeakUnboundedSender` created from a short-lived clone cannot
// be upgraded after the `UnboundedSender` that existed at the time of the
// `downgrade` call has been dropped as well.
#[test]
fn downgrade_drop_upgrade_unbounded() {
    let (strong, _rx) = mpsc::unbounded_channel::<i32>();

    let weak = {
        // The clone only lives for the duration of this block.
        let short_lived = strong.clone();
        short_lived.downgrade()
    };

    drop(strong);

    let upgraded = weak.upgrade();
    assert!(upgraded.is_none());
}
503
// Tests that `downgrade` leaves the channel's `tx_count` untouched: after the
// single strong sender is dropped, none of the weak senders can be upgraded.
#[test]
fn test_tx_count_weak_unbounded_sender() {
    let (strong, _rx) = mpsc::unbounded_channel::<i32>();
    let weak_senders = [strong.downgrade(), strong.downgrade()];

    drop(strong);

    // `all` stops at the first weak sender that still upgrades.
    assert!(weak_senders.iter().all(|weak| weak.upgrade().is_none()));
}
514