| 1 | use cfg_if::cfg_if; |
| 2 | use std::str; |
| 3 | |
| 4 | use nix::errno::Errno; |
| 5 | use nix::mqueue::{ |
| 6 | mq_attr_member_t, mq_close, mq_open, mq_receive, mq_send, mq_timedreceive, |
| 7 | }; |
| 8 | use nix::mqueue::{MQ_OFlag, MqAttr}; |
| 9 | use nix::sys::stat::Mode; |
| 10 | use nix::sys::time::{TimeSpec, TimeValLike}; |
| 11 | use nix::time::{clock_gettime, ClockId}; |
| 12 | |
| 13 | // Defined as a macro such that the error source is reported as the caller's location. |
| 14 | macro_rules! assert_attr_eq { |
| 15 | ($read_attr:ident, $initial_attr:ident) => { |
| 16 | cfg_if! { |
| 17 | if #[cfg(any(target_os = "dragonfly" , target_os = "netbsd" ))] { |
| 18 | // NetBSD (and others which inherit its implementation) include other flags |
| 19 | // in read_attr, such as those specified by oflag. Just make sure at least |
| 20 | // the correct bits are set. |
| 21 | assert_eq!($read_attr.flags() & $initial_attr.flags(), $initial_attr.flags()); |
| 22 | assert_eq!($read_attr.maxmsg(), $initial_attr.maxmsg()); |
| 23 | assert_eq!($read_attr.msgsize(), $initial_attr.msgsize()); |
| 24 | assert_eq!($read_attr.curmsgs(), $initial_attr.curmsgs()); |
| 25 | } else { |
| 26 | assert_eq!($read_attr, $initial_attr); |
| 27 | } |
| 28 | } |
| 29 | } |
| 30 | } |
| 31 | |
| 32 | #[test] |
| 33 | fn test_mq_send_and_receive() { |
| 34 | const MSG_SIZE: mq_attr_member_t = 32; |
| 35 | let attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 36 | let mq_name = "/a_nix_test_queue" ; |
| 37 | |
| 38 | let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 39 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 40 | let r0 = mq_open(mq_name, oflag0, mode, Some(&attr)); |
| 41 | if let Err(Errno::ENOSYS) = r0 { |
| 42 | println!("message queues not supported or module not loaded?" ); |
| 43 | return; |
| 44 | }; |
| 45 | let mqd0 = r0.unwrap(); |
| 46 | let msg_to_send = "msg_1" ; |
| 47 | mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap(); |
| 48 | |
| 49 | let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY; |
| 50 | let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap(); |
| 51 | let mut buf = [0u8; 32]; |
| 52 | let mut prio = 0u32; |
| 53 | let len = mq_receive(&mqd1, &mut buf, &mut prio).unwrap(); |
| 54 | assert_eq!(prio, 1); |
| 55 | |
| 56 | mq_close(mqd1).unwrap(); |
| 57 | mq_close(mqd0).unwrap(); |
| 58 | assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap()); |
| 59 | } |
| 60 | |
| 61 | #[test] |
| 62 | fn test_mq_timedreceive() { |
| 63 | const MSG_SIZE: mq_attr_member_t = 32; |
| 64 | let attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 65 | let mq_name = "/a_nix_test_queue" ; |
| 66 | |
| 67 | let oflag0 = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 68 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 69 | let r0 = mq_open(mq_name, oflag0, mode, Some(&attr)); |
| 70 | if let Err(Errno::ENOSYS) = r0 { |
| 71 | println!("message queues not supported or module not loaded?" ); |
| 72 | return; |
| 73 | }; |
| 74 | let mqd0 = r0.unwrap(); |
| 75 | let msg_to_send = "msg_1" ; |
| 76 | mq_send(&mqd0, msg_to_send.as_bytes(), 1).unwrap(); |
| 77 | |
| 78 | let oflag1 = MQ_OFlag::O_CREAT | MQ_OFlag::O_RDONLY; |
| 79 | let mqd1 = mq_open(mq_name, oflag1, mode, Some(&attr)).unwrap(); |
| 80 | let mut buf = [0u8; 32]; |
| 81 | let mut prio = 0u32; |
| 82 | let abstime = |
| 83 | clock_gettime(ClockId::CLOCK_REALTIME).unwrap() + TimeSpec::seconds(1); |
| 84 | let len = mq_timedreceive(&mqd1, &mut buf, &mut prio, &abstime).unwrap(); |
| 85 | assert_eq!(prio, 1); |
| 86 | |
| 87 | mq_close(mqd1).unwrap(); |
| 88 | mq_close(mqd0).unwrap(); |
| 89 | assert_eq!(msg_to_send, str::from_utf8(&buf[0..len]).unwrap()); |
| 90 | } |
| 91 | |
| 92 | #[test] |
| 93 | fn test_mq_getattr() { |
| 94 | use nix::mqueue::mq_getattr; |
| 95 | const MSG_SIZE: mq_attr_member_t = 32; |
| 96 | let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 97 | let mq_name = "/attr_test_get_attr" ; |
| 98 | let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 99 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 100 | let r = mq_open(mq_name, oflag, mode, Some(&initial_attr)); |
| 101 | if let Err(Errno::ENOSYS) = r { |
| 102 | println!("message queues not supported or module not loaded?" ); |
| 103 | return; |
| 104 | }; |
| 105 | let mqd = r.unwrap(); |
| 106 | |
| 107 | let read_attr = mq_getattr(&mqd).unwrap(); |
| 108 | assert_attr_eq!(read_attr, initial_attr); |
| 109 | mq_close(mqd).unwrap(); |
| 110 | } |
| 111 | |
| 112 | // FIXME: Fix failures for mips in QEMU |
| 113 | #[test] |
| 114 | #[cfg_attr ( |
| 115 | all(qemu, any(target_arch = "mips" , target_arch = "mips64" )), |
| 116 | ignore |
| 117 | )] |
| 118 | fn test_mq_setattr() { |
| 119 | use nix::mqueue::{mq_getattr, mq_setattr}; |
| 120 | const MSG_SIZE: mq_attr_member_t = 32; |
| 121 | let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 122 | let mq_name = "/attr_test_get_attr" ; |
| 123 | let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 124 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 125 | let r = mq_open(mq_name, oflag, mode, Some(&initial_attr)); |
| 126 | if let Err(Errno::ENOSYS) = r { |
| 127 | println!("message queues not supported or module not loaded?" ); |
| 128 | return; |
| 129 | }; |
| 130 | let mqd = r.unwrap(); |
| 131 | |
| 132 | let new_attr = MqAttr::new(0, 20, MSG_SIZE * 2, 100); |
| 133 | let old_attr = mq_setattr(&mqd, &new_attr).unwrap(); |
| 134 | assert_attr_eq!(old_attr, initial_attr); |
| 135 | |
| 136 | // No changes here because according to the Linux man page only |
| 137 | // O_NONBLOCK can be set (see tests below) |
| 138 | #[cfg (not(any(target_os = "dragonfly" , target_os = "netbsd" )))] |
| 139 | { |
| 140 | let new_attr_get = mq_getattr(&mqd).unwrap(); |
| 141 | assert_ne!(new_attr_get, new_attr); |
| 142 | } |
| 143 | |
| 144 | let new_attr_non_blocking = MqAttr::new( |
| 145 | MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t, |
| 146 | 10, |
| 147 | MSG_SIZE, |
| 148 | 0, |
| 149 | ); |
| 150 | mq_setattr(&mqd, &new_attr_non_blocking).unwrap(); |
| 151 | let new_attr_get = mq_getattr(&mqd).unwrap(); |
| 152 | |
| 153 | // now the O_NONBLOCK flag has been set |
| 154 | #[cfg (not(any(target_os = "dragonfly" , target_os = "netbsd" )))] |
| 155 | { |
| 156 | assert_ne!(new_attr_get, initial_attr); |
| 157 | } |
| 158 | assert_attr_eq!(new_attr_get, new_attr_non_blocking); |
| 159 | mq_close(mqd).unwrap(); |
| 160 | } |
| 161 | |
| 162 | // FIXME: Fix failures for mips in QEMU |
| 163 | #[test] |
| 164 | #[cfg_attr ( |
| 165 | all(qemu, any(target_arch = "mips" , target_arch = "mips64" )), |
| 166 | ignore |
| 167 | )] |
| 168 | fn test_mq_set_nonblocking() { |
| 169 | use nix::mqueue::{mq_getattr, mq_remove_nonblock, mq_set_nonblock}; |
| 170 | const MSG_SIZE: mq_attr_member_t = 32; |
| 171 | let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 172 | let mq_name = "/attr_test_get_attr" ; |
| 173 | let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 174 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 175 | let r = mq_open(mq_name, oflag, mode, Some(&initial_attr)); |
| 176 | if let Err(Errno::ENOSYS) = r { |
| 177 | println!("message queues not supported or module not loaded?" ); |
| 178 | return; |
| 179 | }; |
| 180 | let mqd = r.unwrap(); |
| 181 | mq_set_nonblock(&mqd).unwrap(); |
| 182 | let new_attr = mq_getattr(&mqd); |
| 183 | let o_nonblock_bits = MQ_OFlag::O_NONBLOCK.bits() as mq_attr_member_t; |
| 184 | assert_eq!(new_attr.unwrap().flags() & o_nonblock_bits, o_nonblock_bits); |
| 185 | mq_remove_nonblock(&mqd).unwrap(); |
| 186 | let new_attr = mq_getattr(&mqd); |
| 187 | assert_eq!(new_attr.unwrap().flags() & o_nonblock_bits, 0); |
| 188 | mq_close(mqd).unwrap(); |
| 189 | } |
| 190 | |
| 191 | #[test] |
| 192 | fn test_mq_unlink() { |
| 193 | use nix::mqueue::mq_unlink; |
| 194 | const MSG_SIZE: mq_attr_member_t = 32; |
| 195 | let initial_attr = MqAttr::new(0, 10, MSG_SIZE, 0); |
| 196 | let mq_name_opened = "/mq_unlink_test" ; |
| 197 | #[cfg (not(any(target_os = "dragonfly" , target_os = "netbsd" )))] |
| 198 | let mq_name_not_opened = "/mq_unlink_test" ; |
| 199 | let oflag = MQ_OFlag::O_CREAT | MQ_OFlag::O_WRONLY; |
| 200 | let mode = Mode::S_IWUSR | Mode::S_IRUSR | Mode::S_IRGRP | Mode::S_IROTH; |
| 201 | let r = mq_open(mq_name_opened, oflag, mode, Some(&initial_attr)); |
| 202 | if let Err(Errno::ENOSYS) = r { |
| 203 | println!("message queues not supported or module not loaded?" ); |
| 204 | return; |
| 205 | }; |
| 206 | let mqd = r.unwrap(); |
| 207 | |
| 208 | let res_unlink = mq_unlink(mq_name_opened); |
| 209 | assert_eq!(res_unlink, Ok(())); |
| 210 | |
| 211 | // NetBSD (and others which inherit its implementation) defer removing the message |
| 212 | // queue name until all references are closed, whereas Linux and others remove the |
| 213 | // message queue name immediately. |
| 214 | #[cfg (not(any(target_os = "dragonfly" , target_os = "netbsd" )))] |
| 215 | { |
| 216 | let res_unlink_not_opened = mq_unlink(mq_name_not_opened); |
| 217 | assert_eq!(res_unlink_not_opened, Err(Errno::ENOENT)); |
| 218 | } |
| 219 | |
| 220 | mq_close(mqd).unwrap(); |
| 221 | let res_unlink_after_close = mq_unlink(mq_name_opened); |
| 222 | assert_eq!(res_unlink_after_close, Err(Errno::ENOENT)); |
| 223 | } |
| 224 | |