1 | //! Adapters for async IO objects |
2 | //! |
3 | //! This module mainly hosts the [`Async`] adapter for making IO objects async with readiness |
4 | //! monitoring backed by an [`EventLoop`](crate::EventLoop). See [`LoopHandle::adapt_io`] for |
5 | //! how to create them. |
6 | //! |
7 | //! [`LoopHandle::adapt_io`]: crate::LoopHandle#method.adapt_io |
8 | |
9 | use std::cell::RefCell; |
10 | use std::pin::Pin; |
11 | use std::rc::Rc; |
12 | use std::task::{Context, Poll as TaskPoll, Waker}; |
13 | |
14 | #[cfg (unix)] |
15 | use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd}; |
16 | #[cfg (windows)] |
17 | use std::os::windows::io::{ |
18 | AsRawSocket as AsRawFd, AsSocket as AsFd, BorrowedSocket as BorrowedFd, RawSocket as RawFd, |
19 | }; |
20 | |
21 | #[cfg (feature = "futures-io" )] |
22 | use futures_io::{AsyncRead, AsyncWrite, IoSlice, IoSliceMut}; |
23 | |
24 | use crate::loop_logic::EventIterator; |
25 | use crate::{ |
26 | loop_logic::LoopInner, sources::EventDispatcher, Interest, Mode, Poll, PostAction, Readiness, |
27 | Token, TokenFactory, |
28 | }; |
29 | use crate::{AdditionalLifecycleEventsSet, RegistrationToken}; |
30 | |
31 | /// Adapter for async IO manipulations |
32 | /// |
33 | /// This type wraps an IO object, providing methods to create futures waiting for its |
34 | /// readiness. |
35 | /// |
36 | /// If the `futures-io` cargo feature is enabled, it also implements `AsyncRead` and/or |
37 | /// `AsyncWrite` if the underlying type implements `Read` and/or `Write`. |
38 | /// |
39 | /// Note that this adapter and the futures procuded from it and *not* threadsafe. |
40 | /// |
41 | /// ## Platform-Specific |
42 | /// |
43 | /// - **Windows:** Usually, on drop, the file descriptor is set back to its previous status. |
44 | /// For example, if the file was previously nonblocking it will be set to nonblocking, and |
45 | /// if the file was blocking it will be set to blocking. However, on Windows, it is impossible |
46 | /// to tell what its status was before. Therefore it will always be set to blocking. |
47 | pub struct Async<'l, F: AsFd> { |
48 | fd: Option<F>, |
49 | dispatcher: Rc<RefCell<IoDispatcher>>, |
50 | inner: Rc<dyn IoLoopInner + 'l>, |
51 | was_nonblocking: bool, |
52 | } |
53 | |
54 | impl<'l, F: AsFd + std::fmt::Debug> std::fmt::Debug for Async<'l, F> { |
55 | #[cfg_attr (feature = "nightly_coverage" , coverage(off))] |
56 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
57 | f.debug_struct("Async" ).field(name:"fd" , &self.fd).finish() |
58 | } |
59 | } |
60 | |
61 | impl<'l, F: AsFd> Async<'l, F> { |
62 | pub(crate) fn new<Data>(inner: Rc<LoopInner<'l, Data>>, fd: F) -> crate::Result<Async<'l, F>> { |
63 | // set non-blocking |
64 | let was_nonblocking = set_nonblocking( |
65 | #[cfg (unix)] |
66 | fd.as_fd(), |
67 | #[cfg (windows)] |
68 | fd.as_socket(), |
69 | true, |
70 | )?; |
71 | // register in the loop |
72 | let dispatcher = Rc::new(RefCell::new(IoDispatcher { |
73 | #[cfg (unix)] |
74 | fd: fd.as_fd().as_raw_fd(), |
75 | #[cfg (windows)] |
76 | fd: fd.as_socket().as_raw_socket(), |
77 | token: None, |
78 | waker: None, |
79 | is_registered: false, |
80 | interest: Interest::EMPTY, |
81 | last_readiness: Readiness::EMPTY, |
82 | })); |
83 | |
84 | { |
85 | let mut sources = inner.sources.borrow_mut(); |
86 | let slot = sources.vacant_entry(); |
87 | slot.source = Some(dispatcher.clone()); |
88 | dispatcher.borrow_mut().token = Some(Token { inner: slot.token }); |
89 | } |
90 | |
91 | // SAFETY: We are sure to deregister on drop. |
92 | unsafe { |
93 | inner.register(&dispatcher)?; |
94 | } |
95 | |
96 | // Straightforward casting would require us to add the bound `Data: 'l` but we don't actually need it |
97 | // as this module never accesses the dispatch data, so we use transmute to erase it |
98 | let inner: Rc<dyn IoLoopInner + 'l> = |
99 | unsafe { std::mem::transmute(inner as Rc<dyn IoLoopInner>) }; |
100 | |
101 | Ok(Async { |
102 | fd: Some(fd), |
103 | dispatcher, |
104 | inner, |
105 | was_nonblocking, |
106 | }) |
107 | } |
108 | |
109 | /// Mutably access the underlying IO object |
110 | pub fn get_mut(&mut self) -> &mut F { |
111 | self.fd.as_mut().unwrap() |
112 | } |
113 | |
114 | /// A future that resolves once the object becomes ready for reading |
115 | pub fn readable<'s>(&'s mut self) -> Readable<'s, 'l, F> { |
116 | Readable { io: self } |
117 | } |
118 | |
119 | /// A future that resolves once the object becomes ready for writing |
120 | pub fn writable<'s>(&'s mut self) -> Writable<'s, 'l, F> { |
121 | Writable { io: self } |
122 | } |
123 | |
124 | /// Remove the async adapter and retrieve the underlying object |
125 | pub fn into_inner(mut self) -> F { |
126 | self.fd.take().unwrap() |
127 | } |
128 | |
129 | fn readiness(&self) -> Readiness { |
130 | self.dispatcher.borrow_mut().readiness() |
131 | } |
132 | |
133 | fn register_waker(&self, interest: Interest, waker: Waker) -> crate::Result<()> { |
134 | { |
135 | let mut disp = self.dispatcher.borrow_mut(); |
136 | disp.interest = interest; |
137 | disp.waker = Some(waker); |
138 | } |
139 | self.inner.reregister(&self.dispatcher) |
140 | } |
141 | } |
142 | |
143 | /// A future that resolves once the associated object becomes ready for reading |
144 | #[derive (Debug)] |
145 | pub struct Readable<'s, 'l, F: AsFd> { |
146 | io: &'s mut Async<'l, F>, |
147 | } |
148 | |
149 | impl<'s, 'l, F: AsFd> std::future::Future for Readable<'s, 'l, F> { |
150 | type Output = (); |
151 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> { |
152 | let io: &mut &mut Async<'_, F> = &mut self.as_mut().io; |
153 | let readiness: Readiness = io.readiness(); |
154 | if readiness.readable || readiness.error { |
155 | TaskPoll::Ready(()) |
156 | } else { |
157 | let _ = io.register_waker(interest:Interest::READ, waker:cx.waker().clone()); |
158 | TaskPoll::Pending |
159 | } |
160 | } |
161 | } |
162 | |
163 | /// A future that resolves once the associated object becomes ready for writing |
164 | #[derive (Debug)] |
165 | pub struct Writable<'s, 'l, F: AsFd> { |
166 | io: &'s mut Async<'l, F>, |
167 | } |
168 | |
169 | impl<'s, 'l, F: AsFd> std::future::Future for Writable<'s, 'l, F> { |
170 | type Output = (); |
171 | fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<()> { |
172 | let io: &mut &mut Async<'_, F> = &mut self.as_mut().io; |
173 | let readiness: Readiness = io.readiness(); |
174 | if readiness.writable || readiness.error { |
175 | TaskPoll::Ready(()) |
176 | } else { |
177 | let _ = io.register_waker(interest:Interest::WRITE, waker:cx.waker().clone()); |
178 | TaskPoll::Pending |
179 | } |
180 | } |
181 | } |
182 | |
183 | impl<'l, F: AsFd> Drop for Async<'l, F> { |
184 | fn drop(&mut self) { |
185 | self.inner.kill(&self.dispatcher); |
186 | // restore flags |
187 | let _ = set_nonblocking( |
188 | fd:unsafe { BorrowedFd::borrow_raw(self.dispatcher.borrow().fd) }, |
189 | self.was_nonblocking, |
190 | ); |
191 | } |
192 | } |
193 | |
194 | impl<'l, F: AsFd> Unpin for Async<'l, F> {} |
195 | |
196 | trait IoLoopInner { |
197 | unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>; |
198 | fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()>; |
199 | fn kill(&self, dispatcher: &RefCell<IoDispatcher>); |
200 | } |
201 | |
202 | impl<'l, Data> IoLoopInner for LoopInner<'l, Data> { |
203 | unsafe fn register(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> { |
204 | let disp = dispatcher.borrow(); |
205 | self.poll.borrow_mut().register( |
206 | unsafe { BorrowedFd::borrow_raw(disp.fd) }, |
207 | Interest::EMPTY, |
208 | Mode::OneShot, |
209 | disp.token.expect("No token for IO dispatcher" ), |
210 | ) |
211 | } |
212 | |
213 | fn reregister(&self, dispatcher: &RefCell<IoDispatcher>) -> crate::Result<()> { |
214 | let disp = dispatcher.borrow(); |
215 | self.poll.borrow_mut().reregister( |
216 | unsafe { BorrowedFd::borrow_raw(disp.fd) }, |
217 | disp.interest, |
218 | Mode::OneShot, |
219 | disp.token.expect("No token for IO dispatcher" ), |
220 | ) |
221 | } |
222 | |
223 | fn kill(&self, dispatcher: &RefCell<IoDispatcher>) { |
224 | let token = dispatcher |
225 | .borrow() |
226 | .token |
227 | .expect("No token for IO dispatcher" ); |
228 | if let Ok(slot) = self.sources.borrow_mut().get_mut(token.inner) { |
229 | slot.source = None; |
230 | } |
231 | } |
232 | } |
233 | |
234 | struct IoDispatcher { |
235 | fd: RawFd, // FIXME: `BorrowedFd`? How to statically verify it doesn't outlive file? |
236 | token: Option<Token>, |
237 | waker: Option<Waker>, |
238 | is_registered: bool, |
239 | interest: Interest, |
240 | last_readiness: Readiness, |
241 | } |
242 | |
243 | impl IoDispatcher { |
244 | fn readiness(&mut self) -> Readiness { |
245 | std::mem::replace(&mut self.last_readiness, src:Readiness::EMPTY) |
246 | } |
247 | } |
248 | |
249 | impl<Data> EventDispatcher<Data> for RefCell<IoDispatcher> { |
250 | fn process_events( |
251 | &self, |
252 | readiness: Readiness, |
253 | _token: Token, |
254 | _data: &mut Data, |
255 | ) -> crate::Result<PostAction> { |
256 | let mut disp = self.borrow_mut(); |
257 | disp.last_readiness = readiness; |
258 | if let Some(waker) = disp.waker.take() { |
259 | waker.wake(); |
260 | } |
261 | Ok(PostAction::Continue) |
262 | } |
263 | |
264 | fn register( |
265 | &self, |
266 | _: &mut Poll, |
267 | _: &mut AdditionalLifecycleEventsSet, |
268 | _: &mut TokenFactory, |
269 | ) -> crate::Result<()> { |
270 | // registration is handled by IoLoopInner |
271 | unreachable!() |
272 | } |
273 | |
274 | fn reregister( |
275 | &self, |
276 | _: &mut Poll, |
277 | _: &mut AdditionalLifecycleEventsSet, |
278 | _: &mut TokenFactory, |
279 | ) -> crate::Result<bool> { |
280 | // registration is handled by IoLoopInner |
281 | unreachable!() |
282 | } |
283 | |
284 | fn unregister( |
285 | &self, |
286 | poll: &mut Poll, |
287 | _: &mut AdditionalLifecycleEventsSet, |
288 | _: RegistrationToken, |
289 | ) -> crate::Result<bool> { |
290 | let disp = self.borrow(); |
291 | if disp.is_registered { |
292 | poll.unregister(unsafe { BorrowedFd::borrow_raw(disp.fd) })?; |
293 | } |
294 | Ok(true) |
295 | } |
296 | |
297 | fn before_sleep(&self) -> crate::Result<Option<(Readiness, Token)>> { |
298 | Ok(None) |
299 | } |
300 | fn before_handle_events(&self, _: EventIterator<'_>) {} |
301 | } |
302 | |
303 | /* |
304 | * Async IO trait implementations |
305 | */ |
306 | |
#[cfg(feature = "futures-io")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
impl<'l, F: AsFd + std::io::Read> AsyncRead for Async<'l, F> {
    /// Attempts a non-blocking read; if it would block, stores the task's waker with a
    /// read interest and reports `Pending`. Any other outcome is returned as-is.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> TaskPoll<std::io::Result<usize>> {
        // `Async` is `Unpin`, so the pin can simply be unwrapped.
        let this = Pin::into_inner(self);
        match this.get_mut().read(buf) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                this.register_waker(Interest::READ, cx.waker().clone())?;
                TaskPoll::Pending
            }
            outcome => TaskPoll::Ready(outcome),
        }
    }

    /// Vectored counterpart of `poll_read`, with the same would-block handling.
    fn poll_read_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &mut [IoSliceMut<'_>],
    ) -> TaskPoll<std::io::Result<usize>> {
        let this = Pin::into_inner(self);
        match this.get_mut().read_vectored(bufs) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                this.register_waker(Interest::READ, cx.waker().clone())?;
                TaskPoll::Pending
            }
            outcome => TaskPoll::Ready(outcome),
        }
    }
}
336 | |
#[cfg(feature = "futures-io")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures-io")))]
impl<'l, F: AsFd + std::io::Write> AsyncWrite for Async<'l, F> {
    /// Attempts a non-blocking write; if it would block, stores the task's waker with a
    /// write interest and reports `Pending`. Any other outcome is returned as-is.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> TaskPoll<std::io::Result<usize>> {
        // `Async` is `Unpin`, so the pin can simply be unwrapped.
        let this = Pin::into_inner(self);
        match this.get_mut().write(buf) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                this.register_waker(Interest::WRITE, cx.waker().clone())?;
                TaskPoll::Pending
            }
            outcome => TaskPoll::Ready(outcome),
        }
    }

    /// Vectored counterpart of `poll_write`, with the same would-block handling.
    fn poll_write_vectored(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        bufs: &[IoSlice<'_>],
    ) -> TaskPoll<std::io::Result<usize>> {
        let this = Pin::into_inner(self);
        match this.get_mut().write_vectored(bufs) {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                this.register_waker(Interest::WRITE, cx.waker().clone())?;
                TaskPoll::Pending
            }
            outcome => TaskPoll::Ready(outcome),
        }
    }

    /// Flushes the underlying object, waiting for write readiness if the flush would block.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
        let this = Pin::into_inner(self);
        match this.get_mut().flush() {
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                this.register_waker(Interest::WRITE, cx.waker().clone())?;
                TaskPoll::Pending
            }
            outcome => TaskPoll::Ready(outcome),
        }
    }

    /// Closing is implemented as a flush; the underlying object is not shut down here.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> TaskPoll<std::io::Result<()>> {
        self.poll_flush(cx)
    }
}
379 | |
380 | // https://github.com/smol-rs/async-io/blob/6499077421495f2200d5b86918399f3a84bbe8e4/src/lib.rs#L2171-L2195 |
381 | /// Set the nonblocking status of an FD and return whether it was nonblocking before. |
382 | #[allow (clippy::needless_return)] |
383 | #[inline ] |
384 | fn set_nonblocking(fd: BorrowedFd<'_>, is_nonblocking: bool) -> std::io::Result<bool> { |
385 | #[cfg (windows)] |
386 | { |
387 | rustix::io::ioctl_fionbio(fd, is_nonblocking)?; |
388 | |
389 | // Unfortunately it is impossible to tell if a socket was nonblocking on Windows. |
390 | // Just say it wasn't for now. |
391 | return Ok(false); |
392 | } |
393 | |
394 | #[cfg (not(windows))] |
395 | { |
396 | let previous = rustix::fs::fcntl_getfl(fd)?; |
397 | let new = if is_nonblocking { |
398 | previous | rustix::fs::OFlags::NONBLOCK |
399 | } else { |
400 | previous & !(rustix::fs::OFlags::NONBLOCK) |
401 | }; |
402 | if new != previous { |
403 | rustix::fs::fcntl_setfl(fd, new)?; |
404 | } |
405 | |
406 | return Ok(previous.contains(rustix::fs::OFlags::NONBLOCK)); |
407 | } |
408 | } |
409 | |
#[cfg(all(test, unix, feature = "executor", feature = "futures-io"))]
mod tests {
    use futures::io::{AsyncReadExt, AsyncWriteExt};

