1#![cfg(feature = "full")]
2#![cfg(all(windows))]
3
4use std::io;
5use std::time::Duration;
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::net::windows::named_pipe::{ClientOptions, PipeMode, ServerOptions};
8use tokio::time;
9use windows_sys::Win32::Foundation::{ERROR_NO_DATA, ERROR_PIPE_BUSY};
10
11#[tokio::test]
12async fn test_named_pipe_client_drop() -> io::Result<()> {
13 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-client-drop";
14
15 let mut server = ServerOptions::new().create(PIPE_NAME)?;
16
17 let client = ClientOptions::new().open(PIPE_NAME)?;
18
19 server.connect().await?;
20 drop(client);
21
22 // instance will be broken because client is gone
23 match server.write_all(b"ping").await {
24 Err(e) if e.raw_os_error() == Some(ERROR_NO_DATA as i32) => (),
25 x => panic!("{:?}", x),
26 }
27
28 Ok(())
29}
30
31#[tokio::test]
32async fn test_named_pipe_single_client() -> io::Result<()> {
33 use tokio::io::{AsyncBufReadExt as _, BufReader};
34
35 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-single-client";
36
37 let server = ServerOptions::new().create(PIPE_NAME)?;
38
39 let server = tokio::spawn(async move {
40 // Note: we wait for a client to connect.
41 server.connect().await?;
42
43 let mut server = BufReader::new(server);
44
45 let mut buf = String::new();
46 server.read_line(&mut buf).await?;
47 server.write_all(b"pong\n").await?;
48 Ok::<_, io::Error>(buf)
49 });
50
51 let client = tokio::spawn(async move {
52 let client = ClientOptions::new().open(PIPE_NAME)?;
53
54 let mut client = BufReader::new(client);
55
56 let mut buf = String::new();
57 client.write_all(b"ping\n").await?;
58 client.read_line(&mut buf).await?;
59 Ok::<_, io::Error>(buf)
60 });
61
62 let (server, client) = tokio::try_join!(server, client)?;
63
64 assert_eq!(server?, "ping\n");
65 assert_eq!(client?, "pong\n");
66
67 Ok(())
68}
69
70#[tokio::test]
71async fn test_named_pipe_multi_client() -> io::Result<()> {
72 use tokio::io::{AsyncBufReadExt as _, BufReader};
73
74 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client";
75 const N: usize = 10;
76
77 // The first server needs to be constructed early so that clients can
78 // be correctly connected. Otherwise calling .wait will cause the client to
79 // error.
80 let mut server = ServerOptions::new().create(PIPE_NAME)?;
81
82 let server = tokio::spawn(async move {
83 for _ in 0..N {
84 // Wait for client to connect.
85 server.connect().await?;
86 let mut inner = BufReader::new(server);
87
88 // Construct the next server to be connected before sending the one
89 // we already have of onto a task. This ensures that the server
90 // isn't closed (after it's done in the task) before a new one is
91 // available. Otherwise the client might error with
92 // `io::ErrorKind::NotFound`.
93 server = ServerOptions::new().create(PIPE_NAME)?;
94
95 let _ = tokio::spawn(async move {
96 let mut buf = String::new();
97 inner.read_line(&mut buf).await?;
98 inner.write_all(b"pong\n").await?;
99 inner.flush().await?;
100 Ok::<_, io::Error>(())
101 });
102 }
103
104 Ok::<_, io::Error>(())
105 });
106
107 let mut clients = Vec::new();
108
109 for _ in 0..N {
110 clients.push(tokio::spawn(async move {
111 // This showcases a generic connect loop.
112 //
113 // We immediately try to create a client, if it's not found or the
114 // pipe is busy we use the specialized wait function on the client
115 // builder.
116 let client = loop {
117 match ClientOptions::new().open(PIPE_NAME) {
118 Ok(client) => break client,
119 Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
120 Err(e) if e.kind() == io::ErrorKind::NotFound => (),
121 Err(e) => return Err(e),
122 }
123
124 // Wait for a named pipe to become available.
125 time::sleep(Duration::from_millis(10)).await;
126 };
127
128 let mut client = BufReader::new(client);
129
130 let mut buf = String::new();
131 client.write_all(b"ping\n").await?;
132 client.flush().await?;
133 client.read_line(&mut buf).await?;
134 Ok::<_, io::Error>(buf)
135 }));
136 }
137
138 for client in clients {
139 let result = client.await?;
140 assert_eq!(result?, "pong\n");
141 }
142
143 server.await??;
144 Ok(())
145}
146
147#[tokio::test]
148async fn test_named_pipe_multi_client_ready() -> io::Result<()> {
149 use tokio::io::Interest;
150
151 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-multi-client-ready";
152 const N: usize = 10;
153
154 // The first server needs to be constructed early so that clients can
155 // be correctly connected. Otherwise calling .wait will cause the client to
156 // error.
157 let mut server = ServerOptions::new().create(PIPE_NAME)?;
158
159 let server = tokio::spawn(async move {
160 for _ in 0..N {
161 // Wait for client to connect.
162 server.connect().await?;
163
164 let inner_server = server;
165
166 // Construct the next server to be connected before sending the one
167 // we already have of onto a task. This ensures that the server
168 // isn't closed (after it's done in the task) before a new one is
169 // available. Otherwise the client might error with
170 // `io::ErrorKind::NotFound`.
171 server = ServerOptions::new().create(PIPE_NAME)?;
172
173 let _ = tokio::spawn(async move {
174 let server = inner_server;
175
176 {
177 let mut read_buf = [0u8; 5];
178 let mut read_buf_cursor = 0;
179
180 loop {
181 server.readable().await?;
182
183 let buf = &mut read_buf[read_buf_cursor..];
184
185 match server.try_read(buf) {
186 Ok(n) => {
187 read_buf_cursor += n;
188
189 if read_buf_cursor == read_buf.len() {
190 break;
191 }
192 }
193 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
194 continue;
195 }
196 Err(e) => {
197 return Err(e);
198 }
199 }
200 }
201 };
202
203 {
204 let write_buf = b"pong\n";
205 let mut write_buf_cursor = 0;
206
207 loop {
208 server.writable().await?;
209 let buf = &write_buf[write_buf_cursor..];
210
211 match server.try_write(buf) {
212 Ok(n) => {
213 write_buf_cursor += n;
214
215 if write_buf_cursor == write_buf.len() {
216 break;
217 }
218 }
219 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
220 continue;
221 }
222 Err(e) => {
223 return Err(e);
224 }
225 }
226 }
227 }
228
229 Ok::<_, io::Error>(())
230 });
231 }
232
233 Ok::<_, io::Error>(())
234 });
235
236 let mut clients = Vec::new();
237
238 for _ in 0..N {
239 clients.push(tokio::spawn(async move {
240 // This showcases a generic connect loop.
241 //
242 // We immediately try to create a client, if it's not found or the
243 // pipe is busy we use the specialized wait function on the client
244 // builder.
245 let client = loop {
246 match ClientOptions::new().open(PIPE_NAME) {
247 Ok(client) => break client,
248 Err(e) if e.raw_os_error() == Some(ERROR_PIPE_BUSY as i32) => (),
249 Err(e) if e.kind() == io::ErrorKind::NotFound => (),
250 Err(e) => return Err(e),
251 }
252
253 // Wait for a named pipe to become available.
254 time::sleep(Duration::from_millis(10)).await;
255 };
256
257 let mut read_buf = [0u8; 5];
258 let mut read_buf_cursor = 0;
259 let write_buf = b"ping\n";
260 let mut write_buf_cursor = 0;
261
262 loop {
263 let mut interest = Interest::READABLE;
264 if write_buf_cursor < write_buf.len() {
265 interest |= Interest::WRITABLE;
266 }
267
268 let ready = client.ready(interest).await?;
269
270 if ready.is_readable() {
271 let buf = &mut read_buf[read_buf_cursor..];
272
273 match client.try_read(buf) {
274 Ok(n) => {
275 read_buf_cursor += n;
276
277 if read_buf_cursor == read_buf.len() {
278 break;
279 }
280 }
281 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
282 continue;
283 }
284 Err(e) => {
285 return Err(e);
286 }
287 }
288 }
289
290 if ready.is_writable() {
291 let buf = &write_buf[write_buf_cursor..];
292
293 if buf.is_empty() {
294 continue;
295 }
296
297 match client.try_write(buf) {
298 Ok(n) => {
299 write_buf_cursor += n;
300 }
301 Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
302 continue;
303 }
304 Err(e) => {
305 return Err(e);
306 }
307 }
308 }
309 }
310
311 let buf = String::from_utf8_lossy(&read_buf).into_owned();
312
313 Ok::<_, io::Error>(buf)
314 }));
315 }
316
317 for client in clients {
318 let result = client.await?;
319 assert_eq!(result?, "pong\n");
320 }
321
322 server.await??;
323 Ok(())
324}
325
326// This tests that message mode works as expected.
327#[tokio::test]
328async fn test_named_pipe_mode_message() -> io::Result<()> {
329 // it's easy to accidentally get a seemingly working test here because byte pipes
330 // often return contents at write boundaries. to make sure we're doing the right thing we
331 // explicitly test that it doesn't work in byte mode.
332 _named_pipe_mode_message(PipeMode::Message).await?;
333 _named_pipe_mode_message(PipeMode::Byte).await
334}
335
336async fn _named_pipe_mode_message(mode: PipeMode) -> io::Result<()> {
337 let pipe_name = format!(
338 r"\\.\pipe\test-named-pipe-mode-message-{}",
339 matches!(mode, PipeMode::Message)
340 );
341 let mut buf = [0u8; 32];
342
343 let mut server = ServerOptions::new()
344 .first_pipe_instance(true)
345 .pipe_mode(mode)
346 .create(&pipe_name)?;
347
348 let mut client = ClientOptions::new().pipe_mode(mode).open(&pipe_name)?;
349
350 server.connect().await?;
351
352 // this needs a few iterations, presumably Windows waits for a few calls before merging buffers
353 for _ in 0..10 {
354 client.write_all(b"hello").await?;
355 server.write_all(b"world").await?;
356 }
357 for _ in 0..10 {
358 let n = server.read(&mut buf).await?;
359 if buf[..n] != b"hello"[..] {
360 assert!(matches!(mode, PipeMode::Byte));
361 return Ok(());
362 }
363 let n = client.read(&mut buf).await?;
364 if buf[..n] != b"world"[..] {
365 assert!(matches!(mode, PipeMode::Byte));
366 return Ok(());
367 }
368 }
369 // byte mode should have errored before.
370 assert!(matches!(mode, PipeMode::Message));
371 Ok(())
372}
373
374// This tests `NamedPipeServer::connect` with various access settings.
375#[tokio::test]
376async fn test_named_pipe_access() -> io::Result<()> {
377 const PIPE_NAME: &str = r"\\.\pipe\test-named-pipe-access";
378
379 for (inb, outb) in [(true, true), (true, false), (false, true)] {
380 let (tx, rx) = tokio::sync::oneshot::channel();
381 let server = tokio::spawn(async move {
382 let s = ServerOptions::new()
383 .access_inbound(inb)
384 .access_outbound(outb)
385 .create(PIPE_NAME)?;
386 let mut connect_fut = tokio_test::task::spawn(s.connect());
387 assert!(connect_fut.poll().is_pending());
388 tx.send(()).unwrap();
389 connect_fut.await
390 });
391
392 // Wait for the server to call connect.
393 rx.await.unwrap();
394 let _ = ClientOptions::new().read(outb).write(inb).open(PIPE_NAME)?;
395
396 server.await??;
397 }
398 Ok(())
399}
400