1 | use super::{SendError, Sender, TrySendError, UnboundedSender}; |
2 | use futures_core::task::{Context, Poll}; |
3 | use futures_sink::Sink; |
4 | use std::pin::Pin; |
5 | |
6 | impl<T> Sink<T> for Sender<T> { |
7 | type Error = SendError; |
8 | |
9 | fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
10 | (*self).poll_ready(cx) |
11 | } |
12 | |
13 | fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { |
14 | (*self).start_send(msg) |
15 | } |
16 | |
17 | fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
18 | match (*self).poll_ready(cx) { |
19 | Poll::Ready(Err(ref e)) if e.is_disconnected() => { |
20 | // If the receiver disconnected, we consider the sink to be flushed. |
21 | Poll::Ready(Ok(())) |
22 | } |
23 | x => x, |
24 | } |
25 | } |
26 | |
27 | fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
28 | self.disconnect(); |
29 | Poll::Ready(Ok(())) |
30 | } |
31 | } |
32 | |
33 | impl<T> Sink<T> for UnboundedSender<T> { |
34 | type Error = SendError; |
35 | |
36 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
37 | Self::poll_ready(&*self, cx) |
38 | } |
39 | |
40 | fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { |
41 | Self::start_send(&mut *self, msg) |
42 | } |
43 | |
44 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
45 | Poll::Ready(Ok(())) |
46 | } |
47 | |
48 | fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
49 | self.disconnect(); |
50 | Poll::Ready(Ok(())) |
51 | } |
52 | } |
53 | |
54 | impl<T> Sink<T> for &UnboundedSender<T> { |
55 | type Error = SendError; |
56 | |
57 | fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
58 | UnboundedSender::poll_ready(*self, cx) |
59 | } |
60 | |
61 | fn start_send(self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { |
62 | self.unbounded_send(msg).map_err(op:TrySendError::into_send_error) |
63 | } |
64 | |
65 | fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
66 | Poll::Ready(Ok(())) |
67 | } |
68 | |
69 | fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { |
70 | self.close_channel(); |
71 | Poll::Ready(Ok(())) |
72 | } |
73 | } |
74 | |