1#![warn(rust_2018_idioms)]
2
3use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
4use tokio_test::task;
5use tokio_test::{
6 assert_err, assert_ok, assert_pending, assert_ready, assert_ready_err, assert_ready_ok,
7};
8use tokio_util::codec::*;
9
10use bytes::{BufMut, Bytes, BytesMut};
11use futures::{pin_mut, Sink, Stream};
12use std::collections::VecDeque;
13use std::io;
14use std::pin::Pin;
15use std::task::{Context, Poll};
16
/// Builds a `Mock` whose scripted call queue holds the given entries, in order.
///
/// Each argument becomes one queue entry. Entries are separated by commas and a
/// trailing comma after the last entry is optional; `mock!()` yields an empty queue.
macro_rules! mock {
    ($($x:expr),* $(,)?) => {{
        Mock {
            // `From<Vec<T>>` keeps element-type inference unambiguous even for an empty list.
            calls: VecDeque::from(vec![$($x),*]),
        }
    }};
}
24
/// Polls the pinned stream `$io` once and asserts that it is ready with a frame whose
/// bytes equal `$expect`.
///
/// Panics if the stream yields an error (`"error = .."`) or has ended (`"none"`).
macro_rules! assert_next_eq {
    ($io:ident, $expect:expr) => {{
        task::spawn(()).enter(|cx, _| match assert_ready!($io.as_mut().poll_next(cx)) {
            Some(Ok(v)) => assert_eq!(v, $expect.as_ref()),
            Some(Err(e)) => panic!("error = {:?}", e),
            None => panic!("none"),
        });
    }};
}
37
/// Polls the pinned stream `$io` once and asserts that it is still pending.
///
/// Panics if the poll produced a frame, an error, or end-of-stream.
macro_rules! assert_next_pending {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Poll::Pending => {}
                Poll::Ready(None) => panic!("done"),
                Poll::Ready(Some(Err(e))) => panic!("error = {:?}", e),
                Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
            }
        });
    }};
}
48
/// Polls the pinned stream `$io` once and asserts that it is ready with an error.
///
/// Panics if the poll produced a frame, end-of-stream, or was pending.
macro_rules! assert_next_err {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| {
            let polled = $io.as_mut().poll_next(cx);
            match polled {
                Poll::Ready(Some(Err(_))) => {}
                Poll::Ready(Some(Ok(v))) => panic!("value = {:?}", v),
                Poll::Ready(None) => panic!("done"),
                Poll::Pending => panic!("pending"),
            }
        });
    }};
}
59
/// Polls the pinned stream `$io` once and asserts that it is ready and has ended.
///
/// Panics if the poll produced a frame or an error.
macro_rules! assert_done {
    ($io:ident) => {{
        task::spawn(()).enter(|cx, _| match assert_ready!($io.as_mut().poll_next(cx)) {
            None => {}
            Some(Err(e)) => panic!("error = {:?}", e),
            Some(Ok(v)) => panic!("value = {:?}", v),
        });
    }};
}
72
/// A reader over a mock with no scripted calls ends without yielding any frame.
#[test]
fn read_empty_io_yields_nothing() {
    // `pin_mut!` already pins the reader on the stack, as the other tests in this file
    // do; wrapping it in `Box::pin` first is unnecessary.
    let io = FramedRead::new(mock!(), LengthDelimitedCodec::new());
    pin_mut!(io);

    assert_done!(io);
}
80
/// A complete frame delivered in a single read decodes to its payload, then the
/// stream ends.
#[test]
fn read_single_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
94
/// With a little-endian length field, a header of `09 00 00 00` announces a
/// nine-byte payload.
#[test]
fn read_single_frame_one_packet_little_endian() {
    let transport = mock! {
        data(b"\x09\x00\x00\x00abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .little_endian()
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
107
/// With a native-endian length field, the header uses the byte order of the
/// target the test is compiled for.
#[test]
fn read_single_frame_one_packet_native_endian() {
    // The four header bytes are the length 9 in the target's own byte order.
    let mut wire = 9u32.to_ne_bytes().to_vec();
    wire.extend_from_slice(b"abcdefghi");

    let framed = length_delimited::Builder::new()
        .native_endian()
        .new_read(mock! {
            data(&wire),
        });
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
125
/// Three frames delivered back-to-back in one read are decoded one at a time, in order.
#[test]
fn read_single_multi_frame_one_packet() {
    let wire: Vec<u8> = [
        &b"\x00\x00\x00\x09abcdefghi"[..],
        &b"\x00\x00\x00\x03123"[..],
        &b"\x00\x00\x00\x0bhello world"[..],
    ]
    .concat();

    let transport = mock! {
        data(&wire),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
146
/// A frame whose header and payload are split across three reads is reassembled
/// into a single frame.
#[test]
fn read_single_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
162
/// Several frames arriving in reads that do not line up with frame boundaries are
/// still decoded as three distinct frames, in order.
#[test]
fn read_multi_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00"),
        data(b"\x00\x09abc"),
        data(b"defghi"),
        data(b"\x00\x00\x00\x0312"),
        data(b"3\x00\x00\x00\x0bhello world"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
182
/// A frame split across reads, with a pending result after each chunk, is yielded
/// only once its final chunk has arrived.
#[test]
fn read_single_frame_multi_packet_wait() {
    let transport = mock! {
        data(b"\x00\x00"),
        Poll::Pending,
        data(b"\x00\x09abc"),
        Poll::Pending,
        data(b"defghi"),
        Poll::Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    // Two polls run into a scripted `Pending` before the frame is complete.
    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"abcdefghi");
    // The trailing scripted `Pending` is observed before the mock runs dry.
    assert_next_pending!(framed);
    assert_done!(framed);
}
204
/// Multiple frames split across reads, with a pending result after every chunk, are
/// yielded in order as each one completes.
#[test]
fn read_multi_frame_multi_packet_wait() {
    let transport = mock! {
        data(b"\x00\x00"),
        Poll::Pending,
        data(b"\x00\x09abc"),
        Poll::Pending,
        data(b"defghi"),
        Poll::Pending,
        data(b"\x00\x00\x00\x0312"),
        Poll::Pending,
        data(b"3\x00\x00\x00\x0bhello world"),
        Poll::Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"abcdefghi");
    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_eq!(framed, b"123");
    // The last chunk also carried the whole third frame, so it is ready immediately.
    assert_next_eq!(framed, b"hello world");
    assert_next_pending!(framed);
    assert_done!(framed);
}
234
/// The input ending after only two header bytes is reported as an error.
#[test]
fn read_incomplete_head() {
    let transport = mock! {
        data(b"\x00\x00"),
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_err!(framed);
}
247
/// The input ending after a single header byte, with pending results around it, is
/// reported as an error once the mock is exhausted.
#[test]
fn read_incomplete_head_multi() {
    let transport = mock! {
        Poll::Pending,
        data(b"\x00"),
        Poll::Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_err!(framed);
}
264
/// The input ending after four of the nine announced payload bytes is reported as
/// an error.
#[test]
fn read_incomplete_payload() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09ab"),
        Poll::Pending,
        data(b"cd"),
        Poll::Pending,
    };
    let framed = FramedRead::new(transport, LengthDelimitedCodec::new());
    pin_mut!(framed);

    assert_next_pending!(framed);
    assert_next_pending!(framed);
    assert_next_err!(framed);
}
282
/// A nine-byte frame is rejected with an error when the maximum frame length is 5.
#[test]
fn read_max_frame_len() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_err!(framed);
}
294
/// Lowering the maximum frame length between frames makes the next nine-byte frame
/// fail.
#[test]
fn read_update_max_frame_len_at_rest() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcdefghi"),
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new().new_read(transport);
    pin_mut!(framed);

    // The first frame is decoded under the default limit.
    assert_next_eq!(framed, b"abcdefghi");

    framed.decoder_mut().set_max_frame_length(5);
    assert_next_err!(framed);
}
307
/// Lowering the maximum frame length while a frame is partially read still lets
/// that frame complete; the following frame is rejected.
#[test]
fn read_update_max_frame_len_in_flight() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09abcd"),
        Poll::Pending,
        data(b"efghi"),
        data(b"\x00\x00\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new().new_read(transport);
    pin_mut!(framed);

    // Header and part of the payload have been consumed at this point.
    assert_next_pending!(framed);

    framed.decoder_mut().set_max_frame_length(5);
    assert_next_eq!(framed, b"abcdefghi");
    assert_next_err!(framed);
}
323
/// A one-byte length field of `09` announces a nine-byte payload.
#[test]
fn read_one_byte_length_field() {
    let transport = mock! {
        data(b"\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(1)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
336
/// A two-byte length field located four bytes into the header is honoured, and the
/// yielded frame contains only the payload.
#[test]
fn read_header_offset() {
    let transport = mock! {
        data(b"zzzz\x00\x09abcdefghi"),
    };
    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(4)
        .new_read(transport);
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_done!(framed);
}
350
/// With `num_skip(0)` and a length adjustment of 4, each yielded frame includes its
/// two-byte prefix and two-byte length field in front of the payload.
#[test]
fn read_single_multi_frame_one_packet_skip_none_adjusted() {
    let wire: Vec<u8> = [
        &b"xx\x00\x09abcdefghi"[..],
        &b"yy\x00\x03123"[..],
        &b"zz\x00\x0bhello world"[..],
    ]
    .concat();

    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_field_offset(2)
        .num_skip(0)
        .length_adjustment(4)
        .new_read(mock! {
            data(&wire),
        });
    pin_mut!(framed);

    assert_next_eq!(framed, b"xx\x00\x09abcdefghi");
    assert_next_eq!(framed, b"yy\x00\x03123");
    assert_next_eq!(framed, b"zz\x00\x0bhello world");
    assert_done!(framed);
}
373
/// A three-byte length field combined with `num_skip(4)` drops the length field and
/// the one byte after it, leaving only `Hello world` in the frame.
#[test]
fn read_single_frame_length_adjusted() {
    let wire = b"\x00\x00\x0b\x0cHello world".to_vec();

    let framed = length_delimited::Builder::new()
        .length_field_offset(0)
        .length_field_length(3)
        .length_adjustment(0)
        .num_skip(4)
        .new_read(mock! {
            data(&wire),
        });
    pin_mut!(framed);

    assert_next_eq!(framed, b"Hello world");
    assert_done!(framed);
}
392
/// When the encoded length also counts the two-byte header, a length adjustment of
/// -2 recovers the payload size for each frame.
#[test]
fn read_single_multi_frame_one_packet_length_includes_head() {
    let wire: Vec<u8> = [
        &b"\x00\x0babcdefghi"[..],
        &b"\x00\x05123"[..],
        &b"\x00\x0dhello world"[..],
    ]
    .concat();

    let framed = length_delimited::Builder::new()
        .length_field_length(2)
        .length_adjustment(-2)
        .new_read(mock! {
            data(&wire),
        });
    pin_mut!(framed);

    assert_next_eq!(framed, b"abcdefghi");
    assert_next_eq!(framed, b"123");
    assert_next_eq!(framed, b"hello world");
    assert_done!(framed);
}
413
/// With a length adjustment of -2, a nine-byte payload is written with a header
/// value of 11 (`0x0b`).
#[test]
fn write_single_frame_length_adjusted() {
    let transport = mock! {
        data(b"\x00\x00\x00\x0b"),
        data(b"abcdefghi"),
        flush(),
    };
    let writer = length_delimited::Builder::new()
        .length_adjustment(-2)
        .new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdefghi");
        assert_ok!(writer.as_mut().start_send(frame));
        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        // Every scripted call must have been consumed.
        assert!(writer.get_ref().calls.is_empty());
    });
}
432
/// Flushing a writer that was never sent a frame succeeds against an empty mock.
#[test]
fn write_nothing_yields_nothing() {
    let writer = FramedWrite::new(mock!(), LengthDelimitedCodec::new());
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_flush(cx));
    });
}
442
/// Sending one frame writes a four-byte big-endian length header followed by the
/// payload, then flushes.
#[test]
fn write_single_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let writer = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdefghi");
        assert_ok!(writer.as_mut().start_send(frame));
        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        // Every scripted call must have been consumed.
        assert!(writer.get_ref().calls.is_empty());
    });
}
462
/// Three frames queued before a single flush are written out header-then-payload,
/// in order, followed by one flush.
#[test]
fn write_single_multi_frame_one_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        data(b"\x00\x00\x00\x03"),
        data(b"123"),
        data(b"\x00\x00\x00\x0b"),
        data(b"hello world"),
        flush(),
    };
    let writer = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        for payload in &["abcdefghi", "123", "hello world"] {
            assert_ready_ok!(writer.as_mut().poll_ready(cx));
            assert_ok!(writer.as_mut().start_send(Bytes::from(*payload)));
        }

        assert_ready_ok!(writer.as_mut().poll_flush(cx));
        assert!(writer.get_ref().calls.is_empty());
    });
}
493
/// Three frames, each flushed right after being sent, produce a
/// header / payload / flush sequence per frame.
#[test]
fn write_single_multi_frame_multi_packet() {
    let transport = mock! {
        data(b"\x00\x00\x00\x09"),
        data(b"abcdefghi"),
        flush(),
        data(b"\x00\x00\x00\x03"),
        data(b"123"),
        flush(),
        data(b"\x00\x00\x00\x0b"),
        data(b"hello world"),
        flush(),
    };
    let writer = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        for payload in &["abcdefghi", "123", "hello world"] {
            assert_ready_ok!(writer.as_mut().poll_ready(cx));
            assert_ok!(writer.as_mut().start_send(Bytes::from(*payload)));

            assert_ready_ok!(writer.as_mut().poll_flush(cx));
        }

        assert!(writer.get_ref().calls.is_empty());
    });
}
530
/// A flush that hits two pending writes makes progress across repeated polls and
/// eventually writes the whole frame.
#[test]
fn write_single_frame_would_block() {
    let transport = mock! {
        Poll::Pending,
        data(b"\x00\x00"),
        Poll::Pending,
        data(b"\x00\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let writer = FramedWrite::new(transport, LengthDelimitedCodec::new());
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdefghi");
        assert_ok!(writer.as_mut().start_send(frame));

        // Each of the first two flush attempts stops at a scripted `Pending`.
        assert_pending!(writer.as_mut().poll_flush(cx));
        assert_pending!(writer.as_mut().poll_flush(cx));
        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        assert!(writer.get_ref().calls.is_empty());
    });
}
557
/// With a little-endian length field, a nine-byte payload is preceded by the
/// header `09 00 00 00`.
#[test]
fn write_single_frame_little_endian() {
    let transport = mock! {
        data(b"\x09\x00\x00\x00"),
        data(b"abcdefghi"),
        flush(),
    };
    let writer = length_delimited::Builder::new()
        .little_endian()
        .new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdefghi");
        assert_ok!(writer.as_mut().start_send(frame));

        assert_ready_ok!(writer.as_mut().poll_flush(cx));
        assert!(writer.get_ref().calls.is_empty());
    });
}
577
/// With a one-byte length field, a nine-byte payload is preceded by the single
/// header byte `09`.
#[test]
fn write_single_frame_with_short_length_field() {
    let transport = mock! {
        data(b"\x09"),
        data(b"abcdefghi"),
        flush(),
    };
    let writer = length_delimited::Builder::new()
        .length_field_length(1)
        .new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdefghi");
        assert_ok!(writer.as_mut().start_send(frame));

        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        assert!(writer.get_ref().calls.is_empty());
    });
}
598
/// Sending a six-byte frame fails when the maximum frame length is 5.
#[test]
fn write_max_frame_len() {
    let transport = mock! {};
    let writer = length_delimited::Builder::new()
        .max_frame_length(5)
        .new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let oversized = Bytes::from("abcdef");
        assert_err!(writer.as_mut().start_send(oversized));

        assert!(writer.get_ref().calls.is_empty());
    });
}
613
/// Lowering the maximum frame length after a frame has been flushed makes the next
/// six-byte frame fail to send.
#[test]
fn write_update_max_frame_len_at_rest() {
    let transport = mock! {
        data(b"\x00\x00\x00\x06"),
        data(b"abcdef"),
        flush(),
    };
    let writer = length_delimited::Builder::new().new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        // The first frame goes out under the default limit.
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        assert_ok!(writer.as_mut().start_send(Bytes::from("abcdef")));
        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        writer.encoder_mut().set_max_frame_length(5);

        let oversized = Bytes::from("abcdef");
        assert_err!(writer.as_mut().start_send(oversized));

        assert!(writer.get_ref().calls.is_empty());
    });
}
636
/// Lowering the maximum frame length while a frame is partially written still lets
/// that frame finish; the next six-byte frame fails to send.
#[test]
fn write_update_max_frame_len_in_flight() {
    let transport = mock! {
        data(b"\x00\x00\x00\x06"),
        data(b"ab"),
        Poll::Pending,
        data(b"cdef"),
        flush(),
    };
    let writer = length_delimited::Builder::new().new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        assert_ok!(writer.as_mut().start_send(Bytes::from("abcdef")));

        // The header and the first two payload bytes are written before the pending.
        assert_pending!(writer.as_mut().poll_flush(cx));

        writer.encoder_mut().set_max_frame_length(5);

        assert_ready_ok!(writer.as_mut().poll_flush(cx));

        let oversized = Bytes::from("abcdef");
        assert_err!(writer.as_mut().start_send(oversized));
        assert!(writer.get_ref().calls.is_empty());
    });
}
662
/// Flushing fails when the underlying writer accepts zero bytes, which is what the
/// empty mock reports for every write.
#[test]
fn write_zero() {
    let transport = mock! {};
    let writer = length_delimited::Builder::new().new_write(transport);
    pin_mut!(writer);

    task::spawn(()).enter(|cx, _| {
        assert_ready_ok!(writer.as_mut().poll_ready(cx));
        let frame = Bytes::from("abcdef");
        assert_ok!(writer.as_mut().start_send(frame));

        assert_ready_err!(writer.as_mut().poll_flush(cx));

        assert!(writer.get_ref().calls.is_empty());
    });
}
677
/// Encoding into a buffer that already holds 1024 bytes must succeed.
#[test]
fn encode_overflow() {
    // Test reproducing tokio-rs/tokio#681.
    let mut codec = length_delimited::Builder::new().new_codec();

    // Put 1024 bytes into a buffer created with capacity 1024, without reserving more.
    let mut buf = BytesMut::with_capacity(1024);
    buf.put_slice(&[b'a'; 1024]);

    // Trying to encode the length header should resize the buffer if it won't fit.
    let encoded = codec.encode(Bytes::from("hello"), &mut buf);
    encoded.unwrap();
}
691
692// ===== Test utils =====
693
694struct Mock {
695 calls: VecDeque<Poll<io::Result<Op>>>,
696}
697
/// One scripted operation in the mock's call queue.
enum Op {
    /// Bytes handed out by a read, or the bytes a write is expected to start with.
    Data(Vec<u8>),
    /// An expected flush.
    Flush,
}
702
703impl AsyncRead for Mock {
704 fn poll_read(
705 mut self: Pin<&mut Self>,
706 _cx: &mut Context<'_>,
707 dst: &mut ReadBuf<'_>,
708 ) -> Poll<io::Result<()>> {
709 match self.calls.pop_front() {
710 Some(Poll::Ready(Ok(Op::Data(data)))) => {
711 debug_assert!(dst.remaining() >= data.len());
712 dst.put_slice(&data);
713 Poll::Ready(Ok(()))
714 }
715 Some(Poll::Ready(Ok(_))) => panic!(),
716 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
717 Some(Poll::Pending) => Poll::Pending,
718 None => Poll::Ready(Ok(())),
719 }
720 }
721}
722
723impl AsyncWrite for Mock {
724 fn poll_write(
725 mut self: Pin<&mut Self>,
726 _cx: &mut Context<'_>,
727 src: &[u8],
728 ) -> Poll<Result<usize, io::Error>> {
729 match self.calls.pop_front() {
730 Some(Poll::Ready(Ok(Op::Data(data)))) => {
731 let len = data.len();
732 assert!(src.len() >= len, "expect={:?}; actual={:?}", data, src);
733 assert_eq!(&data[..], &src[..len]);
734 Poll::Ready(Ok(len))
735 }
736 Some(Poll::Ready(Ok(_))) => panic!(),
737 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
738 Some(Poll::Pending) => Poll::Pending,
739 None => Poll::Ready(Ok(0)),
740 }
741 }
742
743 fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
744 match self.calls.pop_front() {
745 Some(Poll::Ready(Ok(Op::Flush))) => Poll::Ready(Ok(())),
746 Some(Poll::Ready(Ok(_))) => panic!(),
747 Some(Poll::Ready(Err(e))) => Poll::Ready(Err(e)),
748 Some(Poll::Pending) => Poll::Pending,
749 None => Poll::Ready(Ok(())),
750 }
751 }
752
753 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
754 Poll::Ready(Ok(()))
755 }
756}
757
758impl<'a> From<&'a [u8]> for Op {
759 fn from(src: &'a [u8]) -> Op {
760 Op::Data(src.into())
761 }
762}
763
764impl From<Vec<u8>> for Op {
765 fn from(src: Vec<u8>) -> Op {
766 Op::Data(src)
767 }
768}
769
770fn data(bytes: &[u8]) -> Poll<io::Result<Op>> {
771 Poll::Ready(Ok(bytes.into()))
772}
773
774fn flush() -> Poll<io::Result<Op>> {
775 Poll::Ready(Ok(Op::Flush))
776}
777