    use crate::sources::futures::executor;

    /// A reader future stays pending until a writer future sends the full message.
    #[test]
    fn read_write() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (exec, sched) = executor().unwrap();
        handle
            .insert_source(exec, move |ret, &mut (), got| {
                *got = ret;
            })
            .unwrap();

        let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
        let mut tx = handle.adapt_io(tx).unwrap();
        let mut rx = handle.adapt_io(rx).unwrap();
        let received = std::rc::Rc::new(std::cell::Cell::new(false));
        let fut_received = received.clone();

        sched
            .schedule(async move {
                let mut buf = [0; 12];
                rx.read_exact(&mut buf).await.unwrap();
                assert_eq!(&buf, b"Hello World!");
                fut_received.set(true);
            })
            .unwrap();

        // The receiving future alone cannot advance
        event_loop
            .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
            .unwrap();
        assert!(!received.get());

        // schedule the writing future as well and wait until finish
        sched
            .schedule(async move {
                tx.write_all(b"Hello World!").await.unwrap();
                tx.flush().await.unwrap();
            })
            .unwrap();

        while !received.get() {
            event_loop.dispatch(None, &mut ()).unwrap();
        }
    }

    /// Same as `read_write`, but going through the vectored read/write entry points.
    #[test]
    fn read_write_vectored() {
        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (exec, sched) = executor().unwrap();
        handle
            .insert_source(exec, move |ret, &mut (), got| {
                *got = ret;
            })
            .unwrap();

        let (tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();
        let mut tx = handle.adapt_io(tx).unwrap();
        let mut rx = handle.adapt_io(rx).unwrap();
        let received = std::rc::Rc::new(std::cell::Cell::new(false));
        let fut_received = received.clone();

        sched
            .schedule(async move {
                let mut buf = [0; 12];
                let mut ioslices = buf
                    .chunks_mut(2)
                    .map(std::io::IoSliceMut::new)
                    .collect::<Vec<_>>();
                let count = rx.read_vectored(&mut ioslices).await.unwrap();
                assert_eq!(count, 12);
                assert_eq!(&buf, b"Hello World!");
                fut_received.set(true);
            })
            .unwrap();

        // The receiving future alone cannot advance
        event_loop
            .dispatch(Some(std::time::Duration::from_millis(10)), &mut ())
            .unwrap();
        assert!(!received.get());

        // schedule the writing future as well and wait until finish
        sched
            .schedule(async move {
                let buf = b"Hello World!";
                let ioslices = buf.chunks(2).map(std::io::IoSlice::new).collect::<Vec<_>>();
                let count = tx.write_vectored(&ioslices).await.unwrap();
                assert_eq!(count, 12);
                tx.flush().await.unwrap();
            })
            .unwrap();

        while !received.get() {
            event_loop.dispatch(None, &mut ()).unwrap();
        }
    }

    /// `readable()` only completes once the peer has written something.
    #[test]
    fn readable() {
        use std::io::Write;

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (exec, sched) = executor().unwrap();
        handle
            .insert_source(exec, move |(), &mut (), got| {
                *got = true;
            })
            .unwrap();

        let (mut tx, rx) = std::os::unix::net::UnixStream::pair().unwrap();

        let mut rx = handle.adapt_io(rx).unwrap();
        sched
            .schedule(async move {
                rx.readable().await;
            })
            .unwrap();

        let mut dispatched = false;

        event_loop
            .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        // The socket is not yet readable, so the readable() future has not completed
        assert!(!dispatched);

        tx.write_all(&[42]).unwrap();
        tx.flush().unwrap();

        // Now we should become readable
        while !dispatched {
            event_loop.dispatch(None, &mut dispatched).unwrap();
        }
    }

    /// `writable()` only completes once the peer has drained the full socket buffer.
    #[test]
    fn writable() {
        use std::io::{BufReader, BufWriter, Read, Write};

        let mut event_loop = crate::EventLoop::try_new().unwrap();
        let handle = event_loop.handle();
        let (exec, sched) = executor().unwrap();
        handle
            .insert_source(exec, move |(), &mut (), got| {
                *got = true;
            })
            .unwrap();

        let (mut tx, mut rx) = std::os::unix::net::UnixStream::pair().unwrap();
        tx.set_nonblocking(true).unwrap();
        rx.set_nonblocking(true).unwrap();

        // First, fill the socket buffers
        {
            let mut writer = BufWriter::new(&mut tx);
            let data = vec![42u8; 1024];
            loop {
                if writer.write(&data).is_err() {
                    break;
                }
            }
        }

        // Now, wait for it to be writable
        let mut tx = handle.adapt_io(tx).unwrap();
        sched
            .schedule(async move {
                tx.writable().await;
            })
            .unwrap();

        let mut dispatched = false;

        event_loop
            .dispatch(Some(std::time::Duration::from_millis(100)), &mut dispatched)
            .unwrap();
        // The socket is not yet writable, so the writable() future has not completed
        assert!(!dispatched);

        // now read everything
        {
            let mut reader = BufReader::new(&mut rx);
            let mut buffer = vec![0u8; 1024];
            loop {
                if reader.read(&mut buffer).is_err() {
                    break;
                }
            }
        }

        // Now we should become writable
        while !dispatched {
            event_loop.dispatch(None, &mut dispatched).unwrap();
        }
    }
}
614 | |