1use cfg_if::cfg_if;
2use std::str;
3
4use nix::errno::Errno;
5use nix::mqueue::{
6 mq_attr_member_t, mq_close, mq_open, mq_receive, mq_send, mq_timedreceive,
7};
8use nix::mqueue::{MQ_OFlag, MqAttr};
9use nix::sys::stat::Mode;
10use nix::sys::time::{TimeSpec, TimeValLike};
11use nix::time::{clock_gettime, ClockId};
12
13// Defined as a macro such that the error source is reported as the caller's location.
14macro_rules! assert_attr_eq {
15 ($read_attr:ident, $initial_attr:ident) => {
16 cfg_if! {
17 if #[cfg(any(target_os = "dragonfly", target_os = "netbsd"))] {
18 // NetBSD (and others which inherit its implementation) include other flags
19 // in read_attr, such as those specified by oflag. Just make sure at least
20 // the correct bits are set.
21 assert_eq!($read_attr.flags() & $initial_attr.flags(), $initial_attr.flags());
22 assert_eq!($read_attr.maxmsg(), $initial_attr.maxmsg());
23 assert_eq!($read_attr.msgsize(), $initial_attr.msgsize());
24 assert_eq!($read_attr.curmsgs(), $initial_attr.curmsgs());
25 } else {
26 assert_eq!($read_attr, $initial_attr);
27 }
28 }
29 }
30}
31
32#[test]
33fn test_mq_send_and_receive() {
34 const MSG_SIZE: mq_attr_member_t = 32;
35 let attr = MqAttr::new(0, 10, MSG_SIZE, 0);
36 let mq_name = "/a_nix_test_queue";
37
38 let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
39 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
40 let r0 = mq_open(mq_name, oflag0, mode, Some(&attr));
41 if let Err(Errno::ENOSYS) = r0 {
42 println!("message queues not supported or module not loaded?");
43 return;
44 };
45 let mqd0 = r0.unwrap();
46 let msg_to_send = "msg_1";
47 mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap();
48
49 let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
50 let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap();
51 let mut buf = [0u8; 32];
52 let mut prio = 0u32;
53 let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap();
54 assert_eq!(prio, 1);
55
56 mq_close(mqd1).unwrap();
57 mq_close(mqd0).unwrap();
58 assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap());
59}
60
61#[test]
62fn test_mq_timedreceive() {
63 const MSG_SIZE: mq_attr_member_t = 32;
64 let attr = MqAttr::new(0, 10, MSG_SIZE, 0);
65 let mq_name = "/a_nix_test_queue";
66
67 let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
68 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
69 let r0 = mq_open(mq_name, oflag0, mode, Some(&attr));
70 if let Err(Errno::ENOSYS) = r0 {
71 println!("message queues not supported or module not loaded?");
72 return;
73 };
74 let mqd0 = r0.unwrap();
75 let msg_to_send = "msg_1";
76 mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap();
77
78 let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY;
79 let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap();
80 let mut buf = [0u8; 32];
81 let mut prio = 0u32;
82 let abstime =
83 clock_gettime(ClockId::CLOCK_REALTIME).unwrap() + TimeSpec::seconds(1);
84 let len = mq_timedreceive(&mqd1, &mut buf, &mut prio, &abstime).unwrap();
85 assert_eq!(prio, 1);
86
87 mq_close(mqd1).unwrap();
88 mq_close(mqd0).unwrap();
89 assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap());
90}
91
92#[test]
93fn test_mq_getattr() {
94 use nix::mqueue::mq_getattr;
95 const MSG_SIZE: mq_attr_member_t = 32;
96 let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
97 let mq_name = "/attr_test_get_attr";
98 let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
99 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
100 let r = mq_open(mq_name, oflag, mode, Some(&initial_attr));
101 if let Err(Errno::ENOSYS) = r {
102 println!("message queues not supported or module not loaded?");
103 return;
104 };
105 let mqd = r.unwrap();
106
107 let read_attr = mq_getattr(&mqd).unwrap();
108 assert_attr_eq!(read_attr, initial_attr);
109 mq_close(mqd).unwrap();
110}
111
112// FIXME: Fix failures for mips in QEMU
113#[test]
114#[cfg_attr(
115 all(qemu, any(target_arch = "mips", target_arch = "mips64")),
116 ignore
117)]
118fn test_mq_setattr() {
119 use nix::mqueue::{mq_getattr, mq_setattr};
120 const MSG_SIZE: mq_attr_member_t = 32;
121 let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
122 let mq_name = "/attr_test_get_attr";
123 let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
124 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
125 let r = mq_open(mq_name, oflag, mode, Some(&initial_attr));
126 if let Err(Errno::ENOSYS) = r {
127 println!("message queues not supported or module not loaded?");
128 return;
129 };
130 let mqd = r.unwrap();
131
132 let new_attr = MqAttr::new(0, 20, MSG_SIZE * 2, 100);
133 let old_attr = mq_setattr(&mqd, &new_attr).unwrap();
134 assert_attr_eq!(old_attr, initial_attr);
135
136 // No changes here because according to the Linux man page only
137 // O_NONBLOCK can be set (see tests below)
138 #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
139 {
140 let new_attr_get = mq_getattr(&mqd).unwrap();
141 assert_ne!(new_attr_get, new_attr);
142 }
143
144 let new_attr_non_blocking = MqAttr::new(
145 MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t,
146 10,
147 MSG_SIZE,
148 0,
149 );
150 mq_setattr(&mqd, &new_attr_non_blocking).unwrap();
151 let new_attr_get = mq_getattr(&mqd).unwrap();
152
153 // now the O_NONBLOCK flag has been set
154 #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
155 {
156 assert_ne!(new_attr_get, initial_attr);
157 }
158 assert_attr_eq!(new_attr_get, new_attr_non_blocking);
159 mq_close(mqd).unwrap();
160}
161
162// FIXME: Fix failures for mips in QEMU
163#[test]
164#[cfg_attr(
165 all(qemu, any(target_arch = "mips", target_arch = "mips64")),
166 ignore
167)]
168fn test_mq_set_nonblocking() {
169 use nix::mqueue::{mq_getattr, mq_remove_nonblock, mq_set_nonblock};
170 const MSG_SIZE: mq_attr_member_t = 32;
171 let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
172 let mq_name = "/attr_test_get_attr";
173 let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
174 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
175 let r = mq_open(mq_name, oflag, mode, Some(&initial_attr));
176 if let Err(Errno::ENOSYS) = r {
177 println!("message queues not supported or module not loaded?");
178 return;
179 };
180 let mqd = r.unwrap();
181 mq_set_nonblock(&mqd).unwrap();
182 let new_attr = mq_getattr(&mqd);
183 let o_nonblock_bits = MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t;
184 assert_eq!(new_attr.unwrap().flags() & o_nonblock_bits, o_nonblock_bits);
185 mq_remove_nonblock(&mqd).unwrap();
186 let new_attr = mq_getattr(&mqd);
187 assert_eq!(new_attr.unwrap().flags() & o_nonblock_bits, 0);
188 mq_close(mqd).unwrap();
189}
190
191#[test]
192fn test_mq_unlink() {
193 use nix::mqueue::mq_unlink;
194 const MSG_SIZE: mq_attr_member_t = 32;
195 let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0);
196 let mq_name_opened = "/mq_unlink_test";
197 #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
198 let mq_name_not_opened = "/mq_unlink_test";
199 let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY;
200 let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH;
201 let r = mq_open(mq_name_opened, oflag, mode, Some(&initial_attr));
202 if let Err(Errno::ENOSYS) = r {
203 println!("message queues not supported or module not loaded?");
204 return;
205 };
206 let mqd = r.unwrap();
207
208 let res_unlink = mq_unlink(mq_name_opened);
209 assert_eq!(res_unlink, Ok(()));
210
211 // NetBSD (and others which inherit its implementation) defer removing the message
212 // queue name until all references are closed, whereas Linux and others remove the
213 // message queue name immediately.
214 #[cfg(not(any(target_os = "dragonfly", target_os = "netbsd")))]
215 {
216 let res_unlink_not_opened = mq_unlink(mq_name_not_opened);
217 assert_eq!(res_unlink_not_opened, Err(Errno::ENOENT));
218 }
219
220 mq_close(mqd).unwrap();
221 let res_unlink_after_close = mq_unlink(mq_name_opened);
222 assert_eq!(res_unlink_after_close, Err(Errno::ENOENT));
223}
224