1use futures::channel::{mpsc, oneshot};
2use futures::executor::block_on;
3use futures::future::{self, poll_fn, Future, FutureExt, TryFutureExt};
4use futures::never::Never;
5use futures::ready;
6use futures::sink::{self, Sink, SinkErrInto, SinkExt};
7use futures::stream::{self, Stream, StreamExt};
8use futures::task::{self, ArcWake, Context, Poll, Waker};
9use futures_test::task::panic_context;
10use std::cell::{Cell, RefCell};
11use std::collections::VecDeque;
12use std::fmt;
13use std::mem;
14use std::pin::Pin;
15use std::rc::Rc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18
19fn sassert_next<S>(s: &mut S, item: S::Item)
20where
21 S: Stream + Unpin,
22 S::Item: Eq + fmt::Debug,
23{
24 match s.poll_next_unpin(&mut panic_context()) {
25 Poll::Ready(None) => panic!("stream is at its end"),
26 Poll::Ready(Some(e)) => assert_eq!(e, item),
27 Poll::Pending => panic!("stream wasn't ready"),
28 }
29}
30
31fn unwrap<T, E: fmt::Debug>(x: Poll<Result<T, E>>) -> T {
32 match x {
33 Poll::Ready(Ok(x)) => x,
34 Poll::Ready(Err(_)) => panic!("Poll::Ready(Err(_))"),
35 Poll::Pending => panic!("Poll::Pending"),
36 }
37}
38
39// An Unpark struct that records unpark events for inspection
40struct Flag(AtomicBool);
41
42impl Flag {
43 fn new() -> Arc<Self> {
44 Arc::new(Self(AtomicBool::new(false)))
45 }
46
47 fn take(&self) -> bool {
48 self.0.swap(false, Ordering::SeqCst)
49 }
50
51 fn set(&self, v: bool) {
52 self.0.store(v, Ordering::SeqCst)
53 }
54}
55
56impl ArcWake for Flag {
57 fn wake_by_ref(arc_self: &Arc<Self>) {
58 arc_self.set(true)
59 }
60}
61
62fn flag_cx<F, R>(f: F) -> R
63where
64 F: FnOnce(Arc<Flag>, &mut Context<'_>) -> R,
65{
66 let flag = Flag::new();
67 let waker = task::waker_ref(&flag);
68 let cx = &mut Context::from_waker(&waker);
69 f(flag.clone(), cx)
70}
71
72// Sends a value on an i32 channel sink
73struct StartSendFut<S: Sink<Item> + Unpin, Item: Unpin>(Option<S>, Option<Item>);
74
75impl<S: Sink<Item> + Unpin, Item: Unpin> StartSendFut<S, Item> {
76 fn new(sink: S, item: Item) -> Self {
77 Self(Some(sink), Some(item))
78 }
79}
80
81impl<S: Sink<Item> + Unpin, Item: Unpin> Future for StartSendFut<S, Item> {
82 type Output = Result<S, S::Error>;
83
84 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
85 let Self(inner, item) = self.get_mut();
86 {
87 let mut inner = inner.as_mut().unwrap();
88 ready!(Pin::new(&mut inner).poll_ready(cx))?;
89 Pin::new(&mut inner).start_send(item.take().unwrap())?;
90 }
91 Poll::Ready(Ok(inner.take().unwrap()))
92 }
93}
94
95// Immediately accepts all requests to start pushing, but completion is managed
96// by manually flushing
97struct ManualFlush<T: Unpin> {
98 data: Vec<T>,
99 waiting_tasks: Vec<Waker>,
100}
101
102impl<T: Unpin> Sink<Option<T>> for ManualFlush<T> {
103 type Error = ();
104
105 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
106 Poll::Ready(Ok(()))
107 }
108
109 fn start_send(mut self: Pin<&mut Self>, item: Option<T>) -> Result<(), Self::Error> {
110 if let Some(item) = item {
111 self.data.push(item);
112 } else {
113 self.force_flush();
114 }
115 Ok(())
116 }
117
118 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
119 if self.data.is_empty() {
120 Poll::Ready(Ok(()))
121 } else {
122 self.waiting_tasks.push(cx.waker().clone());
123 Poll::Pending
124 }
125 }
126
127 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
128 self.poll_flush(cx)
129 }
130}
131
132impl<T: Unpin> ManualFlush<T> {
133 fn new() -> Self {
134 Self { data: Vec::new(), waiting_tasks: Vec::new() }
135 }
136
137 fn force_flush(&mut self) -> Vec<T> {
138 for task in self.waiting_tasks.drain(..) {
139 task.wake()
140 }
141 mem::take(&mut self.data)
142 }
143}
144
145struct ManualAllow<T: Unpin> {
146 data: Vec<T>,
147 allow: Rc<Allow>,
148}
149
150struct Allow {
151 flag: Cell<bool>,
152 tasks: RefCell<Vec<Waker>>,
153}
154
155impl Allow {
156 fn new() -> Self {
157 Self { flag: Cell::new(false), tasks: RefCell::new(Vec::new()) }
158 }
159
160 fn check(&self, cx: &mut Context<'_>) -> bool {
161 if self.flag.get() {
162 true
163 } else {
164 self.tasks.borrow_mut().push(cx.waker().clone());
165 false
166 }
167 }
168
169 fn start(&self) {
170 self.flag.set(true);
171 let mut tasks = self.tasks.borrow_mut();
172 for task in tasks.drain(..) {
173 task.wake();
174 }
175 }
176}
177
178impl<T: Unpin> Sink<T> for ManualAllow<T> {
179 type Error = ();
180
181 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
182 if self.allow.check(cx) {
183 Poll::Ready(Ok(()))
184 } else {
185 Poll::Pending
186 }
187 }
188
189 fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
190 self.data.push(item);
191 Ok(())
192 }
193
194 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
195 Poll::Ready(Ok(()))
196 }
197
198 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
199 Poll::Ready(Ok(()))
200 }
201}
202
203fn manual_allow<T: Unpin>() -> (ManualAllow<T>, Rc<Allow>) {
204 let allow = Rc::new(Allow::new());
205 let manual_allow = ManualAllow { data: Vec::new(), allow: allow.clone() };
206 (manual_allow, allow)
207}
208
209#[test]
210fn either_sink() {
211 let mut s =
212 if true { Vec::<i32>::new().left_sink() } else { VecDeque::<i32>::new().right_sink() };
213
214 Pin::new(&mut s).start_send(0).unwrap();
215}
216
217#[test]
218fn vec_sink() {
219 let mut v = Vec::new();
220 Pin::new(&mut v).start_send(0).unwrap();
221 Pin::new(&mut v).start_send(1).unwrap();
222 assert_eq!(v, vec![0, 1]);
223 block_on(v.flush()).unwrap();
224 assert_eq!(v, vec![0, 1]);
225}
226
227#[test]
228fn vecdeque_sink() {
229 let mut deque = VecDeque::new();
230 Pin::new(&mut deque).start_send(2).unwrap();
231 Pin::new(&mut deque).start_send(3).unwrap();
232
233 assert_eq!(deque.pop_front(), Some(2));
234 assert_eq!(deque.pop_front(), Some(3));
235 assert_eq!(deque.pop_front(), None);
236}
237
238#[test]
239fn send() {
240 let mut v = Vec::new();
241
242 block_on(v.send(0)).unwrap();
243 assert_eq!(v, vec![0]);
244
245 block_on(v.send(1)).unwrap();
246 assert_eq!(v, vec![0, 1]);
247
248 block_on(v.send(2)).unwrap();
249 assert_eq!(v, vec![0, 1, 2]);
250}
251
252#[test]
253fn send_all() {
254 let mut v = Vec::new();
255
256 block_on(v.send_all(&mut stream::iter(vec![0, 1]).map(Ok))).unwrap();
257 assert_eq!(v, vec![0, 1]);
258
259 block_on(v.send_all(&mut stream::iter(vec![2, 3]).map(Ok))).unwrap();
260 assert_eq!(v, vec![0, 1, 2, 3]);
261
262 block_on(v.send_all(&mut stream::iter(vec![4, 5]).map(Ok))).unwrap();
263 assert_eq!(v, vec![0, 1, 2, 3, 4, 5]);
264}
265
266// Test that `start_send` on an `mpsc` channel does indeed block when the
267// channel is full
268#[test]
269fn mpsc_blocking_start_send() {
270 let (mut tx, mut rx) = mpsc::channel::<i32>(0);
271
272 block_on(future::lazy(|_| {
273 tx.start_send(0).unwrap();
274
275 flag_cx(|flag, cx| {
276 let mut task = StartSendFut::new(tx, 1);
277
278 assert!(task.poll_unpin(cx).is_pending());
279 assert!(!flag.take());
280 sassert_next(&mut rx, 0);
281 assert!(flag.take());
282 unwrap(task.poll_unpin(cx));
283 assert!(!flag.take());
284 sassert_next(&mut rx, 1);
285 })
286 }));
287}
288
289// test `flush` by using `with` to make the first insertion into a sink block
290// until a oneshot is completed
291#[test]
292fn with_flush() {
293 let (tx, rx) = oneshot::channel();
294 let mut block = rx.boxed();
295 let mut sink = Vec::new().with(|elem| {
296 mem::replace(&mut block, future::ok(()).boxed())
297 .map_ok(move |()| elem + 1)
298 .map_err(|_| -> Never { panic!() })
299 });
300
301 assert_eq!(Pin::new(&mut sink).start_send(0).ok(), Some(()));
302
303 flag_cx(|flag, cx| {
304 let mut task = sink.flush();
305 assert!(task.poll_unpin(cx).is_pending());
306 tx.send(()).unwrap();
307 assert!(flag.take());
308
309 unwrap(task.poll_unpin(cx));
310
311 block_on(sink.send(1)).unwrap();
312 assert_eq!(sink.get_ref(), &[1, 2]);
313 })
314}
315
316// test simple use of with to change data
317#[test]
318fn with_as_map() {
319 let mut sink = Vec::new().with(|item| future::ok::<i32, Never>(item * 2));
320 block_on(sink.send(0)).unwrap();
321 block_on(sink.send(1)).unwrap();
322 block_on(sink.send(2)).unwrap();
323 assert_eq!(sink.get_ref(), &[0, 2, 4]);
324}
325
326// test simple use of with_flat_map
327#[test]
328fn with_flat_map() {
329 let mut sink = Vec::new().with_flat_map(|item| stream::iter(vec![item; item]).map(Ok));
330 block_on(sink.send(0)).unwrap();
331 block_on(sink.send(1)).unwrap();
332 block_on(sink.send(2)).unwrap();
333 block_on(sink.send(3)).unwrap();
334 assert_eq!(sink.get_ref(), &[1, 2, 2, 3, 3, 3]);
335}
336
337// Check that `with` propagates `poll_ready` to the inner sink.
338// Regression test for the issue #1834.
339#[test]
340fn with_propagates_poll_ready() {
341 let (tx, mut rx) = mpsc::channel::<i32>(0);
342 let mut tx = tx.with(|item: i32| future::ok::<i32, mpsc::SendError>(item + 10));
343
344 block_on(future::lazy(|_| {
345 flag_cx(|flag, cx| {
346 let mut tx = Pin::new(&mut tx);
347
348 // Should be ready for the first item.
349 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
350 assert_eq!(tx.as_mut().start_send(0), Ok(()));
351
352 // Should be ready for the second item only after the first one is received.
353 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Pending);
354 assert!(!flag.take());
355 sassert_next(&mut rx, 10);
356 assert!(flag.take());
357 assert_eq!(tx.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
358 assert_eq!(tx.as_mut().start_send(1), Ok(()));
359 })
360 }));
361}
362
363// test that the `with` sink doesn't require the underlying sink to flush,
364// but doesn't claim to be flushed until the underlying sink is
365#[test]
366fn with_flush_propagate() {
367 let mut sink = ManualFlush::new().with(future::ok::<Option<i32>, ()>);
368 flag_cx(|flag, cx| {
369 unwrap(Pin::new(&mut sink).poll_ready(cx));
370 Pin::new(&mut sink).start_send(Some(0)).unwrap();
371 unwrap(Pin::new(&mut sink).poll_ready(cx));
372 Pin::new(&mut sink).start_send(Some(1)).unwrap();
373
374 {
375 let mut task = sink.flush();
376 assert!(task.poll_unpin(cx).is_pending());
377 assert!(!flag.take());
378 }
379 assert_eq!(sink.get_mut().force_flush(), vec![0, 1]);
380 assert!(flag.take());
381 unwrap(sink.flush().poll_unpin(cx));
382 })
383}
384
385// test that `Clone` is implemented on `with` sinks
386#[test]
387fn with_implements_clone() {
388 let (mut tx, rx) = mpsc::channel(5);
389
390 {
391 let mut is_positive = tx.clone().with(|item| future::ok::<bool, mpsc::SendError>(item > 0));
392
393 let mut is_long =
394 tx.clone().with(|item: &str| future::ok::<bool, mpsc::SendError>(item.len() > 5));
395
396 block_on(is_positive.clone().send(-1)).unwrap();
397 block_on(is_long.clone().send("123456")).unwrap();
398 block_on(is_long.send("123")).unwrap();
399 block_on(is_positive.send(1)).unwrap();
400 }
401
402 block_on(tx.send(false)).unwrap();
403
404 block_on(tx.close()).unwrap();
405
406 assert_eq!(block_on(rx.collect::<Vec<_>>()), vec![false, true, false, true, false]);
407}
408
409// test that a buffer is a no-nop around a sink that always accepts sends
410#[test]
411fn buffer_noop() {
412 let mut sink = Vec::new().buffer(0);
413 block_on(sink.send(0)).unwrap();
414 block_on(sink.send(1)).unwrap();
415 assert_eq!(sink.get_ref(), &[0, 1]);
416
417 let mut sink = Vec::new().buffer(1);
418 block_on(sink.send(0)).unwrap();
419 block_on(sink.send(1)).unwrap();
420 assert_eq!(sink.get_ref(), &[0, 1]);
421}
422
423// test basic buffer functionality, including both filling up to capacity,
424// and writing out when the underlying sink is ready
425#[test]
426fn buffer() {
427 let (sink, allow) = manual_allow::<i32>();
428 let sink = sink.buffer(2);
429
430 let sink = block_on(StartSendFut::new(sink, 0)).unwrap();
431 let mut sink = block_on(StartSendFut::new(sink, 1)).unwrap();
432
433 flag_cx(|flag, cx| {
434 let mut task = sink.send(2);
435 assert!(task.poll_unpin(cx).is_pending());
436 assert!(!flag.take());
437 allow.start();
438 assert!(flag.take());
439 unwrap(task.poll_unpin(cx));
440 assert_eq!(sink.get_ref().data, vec![0, 1, 2]);
441 })
442}
443
444#[test]
445fn fanout_smoke() {
446 let sink1 = Vec::new();
447 let sink2 = Vec::new();
448 let mut sink = sink1.fanout(sink2);
449 block_on(sink.send_all(&mut stream::iter(vec![1, 2, 3]).map(Ok))).unwrap();
450 let (sink1, sink2) = sink.into_inner();
451 assert_eq!(sink1, vec![1, 2, 3]);
452 assert_eq!(sink2, vec![1, 2, 3]);
453}
454
455#[test]
456fn fanout_backpressure() {
457 let (left_send, mut left_recv) = mpsc::channel(0);
458 let (right_send, mut right_recv) = mpsc::channel(0);
459 let sink = left_send.fanout(right_send);
460
461 let mut sink = block_on(StartSendFut::new(sink, 0)).unwrap();
462
463 flag_cx(|flag, cx| {
464 let mut task = sink.send(2);
465 assert!(!flag.take());
466 assert!(task.poll_unpin(cx).is_pending());
467 assert_eq!(block_on(left_recv.next()), Some(0));
468 assert!(flag.take());
469 assert!(task.poll_unpin(cx).is_pending());
470 assert_eq!(block_on(right_recv.next()), Some(0));
471 assert!(flag.take());
472
473 assert!(task.poll_unpin(cx).is_pending());
474 assert_eq!(block_on(left_recv.next()), Some(2));
475 assert!(flag.take());
476 assert!(task.poll_unpin(cx).is_pending());
477 assert_eq!(block_on(right_recv.next()), Some(2));
478 assert!(flag.take());
479
480 unwrap(task.poll_unpin(cx));
481 // make sure receivers live until end of test to prevent send errors
482 drop(left_recv);
483 drop(right_recv);
484 })
485}
486
487#[test]
488fn sink_map_err() {
489 {
490 let cx = &mut panic_context();
491 let (tx, _rx) = mpsc::channel(1);
492 let mut tx = tx.sink_map_err(|_| ());
493 assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
494 assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
495 }
496
497 let tx = mpsc::channel(0).0;
498 assert_eq!(Pin::new(&mut tx.sink_map_err(|_| ())).start_send(()), Err(()));
499}
500
501#[test]
502fn sink_unfold() {
503 block_on(poll_fn(|cx| {
504 let (tx, mut rx) = mpsc::channel(1);
505 let unfold = sink::unfold((), |(), i: i32| {
506 let mut tx = tx.clone();
507 async move {
508 tx.send(i).await.unwrap();
509 Ok::<_, String>(())
510 }
511 });
512 futures::pin_mut!(unfold);
513 assert_eq!(unfold.as_mut().start_send(1), Ok(()));
514 assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Ready(Ok(())));
515 assert_eq!(rx.try_next().unwrap(), Some(1));
516
517 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
518 assert_eq!(unfold.as_mut().start_send(2), Ok(()));
519 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
520 assert_eq!(unfold.as_mut().start_send(3), Ok(()));
521 assert_eq!(rx.try_next().unwrap(), Some(2));
522 assert!(rx.try_next().is_err());
523 assert_eq!(unfold.as_mut().poll_ready(cx), Poll::Ready(Ok(())));
524 assert_eq!(unfold.as_mut().start_send(4), Ok(()));
525 assert_eq!(unfold.as_mut().poll_flush(cx), Poll::Pending); // Channel full
526 assert_eq!(rx.try_next().unwrap(), Some(3));
527 assert_eq!(rx.try_next().unwrap(), Some(4));
528
529 Poll::Ready(())
530 }))
531}
532
533#[test]
534fn err_into() {
535 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
536 struct ErrIntoTest;
537
538 impl From<mpsc::SendError> for ErrIntoTest {
539 fn from(_: mpsc::SendError) -> Self {
540 Self
541 }
542 }
543
544 {
545 let cx = &mut panic_context();
546 let (tx, _rx) = mpsc::channel(1);
547 let mut tx: SinkErrInto<mpsc::Sender<()>, _, ErrIntoTest> = tx.sink_err_into();
548 assert_eq!(Pin::new(&mut tx).start_send(()), Ok(()));
549 assert_eq!(Pin::new(&mut tx).poll_flush(cx), Poll::Ready(Ok(())));
550 }
551
552 let tx = mpsc::channel(0).0;
553 assert_eq!(Pin::new(&mut tx.sink_err_into()).start_send(()), Err(ErrIntoTest));
554}
555