| 1 | //! A single-producer, single-consumer (oneshot) channel. |
| 2 | //! |
| 3 | //! This is an experimental module, so the API will likely change. |
| 4 | |
| 5 | use crate::sync::mpmc; |
| 6 | use crate::sync::mpsc::{RecvError, SendError}; |
| 7 | use crate::time::{Duration, Instant}; |
| 8 | use crate::{error, fmt}; |
| 9 | |
| 10 | /// Creates a new oneshot channel, returning the sender/receiver halves. |
| 11 | /// |
| 12 | /// # Examples |
| 13 | /// |
| 14 | /// ``` |
| 15 | /// #![feature(oneshot_channel)] |
| 16 | /// use std::sync::oneshot; |
| 17 | /// use std::thread; |
| 18 | /// |
| 19 | /// let (sender, receiver) = oneshot::channel(); |
| 20 | /// |
| 21 | /// // Spawn off an expensive computation. |
| 22 | /// thread::spawn(move || { |
| 23 | /// # fn expensive_computation() -> i32 { 42 } |
| 24 | /// sender.send(expensive_computation()).unwrap(); |
| 25 | /// // `sender` is consumed by `send`, so we cannot use it anymore. |
| 26 | /// }); |
| 27 | /// |
| 28 | /// # fn do_other_work() -> i32 { 42 } |
| 29 | /// do_other_work(); |
| 30 | /// |
| 31 | /// // Let's see what that answer was... |
| 32 | /// println!("{:?}" , receiver.recv().unwrap()); |
| 33 | /// // `receiver` is consumed by `recv`, so we cannot use it anymore. |
| 34 | /// ``` |
| 35 | #[must_use ] |
| 36 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 37 | pub fn channel<T>() -> (Sender<T>, Receiver<T>) { |
| 38 | // Using a `sync_channel` with capacity 1 means that the internal implementation will use the |
| 39 | // `Array`-flavored channel implementation. |
| 40 | let (sender: Sender, receiver: Receiver) = mpmc::sync_channel(cap:1); |
| 41 | (Sender { inner: sender }, Receiver { inner: receiver }) |
| 42 | } |
| 43 | |
| 44 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 45 | // Sender |
| 46 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 47 | |
| 48 | /// The sending half of a oneshot channel. |
| 49 | /// |
| 50 | /// # Examples |
| 51 | /// |
| 52 | /// ``` |
| 53 | /// #![feature(oneshot_channel)] |
| 54 | /// use std::sync::oneshot; |
| 55 | /// use std::thread; |
| 56 | /// |
| 57 | /// let (sender, receiver) = oneshot::channel(); |
| 58 | /// |
| 59 | /// thread::spawn(move || { |
| 60 | /// sender.send("Hello from thread!" ).unwrap(); |
| 61 | /// }); |
| 62 | /// |
| 63 | /// assert_eq!(receiver.recv().unwrap(), "Hello from thread!" ); |
| 64 | /// ``` |
| 65 | /// |
| 66 | /// `Sender` cannot be sent between threads if it is sending non-`Send` types. |
| 67 | /// |
| 68 | /// ```compile_fail |
| 69 | /// #![feature(oneshot_channel)] |
| 70 | /// use std::sync::oneshot; |
| 71 | /// use std::thread; |
| 72 | /// use std::ptr; |
| 73 | /// |
| 74 | /// let (sender, receiver) = oneshot::channel(); |
| 75 | /// |
| 76 | /// struct NotSend(*mut ()); |
| 77 | /// thread::spawn(move || { |
| 78 | /// sender.send(NotSend(ptr::null_mut())); |
| 79 | /// }); |
| 80 | /// |
| 81 | /// let reply = receiver.try_recv().unwrap(); |
| 82 | /// ``` |
| 83 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 84 | pub struct Sender<T> { |
| 85 | /// The `oneshot` channel is simply a wrapper around a `mpmc` channel. |
| 86 | inner: mpmc::Sender<T>, |
| 87 | } |
| 88 | |
| 89 | // SAFETY: Since the only methods in which synchronization must occur take full ownership of the |
| 90 | // [`Sender`], it is perfectly safe to share a `&Sender` between threads (as it is effectively |
| 91 | // useless without ownership). |
| 92 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 93 | unsafe impl<T> Sync for Sender<T> {} |
| 94 | |
| 95 | impl<T> Sender<T> { |
| 96 | /// Attempts to send a value through this channel. This can only fail if the corresponding |
| 97 | /// [`Receiver<T>`] has been dropped. |
| 98 | /// |
| 99 | /// This method is non-blocking (wait-free). |
| 100 | /// |
| 101 | /// # Examples |
| 102 | /// |
| 103 | /// ``` |
| 104 | /// #![feature(oneshot_channel)] |
| 105 | /// use std::sync::oneshot; |
| 106 | /// use std::thread; |
| 107 | /// |
| 108 | /// let (tx, rx) = oneshot::channel(); |
| 109 | /// |
| 110 | /// thread::spawn(move || { |
| 111 | /// // Perform some computation. |
| 112 | /// let result = 2 + 2; |
| 113 | /// tx.send(result).unwrap(); |
| 114 | /// }); |
| 115 | /// |
| 116 | /// assert_eq!(rx.recv().unwrap(), 4); |
| 117 | /// ``` |
| 118 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 119 | pub fn send(self, t: T) -> Result<(), SendError<T>> { |
| 120 | self.inner.send(t) |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 125 | impl<T> fmt::Debug for Sender<T> { |
| 126 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 127 | f.debug_struct("Sender" ).finish_non_exhaustive() |
| 128 | } |
| 129 | } |
| 130 | |
| 131 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 132 | // Receiver |
| 133 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 134 | |
| 135 | /// The receiving half of a oneshot channel. |
| 136 | /// |
| 137 | /// # Examples |
| 138 | /// |
| 139 | /// ``` |
| 140 | /// #![feature(oneshot_channel)] |
| 141 | /// use std::sync::oneshot; |
| 142 | /// use std::thread; |
| 143 | /// use std::time::Duration; |
| 144 | /// |
| 145 | /// let (sender, receiver) = oneshot::channel(); |
| 146 | /// |
| 147 | /// thread::spawn(move || { |
| 148 | /// thread::sleep(Duration::from_millis(100)); |
| 149 | /// sender.send("Hello after delay!" ).unwrap(); |
| 150 | /// }); |
| 151 | /// |
| 152 | /// println!("Waiting for message..." ); |
| 153 | /// println!("{}" , receiver.recv().unwrap()); |
| 154 | /// ``` |
| 155 | /// |
| 156 | /// `Receiver` cannot be sent between threads if it is receiving non-`Send` types. |
| 157 | /// |
| 158 | /// ```compile_fail |
| 159 | /// # #![feature(oneshot_channel)] |
| 160 | /// # use std::sync::oneshot; |
| 161 | /// # use std::thread; |
| 162 | /// # use std::ptr; |
| 163 | /// # |
| 164 | /// let (sender, receiver) = oneshot::channel(); |
| 165 | /// |
| 166 | /// struct NotSend(*mut ()); |
| 167 | /// sender.send(NotSend(ptr::null_mut())); |
| 168 | /// |
| 169 | /// thread::spawn(move || { |
| 170 | /// let reply = receiver.try_recv().unwrap(); |
| 171 | /// }); |
| 172 | /// ``` |
| 173 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 174 | pub struct Receiver<T> { |
| 175 | /// The `oneshot` channel is simply a wrapper around a `mpmc` channel. |
| 176 | inner: mpmc::Receiver<T>, |
| 177 | } |
| 178 | |
| 179 | // SAFETY: Since the only methods in which synchronization must occur take full ownership of the |
| 180 | // [`Receiver`], it is perfectly safe to share a `&Receiver` between threads (as it is unable to |
| 181 | // receive any values without ownership). |
| 182 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 183 | unsafe impl<T> Sync for Receiver<T> {} |
| 184 | |
| 185 | impl<T> Receiver<T> { |
| 186 | /// Receives the value from the sending end, blocking the calling thread until it gets it. |
| 187 | /// |
| 188 | /// Can only fail if the corresponding [`Sender<T>`] has been dropped. |
| 189 | /// |
| 190 | /// # Examples |
| 191 | /// |
| 192 | /// ``` |
| 193 | /// #![feature(oneshot_channel)] |
| 194 | /// use std::sync::oneshot; |
| 195 | /// use std::thread; |
| 196 | /// use std::time::Duration; |
| 197 | /// |
| 198 | /// let (tx, rx) = oneshot::channel(); |
| 199 | /// |
| 200 | /// thread::spawn(move || { |
| 201 | /// thread::sleep(Duration::from_millis(500)); |
| 202 | /// tx.send("Done!" ).unwrap(); |
| 203 | /// }); |
| 204 | /// |
| 205 | /// // This will block until the message arrives. |
| 206 | /// println!("{}" , rx.recv().unwrap()); |
| 207 | /// ``` |
| 208 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 209 | pub fn recv(self) -> Result<T, RecvError> { |
| 210 | self.inner.recv() |
| 211 | } |
| 212 | |
| 213 | // Fallible methods. |
| 214 | |
| 215 | /// Attempts to return a pending value on this receiver without blocking. |
| 216 | /// |
| 217 | /// # Examples |
| 218 | /// |
| 219 | /// ``` |
| 220 | /// #![feature(oneshot_channel)] |
| 221 | /// use std::sync::oneshot; |
| 222 | /// use std::thread; |
| 223 | /// use std::time::Duration; |
| 224 | /// |
| 225 | /// let (sender, mut receiver) = oneshot::channel(); |
| 226 | /// |
| 227 | /// thread::spawn(move || { |
| 228 | /// thread::sleep(Duration::from_millis(100)); |
| 229 | /// sender.send(42).unwrap(); |
| 230 | /// }); |
| 231 | /// |
| 232 | /// // Keep trying until we get the message, doing other work in the process. |
| 233 | /// loop { |
| 234 | /// match receiver.try_recv() { |
| 235 | /// Ok(value) => { |
| 236 | /// assert_eq!(value, 42); |
| 237 | /// break; |
| 238 | /// } |
| 239 | /// Err(oneshot::TryRecvError::Empty(rx)) => { |
| 240 | /// // Retake ownership of the receiver. |
| 241 | /// receiver = rx; |
| 242 | /// # fn do_other_work() { thread::sleep(Duration::from_millis(25)); } |
| 243 | /// do_other_work(); |
| 244 | /// } |
| 245 | /// Err(oneshot::TryRecvError::Disconnected) => panic!("Sender disconnected" ), |
| 246 | /// } |
| 247 | /// } |
| 248 | /// ``` |
| 249 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 250 | pub fn try_recv(self) -> Result<T, TryRecvError<T>> { |
| 251 | self.inner.try_recv().map_err(|err| match err { |
| 252 | mpmc::TryRecvError::Empty => TryRecvError::Empty(self), |
| 253 | mpmc::TryRecvError::Disconnected => TryRecvError::Disconnected, |
| 254 | }) |
| 255 | } |
| 256 | |
| 257 | /// Attempts to wait for a value on this receiver, returning an error if the corresponding |
| 258 | /// [`Sender`] half of this channel has been dropped, or if it waits more than `timeout`. |
| 259 | /// |
| 260 | /// # Examples |
| 261 | /// |
| 262 | /// ``` |
| 263 | /// #![feature(oneshot_channel)] |
| 264 | /// use std::sync::oneshot; |
| 265 | /// use std::thread; |
| 266 | /// use std::time::Duration; |
| 267 | /// |
| 268 | /// let (sender, receiver) = oneshot::channel(); |
| 269 | /// |
| 270 | /// thread::spawn(move || { |
| 271 | /// thread::sleep(Duration::from_millis(500)); |
| 272 | /// sender.send("Success!" ).unwrap(); |
| 273 | /// }); |
| 274 | /// |
| 275 | /// // Wait up to 1 second for the message |
| 276 | /// match receiver.recv_timeout(Duration::from_secs(1)) { |
| 277 | /// Ok(msg) => println!("Received: {}" , msg), |
| 278 | /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Timed out!" ), |
| 279 | /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!" ), |
| 280 | /// } |
| 281 | /// ``` |
| 282 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 283 | pub fn recv_timeout(self, timeout: Duration) -> Result<T, RecvTimeoutError<T>> { |
| 284 | self.inner.recv_timeout(timeout).map_err(|err| match err { |
| 285 | mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self), |
| 286 | mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, |
| 287 | }) |
| 288 | } |
| 289 | |
| 290 | /// Attempts to wait for a value on this receiver, returning an error if the corresponding |
| 291 | /// [`Sender`] half of this channel has been dropped, or if `deadline` is reached. |
| 292 | /// |
| 293 | /// # Examples |
| 294 | /// |
| 295 | /// ``` |
| 296 | /// #![feature(oneshot_channel)] |
| 297 | /// use std::sync::oneshot; |
| 298 | /// use std::thread; |
| 299 | /// use std::time::{Duration, Instant}; |
| 300 | /// |
| 301 | /// let (sender, receiver) = oneshot::channel(); |
| 302 | /// |
| 303 | /// thread::spawn(move || { |
| 304 | /// thread::sleep(Duration::from_millis(100)); |
| 305 | /// sender.send("Just in time!" ).unwrap(); |
| 306 | /// }); |
| 307 | /// |
| 308 | /// let deadline = Instant::now() + Duration::from_millis(500); |
| 309 | /// match receiver.recv_deadline(deadline) { |
| 310 | /// Ok(msg) => println!("Received: {}" , msg), |
| 311 | /// Err(oneshot::RecvTimeoutError::Timeout(_)) => println!("Missed deadline!" ), |
| 312 | /// Err(oneshot::RecvTimeoutError::Disconnected) => println!("Sender dropped!" ), |
| 313 | /// } |
| 314 | /// ``` |
| 315 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 316 | pub fn recv_deadline(self, deadline: Instant) -> Result<T, RecvTimeoutError<T>> { |
| 317 | self.inner.recv_deadline(deadline).map_err(|err| match err { |
| 318 | mpmc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout(self), |
| 319 | mpmc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected, |
| 320 | }) |
| 321 | } |
| 322 | } |
| 323 | |
| 324 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 325 | impl<T> fmt::Debug for Receiver<T> { |
| 326 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 327 | f.debug_struct("Receiver" ).finish_non_exhaustive() |
| 328 | } |
| 329 | } |
| 330 | |
| 331 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 332 | // Receiver Errors |
| 333 | //////////////////////////////////////////////////////////////////////////////////////////////////// |
| 334 | |
| 335 | /// An error returned from the [`try_recv`](Receiver::try_recv) method. |
| 336 | /// |
| 337 | /// See the documentation for [`try_recv`] for more information on how to use this error. |
| 338 | /// |
| 339 | /// [`try_recv`]: Receiver::try_recv |
| 340 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 341 | pub enum TryRecvError<T> { |
| 342 | /// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet |
| 343 | /// disconnected). This variant contains the [`Receiver`] that [`try_recv`](Receiver::try_recv) |
| 344 | /// took ownership over. |
| 345 | Empty(Receiver<T>), |
| 346 | /// The corresponding [`Sender`] half of this channel has become disconnected, and there will |
| 347 | /// never be any more data sent over the channel. |
| 348 | Disconnected, |
| 349 | } |
| 350 | |
| 351 | /// An error returned from the [`recv_timeout`](Receiver::recv_timeout) or |
| 352 | /// [`recv_deadline`](Receiver::recv_deadline) methods. |
| 353 | /// |
| 354 | /// # Examples |
| 355 | /// |
| 356 | /// Usage of this error is similar to [`TryRecvError`]. |
| 357 | /// |
| 358 | /// ``` |
| 359 | /// #![feature(oneshot_channel)] |
| 360 | /// use std::sync::oneshot::{self, RecvTimeoutError}; |
| 361 | /// use std::thread; |
| 362 | /// use std::time::Duration; |
| 363 | /// |
| 364 | /// let (sender, receiver) = oneshot::channel(); |
| 365 | /// |
| 366 | /// let send_failure = thread::spawn(move || { |
| 367 | /// // Simulate a long computation that takes longer than our timeout. |
| 368 | /// thread::sleep(Duration::from_millis(250)); |
| 369 | /// |
| 370 | /// // This will likely fail to send because we drop the receiver in the main thread. |
| 371 | /// sender.send("Goodbye!" .to_string()).unwrap(); |
| 372 | /// }); |
| 373 | /// |
| 374 | /// // Try to receive the message with a short timeout. |
| 375 | /// match receiver.recv_timeout(Duration::from_millis(10)) { |
| 376 | /// Ok(msg) => println!("Received: {}" , msg), |
| 377 | /// Err(RecvTimeoutError::Timeout(rx)) => { |
| 378 | /// println!("Timed out waiting for message!" ); |
| 379 | /// |
| 380 | /// // Note that you can reuse the receiver without dropping it. |
| 381 | /// drop(rx); |
| 382 | /// }, |
| 383 | /// Err(RecvTimeoutError::Disconnected) => println!("Sender dropped!" ), |
| 384 | /// } |
| 385 | /// |
| 386 | /// send_failure.join().unwrap_err(); |
| 387 | /// ``` |
| 388 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 389 | pub enum RecvTimeoutError<T> { |
| 390 | /// The [`Sender`] has not sent a message yet, but it might in the future (as it has not yet |
| 391 | /// disconnected). This variant contains the [`Receiver`] that either |
| 392 | /// [`recv_timeout`](Receiver::recv_timeout) or [`recv_deadline`](Receiver::recv_deadline) took |
| 393 | /// ownership over. |
| 394 | Timeout(Receiver<T>), |
| 395 | /// The corresponding [`Sender`] half of this channel has become disconnected, and there will |
| 396 | /// never be any more data sent over the channel. |
| 397 | Disconnected, |
| 398 | } |
| 399 | |
| 400 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 401 | impl<T> fmt::Debug for TryRecvError<T> { |
| 402 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 403 | f.debug_tuple("TryRecvError" ).finish_non_exhaustive() |
| 404 | } |
| 405 | } |
| 406 | |
| 407 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 408 | impl<T> fmt::Display for TryRecvError<T> { |
| 409 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 410 | match *self { |
| 411 | TryRecvError::Empty(..) => "receiving on an empty oneshot channel" .fmt(f), |
| 412 | TryRecvError::Disconnected => "receiving on a closed oneshot channel" .fmt(f), |
| 413 | } |
| 414 | } |
| 415 | } |
| 416 | |
| 417 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 418 | impl<T> error::Error for TryRecvError<T> {} |
| 419 | |
| 420 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 421 | impl<T> From<RecvError> for TryRecvError<T> { |
| 422 | /// Converts a `RecvError` into a `TryRecvError`. |
| 423 | /// |
| 424 | /// This conversion always returns `TryRecvError::Disconnected`. |
| 425 | /// |
| 426 | /// No data is allocated on the heap. |
| 427 | fn from(err: RecvError) -> TryRecvError<T> { |
| 428 | match err { |
| 429 | RecvError => TryRecvError::Disconnected, |
| 430 | } |
| 431 | } |
| 432 | } |
| 433 | |
| 434 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 435 | impl<T> fmt::Debug for RecvTimeoutError<T> { |
| 436 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 437 | f.debug_tuple("RecvTimeoutError" ).finish_non_exhaustive() |
| 438 | } |
| 439 | } |
| 440 | |
| 441 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 442 | impl<T> fmt::Display for RecvTimeoutError<T> { |
| 443 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
| 444 | match *self { |
| 445 | RecvTimeoutError::Timeout(..) => "timed out waiting on oneshot channel" .fmt(f), |
| 446 | RecvTimeoutError::Disconnected => "receiving on a closed oneshot channel" .fmt(f), |
| 447 | } |
| 448 | } |
| 449 | } |
| 450 | |
| 451 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 452 | impl<T> error::Error for RecvTimeoutError<T> {} |
| 453 | |
| 454 | #[unstable (feature = "oneshot_channel" , issue = "143674" )] |
| 455 | impl<T> From<RecvError> for RecvTimeoutError<T> { |
| 456 | /// Converts a `RecvError` into a `RecvTimeoutError`. |
| 457 | /// |
| 458 | /// This conversion always returns `RecvTimeoutError::Disconnected`. |
| 459 | /// |
| 460 | /// No data is allocated on the heap. |
| 461 | fn from(err: RecvError) -> RecvTimeoutError<T> { |
| 462 | match err { |
| 463 | RecvError => RecvTimeoutError::Disconnected, |
| 464 | } |
| 465 | } |
| 466 | } |
| 467 | |