1use std::io;
2
/// Demonstrates a single named pipe serving several clients concurrently.
///
/// A server task accepts `N` connections on `PIPE_NAME` and hands each
/// connected pipe instance to its own task, which reads four bytes and answers
/// with `pong`. In parallel, `N` client tasks open the pipe, send `ping` and
/// read the four-byte reply, which is then asserted to equal `pong`.
///
/// # Errors
///
/// Returns the first I/O error hit while creating, connecting, reading from or
/// writing to the pipe, or the error produced when a spawned task cannot be
/// joined.
///
/// # Panics
///
/// Panics if a client receives a reply other than `pong`.
#[cfg(windows)]
async fn windows_main() -> io::Result<()> {
    use std::time::Duration;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::windows::named_pipe::{ClientOptions, ServerOptions};
    use tokio::time;
    use windows_sys::Win32::Foundation::ERROR_PIPE_BUSY;

    const PIPE_NAME: &str = r"\\.\pipe\named-pipe-multi-client";
    const N: usize = 10;

    // The first server needs to be constructed early so that clients can
    // be correctly connected. Otherwise a waiting client will error.
    //
    // Here we also make use of `first_pipe_instance`, which will ensure
    // that there are no other servers up and running already.
    let mut server = ServerOptions::new()
        .first_pipe_instance(true)
        .create(PIPE_NAME)?;

    let server = tokio::spawn(async move {
        // Artificial workload.
        time::sleep(Duration::from_secs(1)).await;

        // Keep the handle of every per-connection task so that an error or a
        // panic inside a handler is reported instead of being silently lost.
        let mut handlers = Vec::with_capacity(N);

        for _ in 0..N {
            // Wait for client to connect.
            server.connect().await?;
            let mut inner = server;

            // Construct the next server to be connected before sending the one
            // we already have off onto a task. This ensures that the server
            // isn't closed (after it's done in the task) before a new one is
            // available. Otherwise the client might error with
            // `io::ErrorKind::NotFound`.
            server = ServerOptions::new().create(PIPE_NAME)?;

            handlers.push(tokio::spawn(async move {
                // The request is a fixed four bytes, so a stack buffer is
                // enough (this mirrors the buffer used on the client side).
                let mut buf = [0u8; 4];
                inner.read_exact(&mut buf).await?;
                inner.write_all(b"pong").await?;
                Ok::<_, io::Error>(())
            }));
        }

        // Surface the outcome of every handler: the first `?` covers a failed
        // join (e.g. a panicked task), the second the handler's own I/O error.
        for handler in handlers {
            handler.await??;
        }

        Ok::<_, io::Error>(())
    });

    let mut clients = Vec::with_capacity(N);

    for _ in 0..N {
        clients.push(tokio::spawn(async move {
            // This showcases a generic connect loop.
            //
            // We immediately try to open a client. If the pipe reports
            // `ERROR_PIPE_BUSY` we back off for a few milliseconds and try
            // again; any other error is returned to the caller as-is.
            let mut client = loop {
                match ClientOptions::new().open(PIPE_NAME) {
                    Ok(client) => break client,
                    Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
                    Err(e) => return Err(e),
                }

                time::sleep(Duration::from_millis(5)).await;
            };

            let mut buf = [0u8; 4];
            client.write_all(b"ping").await?;
            client.read_exact(&mut buf).await?;
            Ok::<_, io::Error>(buf)
        }));
    }

    for client in clients {
        // The outer `?` handles a failed join, the inner one the client's
        // own I/O result.
        let result = client.await?;
        assert_eq!(&result?[..], b"pong");
    }

    server.await??;
    Ok(())
}
84
85#[tokio::main]
86async fn main() -> io::Result<()> {
87 #[cfg(windows)]
88 {
89 windows_main().await?;
90 }
91
92 #[cfg(not(windows))]
93 {
94 println!("Named pipes are only supported on Windows!");
95 }
96
97 Ok(())
98}
99