1 | #![allow (clippy::redundant_clone)] |
2 | #![warn (rust_2018_idioms)] |
3 | #![cfg (feature = "sync" )] |
4 | |
5 | #[cfg (all(target_family = "wasm" , not(target_os = "wasi" )))] |
6 | use wasm_bindgen_test::wasm_bindgen_test as test; |
7 | |
8 | use std::sync::atomic::AtomicUsize; |
9 | use std::sync::atomic::Ordering::{Acquire, Release}; |
10 | use tokio::sync::mpsc::{self, channel, unbounded_channel}; |
11 | use tokio::sync::oneshot; |
12 | |
13 | #[tokio::test ] |
14 | async fn weak_sender() { |
15 | let (tx, mut rx) = channel(11); |
16 | |
17 | let tx_weak = tokio::spawn(async move { |
18 | let tx_weak = tx.clone().downgrade(); |
19 | |
20 | for i in 0..10 { |
21 | if tx.send(i).await.is_err() { |
22 | return None; |
23 | } |
24 | } |
25 | |
26 | let tx2 = tx_weak |
27 | .upgrade() |
28 | .expect("expected to be able to upgrade tx_weak" ); |
29 | let _ = tx2.send(20).await; |
30 | let tx_weak = tx2.downgrade(); |
31 | |
32 | Some(tx_weak) |
33 | }) |
34 | .await |
35 | .unwrap(); |
36 | |
37 | for i in 0..12 { |
38 | let recvd = rx.recv().await; |
39 | |
40 | match recvd { |
41 | Some(msg) => { |
42 | if i == 10 { |
43 | assert_eq!(msg, 20); |
44 | } |
45 | } |
46 | None => { |
47 | assert_eq!(i, 11); |
48 | break; |
49 | } |
50 | } |
51 | } |
52 | |
53 | let tx_weak = tx_weak.unwrap(); |
54 | let upgraded = tx_weak.upgrade(); |
55 | assert!(upgraded.is_none()); |
56 | } |
57 | |
58 | #[tokio::test ] |
59 | async fn actor_weak_sender() { |
60 | pub struct MyActor { |
61 | receiver: mpsc::Receiver<ActorMessage>, |
62 | sender: mpsc::WeakSender<ActorMessage>, |
63 | next_id: u32, |
64 | pub received_self_msg: bool, |
65 | } |
66 | |
67 | enum ActorMessage { |
68 | GetUniqueId { respond_to: oneshot::Sender<u32> }, |
69 | SelfMessage {}, |
70 | } |
71 | |
72 | impl MyActor { |
73 | fn new( |
74 | receiver: mpsc::Receiver<ActorMessage>, |
75 | sender: mpsc::WeakSender<ActorMessage>, |
76 | ) -> Self { |
77 | MyActor { |
78 | receiver, |
79 | sender, |
80 | next_id: 0, |
81 | received_self_msg: false, |
82 | } |
83 | } |
84 | |
85 | fn handle_message(&mut self, msg: ActorMessage) { |
86 | match msg { |
87 | ActorMessage::GetUniqueId { respond_to } => { |
88 | self.next_id += 1; |
89 | |
90 | // The `let _ =` ignores any errors when sending. |
91 | // |
92 | // This can happen if the `select!` macro is used |
93 | // to cancel waiting for the response. |
94 | let _ = respond_to.send(self.next_id); |
95 | } |
96 | ActorMessage::SelfMessage { .. } => { |
97 | self.received_self_msg = true; |
98 | } |
99 | } |
100 | } |
101 | |
102 | async fn send_message_to_self(&mut self) { |
103 | let msg = ActorMessage::SelfMessage {}; |
104 | |
105 | let sender = self.sender.clone(); |
106 | |
107 | // cannot move self.sender here |
108 | if let Some(sender) = sender.upgrade() { |
109 | let _ = sender.send(msg).await; |
110 | self.sender = sender.downgrade(); |
111 | } |
112 | } |
113 | |
114 | async fn run(&mut self) { |
115 | let mut i = 0; |
116 | while let Some(msg) = self.receiver.recv().await { |
117 | self.handle_message(msg); |
118 | |
119 | if i == 0 { |
120 | self.send_message_to_self().await; |
121 | } |
122 | |
123 | i += 1 |
124 | } |
125 | |
126 | assert!(self.received_self_msg); |
127 | } |
128 | } |
129 | |
130 | #[derive(Clone)] |
131 | pub struct MyActorHandle { |
132 | sender: mpsc::Sender<ActorMessage>, |
133 | } |
134 | |
135 | impl MyActorHandle { |
136 | pub fn new() -> (Self, MyActor) { |
137 | let (sender, receiver) = mpsc::channel(8); |
138 | let actor = MyActor::new(receiver, sender.clone().downgrade()); |
139 | |
140 | (Self { sender }, actor) |
141 | } |
142 | |
143 | pub async fn get_unique_id(&self) -> u32 { |
144 | let (send, recv) = oneshot::channel(); |
145 | let msg = ActorMessage::GetUniqueId { respond_to: send }; |
146 | |
147 | // Ignore send errors. If this send fails, so does the |
148 | // recv.await below. There's no reason to check the |
149 | // failure twice. |
150 | let _ = self.sender.send(msg).await; |
151 | recv.await.expect("Actor task has been killed" ) |
152 | } |
153 | } |
154 | |
155 | let (handle, mut actor) = MyActorHandle::new(); |
156 | |
157 | let actor_handle = tokio::spawn(async move { actor.run().await }); |
158 | |
159 | let _ = tokio::spawn(async move { |
160 | let _ = handle.get_unique_id().await; |
161 | drop(handle); |
162 | }) |
163 | .await; |
164 | |
165 | let _ = actor_handle.await; |
166 | } |
167 | |
168 | static NUM_DROPPED: AtomicUsize = AtomicUsize::new(0); |
169 | |
170 | #[derive(Debug)] |
171 | struct Msg; |
172 | |
173 | impl Drop for Msg { |
174 | fn drop(&mut self) { |
175 | NUM_DROPPED.fetch_add(1, Release); |
176 | } |
177 | } |
178 | |
179 | // Tests that no pending messages are put onto the channel after `Rx` was |
180 | // dropped. |
181 | // |
182 | // Note: After the introduction of `WeakSender`, which internally |
183 | // used `Arc` and doesn't call a drop of the channel after the last strong |
184 | // `Sender` was dropped while more than one `WeakSender` remains, we want to |
185 | // ensure that no messages are kept in the channel, which were sent after |
186 | // the receiver was dropped. |
187 | #[tokio::test ] |
188 | async fn test_msgs_dropped_on_rx_drop() { |
189 | let (tx, mut rx) = mpsc::channel(3); |
190 | |
191 | tx.send(Msg {}).await.unwrap(); |
192 | tx.send(Msg {}).await.unwrap(); |
193 | |
194 | // This msg will be pending and should be dropped when `rx` is dropped |
195 | let sent_fut = tx.send(Msg {}); |
196 | |
197 | let _ = rx.recv().await.unwrap(); |
198 | let _ = rx.recv().await.unwrap(); |
199 | |
200 | sent_fut.await.unwrap(); |
201 | |
202 | drop(rx); |
203 | |
204 | assert_eq!(NUM_DROPPED.load(Acquire), 3); |
205 | |
206 | // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. |
207 | assert!(tx.send(Msg {}).await.is_err()); |
208 | |
209 | assert_eq!(NUM_DROPPED.load(Acquire), 4); |
210 | } |
211 | |
212 | // Tests that a `WeakSender` is upgradeable when other `Sender`s exist. |
213 | #[test] |
214 | fn downgrade_upgrade_sender_success() { |
215 | let (tx, _rx) = mpsc::channel::<i32>(1); |
216 | let weak_tx = tx.downgrade(); |
217 | assert!(weak_tx.upgrade().is_some()); |
218 | } |
219 | |
220 | // Tests that a `WeakSender` fails to upgrade when no other `Sender` exists. |
221 | #[test] |
222 | fn downgrade_upgrade_sender_failure() { |
223 | let (tx, _rx) = mpsc::channel::<i32>(1); |
224 | let weak_tx = tx.downgrade(); |
225 | drop(tx); |
226 | assert!(weak_tx.upgrade().is_none()); |
227 | } |
228 | |
229 | // Tests that a `WeakSender` cannot be upgraded after a `Sender` was dropped, |
230 | // which existed at the time of the `downgrade` call. |
231 | #[test] |
232 | fn downgrade_drop_upgrade() { |
233 | let (tx, _rx) = mpsc::channel::<i32>(1); |
234 | |
235 | // the cloned `Tx` is dropped right away |
236 | let weak_tx = tx.clone().downgrade(); |
237 | drop(tx); |
238 | assert!(weak_tx.upgrade().is_none()); |
239 | } |
240 | |
241 | // Tests that we can upgrade a weak sender with an outstanding permit |
242 | // but no other strong senders. |
243 | #[tokio::test ] |
244 | async fn downgrade_get_permit_upgrade_no_senders() { |
245 | let (tx, _rx) = mpsc::channel::<i32>(1); |
246 | let weak_tx = tx.downgrade(); |
247 | let _permit = tx.reserve_owned().await.unwrap(); |
248 | assert!(weak_tx.upgrade().is_some()); |
249 | } |
250 | |
251 | // Tests that you can downgrade and upgrade a sender with an outstanding permit |
252 | // but no other senders left. |
253 | #[tokio::test ] |
254 | async fn downgrade_upgrade_get_permit_no_senders() { |
255 | let (tx, _rx) = mpsc::channel::<i32>(1); |
256 | let tx2 = tx.clone(); |
257 | let _permit = tx.reserve_owned().await.unwrap(); |
258 | let weak_tx = tx2.downgrade(); |
259 | drop(tx2); |
260 | assert!(weak_tx.upgrade().is_some()); |
261 | } |
262 | |
263 | // Tests that `downgrade` does not change the `tx_count` of the channel. |
264 | #[test] |
265 | fn test_tx_count_weak_sender() { |
266 | let (tx, _rx) = mpsc::channel::<i32>(1); |
267 | let tx_weak = tx.downgrade(); |
268 | let tx_weak2 = tx.downgrade(); |
269 | drop(tx); |
270 | |
271 | assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); |
272 | } |
273 | |
274 | #[tokio::test ] |
275 | async fn weak_unbounded_sender() { |
276 | let (tx, mut rx) = unbounded_channel(); |
277 | |
278 | let tx_weak = tokio::spawn(async move { |
279 | let tx_weak = tx.clone().downgrade(); |
280 | |
281 | for i in 0..10 { |
282 | if tx.send(i).is_err() { |
283 | return None; |
284 | } |
285 | } |
286 | |
287 | let tx2 = tx_weak |
288 | .upgrade() |
289 | .expect("expected to be able to upgrade tx_weak" ); |
290 | let _ = tx2.send(20); |
291 | let tx_weak = tx2.downgrade(); |
292 | |
293 | Some(tx_weak) |
294 | }) |
295 | .await |
296 | .unwrap(); |
297 | |
298 | for i in 0..12 { |
299 | let recvd = rx.recv().await; |
300 | |
301 | match recvd { |
302 | Some(msg) => { |
303 | if i == 10 { |
304 | assert_eq!(msg, 20); |
305 | } |
306 | } |
307 | None => { |
308 | assert_eq!(i, 11); |
309 | break; |
310 | } |
311 | } |
312 | } |
313 | |
314 | let tx_weak = tx_weak.unwrap(); |
315 | let upgraded = tx_weak.upgrade(); |
316 | assert!(upgraded.is_none()); |
317 | } |
318 | |
319 | #[tokio::test ] |
320 | async fn actor_weak_unbounded_sender() { |
321 | pub struct MyActor { |
322 | receiver: mpsc::UnboundedReceiver<ActorMessage>, |
323 | sender: mpsc::WeakUnboundedSender<ActorMessage>, |
324 | next_id: u32, |
325 | pub received_self_msg: bool, |
326 | } |
327 | |
328 | enum ActorMessage { |
329 | GetUniqueId { respond_to: oneshot::Sender<u32> }, |
330 | SelfMessage {}, |
331 | } |
332 | |
333 | impl MyActor { |
334 | fn new( |
335 | receiver: mpsc::UnboundedReceiver<ActorMessage>, |
336 | sender: mpsc::WeakUnboundedSender<ActorMessage>, |
337 | ) -> Self { |
338 | MyActor { |
339 | receiver, |
340 | sender, |
341 | next_id: 0, |
342 | received_self_msg: false, |
343 | } |
344 | } |
345 | |
346 | fn handle_message(&mut self, msg: ActorMessage) { |
347 | match msg { |
348 | ActorMessage::GetUniqueId { respond_to } => { |
349 | self.next_id += 1; |
350 | |
351 | // The `let _ =` ignores any errors when sending. |
352 | // |
353 | // This can happen if the `select!` macro is used |
354 | // to cancel waiting for the response. |
355 | let _ = respond_to.send(self.next_id); |
356 | } |
357 | ActorMessage::SelfMessage { .. } => { |
358 | self.received_self_msg = true; |
359 | } |
360 | } |
361 | } |
362 | |
363 | async fn send_message_to_self(&mut self) { |
364 | let msg = ActorMessage::SelfMessage {}; |
365 | |
366 | let sender = self.sender.clone(); |
367 | |
368 | // cannot move self.sender here |
369 | if let Some(sender) = sender.upgrade() { |
370 | let _ = sender.send(msg); |
371 | self.sender = sender.downgrade(); |
372 | } |
373 | } |
374 | |
375 | async fn run(&mut self) { |
376 | let mut i = 0; |
377 | while let Some(msg) = self.receiver.recv().await { |
378 | self.handle_message(msg); |
379 | |
380 | if i == 0 { |
381 | self.send_message_to_self().await; |
382 | } |
383 | |
384 | i += 1 |
385 | } |
386 | |
387 | assert!(self.received_self_msg); |
388 | } |
389 | } |
390 | |
391 | #[derive(Clone)] |
392 | pub struct MyActorHandle { |
393 | sender: mpsc::UnboundedSender<ActorMessage>, |
394 | } |
395 | |
396 | impl MyActorHandle { |
397 | pub fn new() -> (Self, MyActor) { |
398 | let (sender, receiver) = mpsc::unbounded_channel(); |
399 | let actor = MyActor::new(receiver, sender.clone().downgrade()); |
400 | |
401 | (Self { sender }, actor) |
402 | } |
403 | |
404 | pub async fn get_unique_id(&self) -> u32 { |
405 | let (send, recv) = oneshot::channel(); |
406 | let msg = ActorMessage::GetUniqueId { respond_to: send }; |
407 | |
408 | // Ignore send errors. If this send fails, so does the |
409 | // recv.await below. There's no reason to check the |
410 | // failure twice. |
411 | let _ = self.sender.send(msg); |
412 | recv.await.expect("Actor task has been killed" ) |
413 | } |
414 | } |
415 | |
416 | let (handle, mut actor) = MyActorHandle::new(); |
417 | |
418 | let actor_handle = tokio::spawn(async move { actor.run().await }); |
419 | |
420 | let _ = tokio::spawn(async move { |
421 | let _ = handle.get_unique_id().await; |
422 | drop(handle); |
423 | }) |
424 | .await; |
425 | |
426 | let _ = actor_handle.await; |
427 | } |
428 | |
429 | static NUM_DROPPED_UNBOUNDED: AtomicUsize = AtomicUsize::new(0); |
430 | |
431 | #[derive(Debug)] |
432 | struct MsgUnbounded; |
433 | |
434 | impl Drop for MsgUnbounded { |
435 | fn drop(&mut self) { |
436 | NUM_DROPPED_UNBOUNDED.fetch_add(1, Release); |
437 | } |
438 | } |
439 | |
440 | // Tests that no pending messages are put onto the channel after `Rx` was |
441 | // dropped. |
442 | // |
443 | // Note: After the introduction of `UnboundedWeakSender`, which internally |
444 | // used `Arc` and doesn't call a drop of the channel after the last strong |
445 | // `UnboundedSender` was dropped while more than one `UnboundedWeakSender` |
446 | // remains, we want to ensure that no messages are kept in the channel, which |
447 | // were sent after the receiver was dropped. |
448 | #[tokio::test ] |
449 | async fn test_msgs_dropped_on_unbounded_rx_drop() { |
450 | let (tx, mut rx) = mpsc::unbounded_channel(); |
451 | |
452 | tx.send(MsgUnbounded {}).unwrap(); |
453 | tx.send(MsgUnbounded {}).unwrap(); |
454 | |
455 | // This msg will be pending and should be dropped when `rx` is dropped |
456 | let sent = tx.send(MsgUnbounded {}); |
457 | |
458 | let _ = rx.recv().await.unwrap(); |
459 | let _ = rx.recv().await.unwrap(); |
460 | |
461 | sent.unwrap(); |
462 | |
463 | drop(rx); |
464 | |
465 | assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 3); |
466 | |
467 | // This msg will not be put onto `Tx` list anymore, since `Rx` is closed. |
468 | assert!(tx.send(MsgUnbounded {}).is_err()); |
469 | |
470 | assert_eq!(NUM_DROPPED_UNBOUNDED.load(Acquire), 4); |
471 | } |
472 | |
473 | // Tests that an `WeakUnboundedSender` is upgradeable when other |
474 | // `UnboundedSender`s exist. |
475 | #[test] |
476 | fn downgrade_upgrade_unbounded_sender_success() { |
477 | let (tx, _rx) = mpsc::unbounded_channel::<i32>(); |
478 | let weak_tx = tx.downgrade(); |
479 | assert!(weak_tx.upgrade().is_some()); |
480 | } |
481 | |
482 | // Tests that a `WeakUnboundedSender` fails to upgrade when no other |
483 | // `UnboundedSender` exists. |
484 | #[test] |
485 | fn downgrade_upgrade_unbounded_sender_failure() { |
486 | let (tx, _rx) = mpsc::unbounded_channel::<i32>(); |
487 | let weak_tx = tx.downgrade(); |
488 | drop(tx); |
489 | assert!(weak_tx.upgrade().is_none()); |
490 | } |
491 | |
492 | // Tests that an `WeakUnboundedSender` cannot be upgraded after an |
493 | // `UnboundedSender` was dropped, which existed at the time of the `downgrade` call. |
494 | #[test] |
495 | fn downgrade_drop_upgrade_unbounded() { |
496 | let (tx, _rx) = mpsc::unbounded_channel::<i32>(); |
497 | |
498 | // the cloned `Tx` is dropped right away |
499 | let weak_tx = tx.clone().downgrade(); |
500 | drop(tx); |
501 | assert!(weak_tx.upgrade().is_none()); |
502 | } |
503 | |
504 | // Tests that `downgrade` does not change the `tx_count` of the channel. |
505 | #[test] |
506 | fn test_tx_count_weak_unbounded_sender() { |
507 | let (tx, _rx) = mpsc::unbounded_channel::<i32>(); |
508 | let tx_weak = tx.downgrade(); |
509 | let tx_weak2 = tx.downgrade(); |
510 | drop(tx); |
511 | |
512 | assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); |
513 | } |
514 | |