1//! Multi - initiating multiple requests simultaneously
2
3use std::fmt;
4use std::marker;
5use std::ptr;
6use std::sync::Arc;
7use std::time::Duration;
8
9use curl_sys;
10use libc::{c_char, c_int, c_long, c_short, c_void};
11
12#[cfg(unix)]
13use libc::{pollfd, POLLIN, POLLOUT, POLLPRI};
14
15use crate::easy::{Easy, Easy2, List};
16use crate::panic;
17use crate::{Error, MultiError};
18
19/// A multi handle for initiating multiple connections simultaneously.
20///
21/// This structure corresponds to `CURLM` in libcurl and provides the ability to
22/// have multiple transfers in flight simultaneously. This handle is then used
23/// to manage each transfer. The main purpose of a `CURLM` is for the
24/// *application* to drive the I/O rather than libcurl itself doing all the
25/// blocking. Methods like `action` allow the application to inform libcurl of
26/// when events have happened.
27///
28/// Lots more documentation can be found on the libcurl [multi tutorial] where
29/// the APIs correspond pretty closely with this crate.
30///
31/// [multi tutorial]: https://curl.haxx.se/libcurl/c/libcurl-multi.html
32pub struct Multi {
33 raw: Arc<RawMulti>,
34 data: Box<MultiData>,
35}
36
37#[derive(Debug)]
38struct RawMulti {
39 handle: *mut curl_sys::CURLM,
40}
41
42struct MultiData {
43 socket: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
44 timer: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
45}
46
47/// Message from the `messages` function of a multi handle.
48///
49/// Currently only indicates whether a transfer is done.
50pub struct Message<'multi> {
51 ptr: *mut curl_sys::CURLMsg,
52 _multi: &'multi Multi,
53}
54
55/// Wrapper around an easy handle while it's owned by a multi handle.
56///
57/// Once an easy handle has been added to a multi handle then it can no longer
58/// be used via `perform`. This handle is also used to remove the easy handle
59/// from the multi handle when desired.
60pub struct EasyHandle {
61 // Safety: This *must* be before `easy` as it must be dropped first.
62 guard: DetachGuard,
63 easy: Easy,
64 // This is now effectively bound to a `Multi`, so it is no longer sendable.
65 _marker: marker::PhantomData<&'static Multi>,
66}
67
68/// Wrapper around an easy handle while it's owned by a multi handle.
69///
70/// Once an easy handle has been added to a multi handle then it can no longer
71/// be used via `perform`. This handle is also used to remove the easy handle
72/// from the multi handle when desired.
73pub struct Easy2Handle<H> {
74 // Safety: This *must* be before `easy` as it must be dropped first.
75 guard: DetachGuard,
76 easy: Easy2<H>,
77 // This is now effectively bound to a `Multi`, so it is no longer sendable.
78 _marker: marker::PhantomData<&'static Multi>,
79}
80
81/// A guard struct which guarantees that `curl_multi_remove_handle` will be
82/// called on an easy handle, either manually or on drop.
83struct DetachGuard {
84 multi: Arc<RawMulti>,
85 easy: *mut curl_sys::CURL,
86}
87
88/// Notification of the events that have happened on a socket.
89///
90/// This type is passed as an argument to the `action` method on a multi handle
91/// to indicate what events have occurred on a socket.
92pub struct Events {
93 bits: c_int,
94}
95
96/// Notification of events that are requested on a socket.
97///
98/// This type is yielded to the `socket_function` callback to indicate what
99/// events are requested on a socket.
100pub struct SocketEvents {
101 bits: c_int,
102}
103
104/// Raw underlying socket type that the multi handles use
105pub type Socket = curl_sys::curl_socket_t;
106
107/// File descriptor to wait on for use with the `wait` method on a multi handle.
108pub struct WaitFd {
109 inner: curl_sys::curl_waitfd,
110}
111
112/// A handle that can be used to wake up a thread that's blocked in [Multi::poll].
113/// The handle can be passed to and used from any thread.
#[cfg(feature = "poll_7_68_0")]
#[derive(Debug, Clone)]
pub struct MultiWaker {
    // Weak reference, so an outstanding waker does not keep the multi
    // handle alive; `wakeup` fails once the handle has been dropped.
    raw: std::sync::Weak<RawMulti>,
}
119
// A waker only carries a weak reference to the raw multi handle, and the
// sole operation it exposes is `wakeup`, which is documented as callable
// from any thread.
#[cfg(feature = "poll_7_68_0")]
unsafe impl Send for MultiWaker {}

#[cfg(feature = "poll_7_68_0")]
unsafe impl Sync for MultiWaker {}
125
126impl Multi {
127 /// Creates a new multi session through which multiple HTTP transfers can be
128 /// initiated.
129 pub fn new() -> Multi {
130 unsafe {
131 crate::init();
132 let ptr = curl_sys::curl_multi_init();
133 assert!(!ptr.is_null());
134 Multi {
135 raw: Arc::new(RawMulti { handle: ptr }),
136 data: Box::new(MultiData {
137 socket: Box::new(|_, _, _| ()),
138 timer: Box::new(|_| true),
139 }),
140 }
141 }
142 }
143
144 /// Set the callback informed about what to wait for
145 ///
146 /// When the `action` function runs, it informs the application about
147 /// updates in the socket (file descriptor) status by doing none, one, or
148 /// multiple calls to the socket callback. The callback gets status updates
149 /// with changes since the previous time the callback was called. See
150 /// `action` for more details on how the callback is used and should work.
151 ///
152 /// The `SocketEvents` parameter informs the callback on the status of the
153 /// given socket, and the methods on that type can be used to learn about
154 /// what's going on with the socket.
155 ///
156 /// The third `usize` parameter is a custom value set by the `assign` method
157 /// below.
158 pub fn socket_function<F>(&mut self, f: F) -> Result<(), MultiError>
159 where
160 F: FnMut(Socket, SocketEvents, usize) + Send + 'static,
161 {
162 self._socket_function(Box::new(f))
163 }
164
165 fn _socket_function(
166 &mut self,
167 f: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
168 ) -> Result<(), MultiError> {
169 self.data.socket = f;
170 let cb: curl_sys::curl_socket_callback = cb;
171 self.setopt_ptr(
172 curl_sys::CURLMOPT_SOCKETFUNCTION,
173 cb as usize as *const c_char,
174 )?;
175 let ptr = &*self.data as *const _;
176 self.setopt_ptr(curl_sys::CURLMOPT_SOCKETDATA, ptr as *const c_char)?;
177 return Ok(());
178
179 // TODO: figure out how to expose `_easy`
180 extern "C" fn cb(
181 _easy: *mut curl_sys::CURL,
182 socket: curl_sys::curl_socket_t,
183 what: c_int,
184 userptr: *mut c_void,
185 socketp: *mut c_void,
186 ) -> c_int {
187 panic::catch(|| unsafe {
188 let f = &mut (*(userptr as *mut MultiData)).socket;
189 f(socket, SocketEvents { bits: what }, socketp as usize)
190 });
191 0
192 }
193 }
194
195 /// Set data to associate with an internal socket
196 ///
197 /// This function creates an association in the multi handle between the
198 /// given socket and a private token of the application. This is designed
199 /// for `action` uses.
200 ///
201 /// When set, the token will be passed to all future socket callbacks for
202 /// the specified socket.
203 ///
204 /// If the given socket isn't already in use by libcurl, this function will
205 /// return an error.
206 ///
207 /// libcurl only keeps one single token associated with a socket, so
208 /// calling this function several times for the same socket will make the
209 /// last set token get used.
210 ///
211 /// The idea here being that this association (socket to token) is something
212 /// that just about every application that uses this API will need and then
213 /// libcurl can just as well do it since it already has an internal hash
214 /// table lookup for this.
215 ///
216 /// # Typical Usage
217 ///
218 /// In a typical application you allocate a struct or at least use some kind
219 /// of semi-dynamic data for each socket that we must wait for action on
220 /// when using the `action` approach.
221 ///
222 /// When our socket-callback gets called by libcurl and we get to know about
223 /// yet another socket to wait for, we can use `assign` to point out the
224 /// particular data so that when we get updates about this same socket
225 /// again, we don't have to find the struct associated with this socket by
226 /// ourselves.
227 pub fn assign(&self, socket: Socket, token: usize) -> Result<(), MultiError> {
228 unsafe {
229 cvt(curl_sys::curl_multi_assign(
230 self.raw.handle,
231 socket,
232 token as *mut _,
233 ))?;
234 Ok(())
235 }
236 }
237
238 /// Set callback to receive timeout values
239 ///
240 /// Certain features, such as timeouts and retries, require you to call
241 /// libcurl even when there is no activity on the file descriptors.
242 ///
243 /// Your callback function should install a non-repeating timer with the
244 /// interval specified. Each time that timer fires, call either `action` or
245 /// `perform`, depending on which interface you use.
246 ///
247 /// A timeout value of `None` means you should delete your timer.
248 ///
249 /// A timeout value of 0 means you should call `action` or `perform` (once)
250 /// as soon as possible.
251 ///
252 /// This callback will only be called when the timeout changes.
253 ///
254 /// The timer callback should return `true` on success, and `false` on
255 /// error. This callback can be used instead of, or in addition to,
256 /// `get_timeout`.
257 pub fn timer_function<F>(&mut self, f: F) -> Result<(), MultiError>
258 where
259 F: FnMut(Option<Duration>) -> bool + Send + 'static,
260 {
261 self._timer_function(Box::new(f))
262 }
263
264 fn _timer_function(
265 &mut self,
266 f: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
267 ) -> Result<(), MultiError> {
268 self.data.timer = f;
269 let cb: curl_sys::curl_multi_timer_callback = cb;
270 self.setopt_ptr(
271 curl_sys::CURLMOPT_TIMERFUNCTION,
272 cb as usize as *const c_char,
273 )?;
274 let ptr = &*self.data as *const _;
275 self.setopt_ptr(curl_sys::CURLMOPT_TIMERDATA, ptr as *const c_char)?;
276 return Ok(());
277
278 // TODO: figure out how to expose `_multi`
279 extern "C" fn cb(
280 _multi: *mut curl_sys::CURLM,
281 timeout_ms: c_long,
282 user: *mut c_void,
283 ) -> c_int {
284 let keep_going = panic::catch(|| unsafe {
285 let f = &mut (*(user as *mut MultiData)).timer;
286 if timeout_ms == -1 {
287 f(None)
288 } else {
289 f(Some(Duration::from_millis(timeout_ms as u64)))
290 }
291 })
292 .unwrap_or(false);
293 if keep_going {
294 0
295 } else {
296 -1
297 }
298 }
299 }
300
301 /// Enable or disable HTTP pipelining and multiplexing.
302 ///
303 /// When http_1 is true, enable HTTP/1.1 pipelining, which means that if
304 /// you add a second request that can use an already existing connection,
305 /// the second request will be "piped" on the same connection rather than
306 /// being executed in parallel.
307 ///
308 /// When multiplex is true, enable HTTP/2 multiplexing, which means that
309 /// follow-up requests can re-use an existing connection and send the new
310 /// request multiplexed over that at the same time as other transfers are
311 /// already using that single connection.
312 pub fn pipelining(&mut self, http_1: bool, multiplex: bool) -> Result<(), MultiError> {
313 let bitmask = if http_1 { curl_sys::CURLPIPE_HTTP1 } else { 0 }
314 | if multiplex {
315 curl_sys::CURLPIPE_MULTIPLEX
316 } else {
317 0
318 };
319 self.setopt_long(curl_sys::CURLMOPT_PIPELINING, bitmask)
320 }
321
322 /// Sets the max number of connections to a single host.
323 ///
324 /// Pass a long to indicate the max number of simultaneously open connections
325 /// to a single host (a host being the same as a host name + port number pair).
326 /// For each new session to a host, libcurl will open up a new connection up to the
327 /// limit set by the provided value. When the limit is reached, the sessions will
328 /// be pending until a connection becomes available. If pipelining is enabled,
329 /// libcurl will try to pipeline if the host is capable of it.
330 pub fn set_max_host_connections(&mut self, val: usize) -> Result<(), MultiError> {
331 self.setopt_long(curl_sys::CURLMOPT_MAX_HOST_CONNECTIONS, val as c_long)
332 }
333
334 /// Sets the max simultaneously open connections.
335 ///
336 /// The set number will be used as the maximum number of simultaneously open
337 /// connections in total using this multi handle. For each new session,
338 /// libcurl will open a new connection up to the limit set by the provided
339 /// value. When the limit is reached, the sessions will be pending until
340 /// there are available connections. If pipelining is enabled, libcurl will
341 /// try to pipeline or use multiplexing if the host is capable of it.
342 pub fn set_max_total_connections(&mut self, val: usize) -> Result<(), MultiError> {
343 self.setopt_long(curl_sys::CURLMOPT_MAX_TOTAL_CONNECTIONS, val as c_long)
344 }
345
346 /// Set size of connection cache.
347 ///
348 /// The set number will be used as the maximum amount of simultaneously open
349 /// connections that libcurl may keep in its connection cache after
350 /// completed use. By default libcurl will enlarge the size for each added
351 /// easy handle to make it fit 4 times the number of added easy handles.
352 ///
353 /// By setting this option, you can prevent the cache size from growing
354 /// beyond the limit set by you.
355 ///
356 /// When the cache is full, curl closes the oldest one in the cache to
357 /// prevent the number of open connections from increasing.
358 ///
359 /// See [`set_max_total_connections`](#method.set_max_total_connections) for
360 /// limiting the number of active connections.
361 pub fn set_max_connects(&mut self, val: usize) -> Result<(), MultiError> {
362 self.setopt_long(curl_sys::CURLMOPT_MAXCONNECTS, val as c_long)
363 }
364
365 /// Sets the pipeline length.
366 ///
367 /// This sets the max number that will be used as the maximum amount of
368 /// outstanding requests in an HTTP/1.1 pipelined connection. This option
369 /// is only used for HTTP/1.1 pipelining, and not HTTP/2 multiplexing.
370 pub fn set_pipeline_length(&mut self, val: usize) -> Result<(), MultiError> {
371 self.setopt_long(curl_sys::CURLMOPT_MAX_PIPELINE_LENGTH, val as c_long)
372 }
373
374 fn setopt_long(&mut self, opt: curl_sys::CURLMoption, val: c_long) -> Result<(), MultiError> {
375 unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
376 }
377
378 fn setopt_ptr(
379 &mut self,
380 opt: curl_sys::CURLMoption,
381 val: *const c_char,
382 ) -> Result<(), MultiError> {
383 unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
384 }
385
386 /// Add an easy handle to a multi session
387 ///
388 /// Adds a standard easy handle to the multi stack. This function call will
389 /// make this multi handle control the specified easy handle.
390 ///
391 /// When an easy interface is added to a multi handle, it will use a shared
392 /// connection cache owned by the multi handle. Removing and adding new easy
393 /// handles will not affect the pool of connections or the ability to do
394 /// connection re-use.
395 ///
396 /// If you have `timer_function` set in the multi handle (and you really
397 /// should if you're working event-based with `action` and friends), that
398 /// callback will be called from within this function to ask for an updated
399 /// timer so that your main event loop will get the activity on this handle
400 /// to get started.
401 ///
402 /// The easy handle will remain added to the multi handle until you remove
403 /// it again with `remove` on the returned handle - even when a transfer
404 /// with that specific easy handle is completed.
405 pub fn add(&self, mut easy: Easy) -> Result<EasyHandle, MultiError> {
406 // Clear any configuration set by previous transfers because we're
407 // moving this into a `Send+'static` situation now basically.
408 easy.transfer();
409
410 unsafe {
411 cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
412 }
413 Ok(EasyHandle {
414 guard: DetachGuard {
415 multi: self.raw.clone(),
416 easy: easy.raw(),
417 },
418 easy,
419 _marker: marker::PhantomData,
420 })
421 }
422
423 /// Same as `add`, but works with the `Easy2` type.
424 pub fn add2<H>(&self, easy: Easy2<H>) -> Result<Easy2Handle<H>, MultiError> {
425 unsafe {
426 cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
427 }
428 Ok(Easy2Handle {
429 guard: DetachGuard {
430 multi: self.raw.clone(),
431 easy: easy.raw(),
432 },
433 easy,
434 _marker: marker::PhantomData,
435 })
436 }
437
438 /// Remove an easy handle from this multi session
439 ///
440 /// Removes the easy handle from this multi handle. This will make the
441 /// returned easy handle be removed from this multi handle's control.
442 ///
443 /// When the easy handle has been removed from a multi stack, it is again
444 /// perfectly legal to invoke `perform` on it.
445 ///
446 /// Removing an easy handle while being used is perfectly legal and will
447 /// effectively halt the transfer in progress involving that easy handle.
448 /// All other easy handles and transfers will remain unaffected.
449 pub fn remove(&self, mut easy: EasyHandle) -> Result<Easy, MultiError> {
450 easy.guard.detach()?;
451 Ok(easy.easy)
452 }
453
454 /// Same as `remove`, but for `Easy2Handle`.
455 pub fn remove2<H>(&self, mut easy: Easy2Handle<H>) -> Result<Easy2<H>, MultiError> {
456 easy.guard.detach()?;
457 Ok(easy.easy)
458 }
459
460 /// Read multi stack informationals
461 ///
462 /// Ask the multi handle if there are any messages/informationals from the
463 /// individual transfers. Messages may include informationals such as an
464 /// error code from the transfer or just the fact that a transfer is
465 /// completed. More details on these should be written down as well.
466 pub fn messages<F>(&self, mut f: F)
467 where
468 F: FnMut(Message),
469 {
470 self._messages(&mut f)
471 }
472
473 fn _messages(&self, f: &mut dyn FnMut(Message)) {
474 let mut queue = 0;
475 unsafe {
476 loop {
477 let ptr = curl_sys::curl_multi_info_read(self.raw.handle, &mut queue);
478 if ptr.is_null() {
479 break;
480 }
481 f(Message { ptr, _multi: self })
482 }
483 }
484 }
485
486 /// Inform of reads/writes available data given an action
487 ///
488 /// When the application has detected action on a socket handled by libcurl,
489 /// it should call this function with the sockfd argument set to
490 /// the socket with the action. When the events on a socket are known, they
491 /// can be passed `events`. When the events on a socket are unknown, pass
492 /// `Events::new()` instead, and libcurl will test the descriptor
493 /// internally.
494 ///
495 /// The returned integer will contain the number of running easy handles
496 /// within the multi handle. When this number reaches zero, all transfers
497 /// are complete/done. When you call `action` on a specific socket and the
498 /// counter decreases by one, it DOES NOT necessarily mean that this exact
499 /// socket/transfer is the one that completed. Use `messages` to figure out
500 /// which easy handle that completed.
501 ///
502 /// The `action` function informs the application about updates in the
503 /// socket (file descriptor) status by doing none, one, or multiple calls to
504 /// the socket callback function set with the `socket_function` method. They
505 /// update the status with changes since the previous time the callback was
506 /// called.
507 pub fn action(&self, socket: Socket, events: &Events) -> Result<u32, MultiError> {
508 let mut remaining = 0;
509 unsafe {
510 cvt(curl_sys::curl_multi_socket_action(
511 self.raw.handle,
512 socket,
513 events.bits,
514 &mut remaining,
515 ))?;
516 Ok(remaining as u32)
517 }
518 }
519
520 /// Inform libcurl that a timeout has expired and sockets should be tested.
521 ///
522 /// The returned integer will contain the number of running easy handles
523 /// within the multi handle. When this number reaches zero, all transfers
524 /// are complete/done. When you call `action` on a specific socket and the
525 /// counter decreases by one, it DOES NOT necessarily mean that this exact
526 /// socket/transfer is the one that completed. Use `messages` to figure out
527 /// which easy handle that completed.
528 ///
529 /// Get the timeout time by calling the `timer_function` method. Your
530 /// application will then get called with information on how long to wait
531 /// for socket actions at most before doing the timeout action: call the
532 /// `timeout` method. You can also use the `get_timeout` function to
533 /// poll the value at any given time, but for an event-based system using
534 /// the callback is far better than relying on polling the timeout value.
535 pub fn timeout(&self) -> Result<u32, MultiError> {
536 let mut remaining = 0;
537 unsafe {
538 cvt(curl_sys::curl_multi_socket_action(
539 self.raw.handle,
540 curl_sys::CURL_SOCKET_BAD,
541 0,
542 &mut remaining,
543 ))?;
544 Ok(remaining as u32)
545 }
546 }
547
548 /// Get how long to wait for action before proceeding
549 ///
550 /// An application using the libcurl multi interface should call
551 /// `get_timeout` to figure out how long it should wait for socket actions -
552 /// at most - before proceeding.
553 ///
554 /// Proceeding means either doing the socket-style timeout action: call the
555 /// `timeout` function, or call `perform` if you're using the simpler and
556 /// older multi interface approach.
557 ///
558 /// The timeout value returned is the duration at this very moment. If 0, it
559 /// means you should proceed immediately without waiting for anything. If it
560 /// returns `None`, there's no timeout at all set.
561 ///
562 /// Note: if libcurl returns a `None` timeout here, it just means that
563 /// libcurl currently has no stored timeout value. You must not wait too
564 /// long (more than a few seconds perhaps) before you call `perform` again.
565 pub fn get_timeout(&self) -> Result<Option<Duration>, MultiError> {
566 let mut ms = 0;
567 unsafe {
568 cvt(curl_sys::curl_multi_timeout(self.raw.handle, &mut ms))?;
569 if ms == -1 {
570 Ok(None)
571 } else {
572 Ok(Some(Duration::from_millis(ms as u64)))
573 }
574 }
575 }
576
577 /// Block until activity is detected or a timeout passes.
578 ///
579 /// The timeout is used in millisecond-precision. Large durations are
580 /// clamped at the maximum value curl accepts.
581 ///
582 /// The returned integer will contain the number of internal file
/// descriptors on which interesting events occurred.
584 ///
585 /// This function is a simpler alternative to using `fdset()` and `select()`
586 /// and does not suffer from file descriptor limits.
587 ///
588 /// # Example
589 ///
590 /// ```
591 /// use curl::multi::Multi;
592 /// use std::time::Duration;
593 ///
594 /// let m = Multi::new();
595 ///
596 /// // Add some Easy handles...
597 ///
598 /// while m.perform().unwrap() > 0 {
599 /// m.wait(&mut [], Duration::from_secs(1)).unwrap();
600 /// }
601 /// ```
602 pub fn wait(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
603 let timeout_ms = Multi::timeout_i32(timeout);
604 unsafe {
605 let mut ret = 0;
606 cvt(curl_sys::curl_multi_wait(
607 self.raw.handle,
608 waitfds.as_mut_ptr() as *mut _,
609 waitfds.len() as u32,
610 timeout_ms,
611 &mut ret,
612 ))?;
613 Ok(ret as u32)
614 }
615 }
616
fn timeout_i32(timeout: Duration) -> i32 {
    // Converts `timeout` to whole milliseconds (sub-millisecond precision is
    // truncated), clamping at `i32::MAX` for durations too large to fit.
    //
    // The total is computed as a `u128` so neither the seconds-to-millis
    // multiplication nor the sub-second addition can overflow. Doing the
    // arithmetic in `i32` overflowed for durations just under the limit
    // (2_147_483 s plus 648 ms or more).
    let millis = timeout.as_millis();
    if millis > i32::MAX as u128 {
        // Duration too large, clamp at maximum value.
        i32::MAX
    } else {
        millis as i32
    }
}
626
627 /// Block until activity is detected or a timeout passes.
628 ///
629 /// The timeout is used in millisecond-precision. Large durations are
630 /// clamped at the maximum value curl accepts.
631 ///
632 /// The returned integer will contain the number of internal file
633 /// descriptors on which interesting events occurred.
634 ///
635 /// This function is a simpler alternative to using `fdset()` and `select()`
636 /// and does not suffer from file descriptor limits.
637 ///
638 /// While this method is similar to [Multi::wait], with the following
639 /// distinctions:
640 /// * If there are no handles added to the multi, poll will honor the
641 /// provided timeout, while [Multi::wait] returns immediately.
642 /// * If poll has blocked due to there being no activity on the handles in
643 /// the Multi, it can be woken up from any thread and at any time before
644 /// the timeout expires.
645 ///
646 /// Requires libcurl 7.66.0 or later.
647 ///
648 /// # Example
649 ///
650 /// ```
651 /// use curl::multi::Multi;
652 /// use std::time::Duration;
653 ///
654 /// let m = Multi::new();
655 ///
656 /// // Add some Easy handles...
657 ///
658 /// while m.perform().unwrap() > 0 {
659 /// m.poll(&mut [], Duration::from_secs(1)).unwrap();
660 /// }
661 /// ```
#[cfg(feature = "poll_7_68_0")]
pub fn poll(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
    // Filled in by libcurl with the number of descriptors that had events.
    let mut ready = 0;
    let rc = unsafe {
        curl_sys::curl_multi_poll(
            self.raw.handle,
            waitfds.as_mut_ptr() as *mut _,
            waitfds.len() as u32,
            Self::timeout_i32(timeout),
            &mut ready,
        )
    };
    cvt(rc)?;
    Ok(ready as u32)
}
677
678 /// Returns a new [MultiWaker] that can be used to wake up a thread that's
679 /// currently blocked in [Multi::poll].
#[cfg(feature = "poll_7_68_0")]
pub fn waker(&self) -> MultiWaker {
    // The waker only gets a weak reference, so it never keeps the handle
    // alive on its own.
    let weak = Arc::downgrade(&self.raw);
    MultiWaker::new(weak)
}
684
685 /// Reads/writes available data from each easy handle.
686 ///
687 /// This function handles transfers on all the added handles that need
/// attention in a non-blocking fashion.
689 ///
690 /// When an application has found out there's data available for this handle
691 /// or a timeout has elapsed, the application should call this function to
692 /// read/write whatever there is to read or write right now etc. This
693 /// method returns as soon as the reads/writes are done. This function does
694 /// not require that there actually is any data available for reading or
695 /// that data can be written, it can be called just in case. It will return
696 /// the number of handles that still transfer data.
697 ///
698 /// If the amount of running handles is changed from the previous call (or
699 /// is less than the amount of easy handles you've added to the multi
700 /// handle), you know that there is one or more transfers less "running".
701 /// You can then call `info` to get information about each individual
702 /// completed transfer, and that returned info includes `Error` and more.
703 /// If an added handle fails very quickly, it may never be counted as a
704 /// running handle.
705 ///
706 /// When running_handles is set to zero (0) on the return of this function,
/// there are no longer any transfers in progress.
708 ///
709 /// # Return
710 ///
711 /// Before libcurl version 7.20.0: If you receive `is_call_perform`, this
712 /// basically means that you should call `perform` again, before you select
713 /// on more actions. You don't have to do it immediately, but the return
714 /// code means that libcurl may have more data available to return or that
715 /// there may be more data to send off before it is "satisfied". Do note
716 /// that `perform` will return `is_call_perform` only when it wants to be
717 /// called again immediately. When things are fine and there is nothing
718 /// immediate it wants done, it'll return `Ok` and you need to wait for
719 /// "action" and then call this function again.
720 ///
721 /// This function only returns errors etc regarding the whole multi stack.
722 /// Problems still might have occurred on individual transfers even when
723 /// this function returns `Ok`. Use `info` to figure out how individual
724 /// transfers did.
725 pub fn perform(&self) -> Result<u32, MultiError> {
726 unsafe {
727 let mut ret = 0;
728 cvt(curl_sys::curl_multi_perform(self.raw.handle, &mut ret))?;
729 Ok(ret as u32)
730 }
731 }
732
733 /// Extracts file descriptor information from a multi handle
734 ///
735 /// This function extracts file descriptor information from a given
736 /// handle, and libcurl returns its `fd_set` sets. The application can use
737 /// these to `select()` on, but be sure to `FD_ZERO` them before calling
738 /// this function as curl_multi_fdset only adds its own descriptors, it
739 /// doesn't zero or otherwise remove any others. The curl_multi_perform
740 /// function should be called as soon as one of them is ready to be read
741 /// from or written to.
742 ///
743 /// If no file descriptors are set by libcurl, this function will return
/// `Ok(None)`. Otherwise `Ok(Some(n))` will be returned where `n` is the
745 /// highest descriptor number libcurl set. When `Ok(None)` is returned it
746 /// is because libcurl currently does something that isn't possible for
747 /// your application to monitor with a socket and unfortunately you can
748 /// then not know exactly when the current action is completed using
749 /// `select()`. You then need to wait a while before you proceed and call
750 /// `perform` anyway.
751 ///
752 /// When doing `select()`, you should use `get_timeout` to figure out
753 /// how long to wait for action. Call `perform` even if no activity has
754 /// been seen on the `fd_set`s after the timeout expires as otherwise
755 /// internal retries and timeouts may not work as you'd think and want.
756 ///
757 /// If one of the sockets used by libcurl happens to be larger than what
758 /// can be set in an `fd_set`, which on POSIX systems means that the file
759 /// descriptor is larger than `FD_SETSIZE`, then libcurl will try to not
760 /// set it. Setting a too large file descriptor in an `fd_set` implies an out
761 /// of bounds write which can cause crashes, or worse. The effect of NOT
762 /// storing it will possibly save you from the crash, but will make your
763 /// program NOT wait for sockets it should wait for...
764 pub fn fdset2(
765 &self,
766 read: Option<&mut curl_sys::fd_set>,
767 write: Option<&mut curl_sys::fd_set>,
768 except: Option<&mut curl_sys::fd_set>,
769 ) -> Result<Option<i32>, MultiError> {
770 unsafe {
771 let mut ret = 0;
772 let read = read.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
773 let write = write.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
774 let except = except.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
775 cvt(curl_sys::curl_multi_fdset(
776 self.raw.handle,
777 read,
778 write,
779 except,
780 &mut ret,
781 ))?;
782 if ret == -1 {
783 Ok(None)
784 } else {
785 Ok(Some(ret))
786 }
787 }
788 }
789
790 /// Does nothing and returns `Ok(())`. This method remains for backwards
791 /// compatibility.
792 ///
793 /// This method will be changed to take `self` in a future release.
794 #[doc(hidden)]
795 #[deprecated(
796 since = "0.4.30",
797 note = "cannot close safely without consuming self; \
798 will be changed or removed in a future release"
799 )]
800 pub fn close(&self) -> Result<(), MultiError> {
801 Ok(())
802 }
803
804 /// Get a pointer to the raw underlying CURLM handle.
805 pub fn raw(&self) -> *mut curl_sys::CURLM {
806 self.raw.handle
807 }
808}
809
810impl Drop for RawMulti {
811 fn drop(&mut self) {
812 unsafe {
813 let _ = cvt(code:curl_sys::curl_multi_cleanup(self.handle));
814 }
815 }
816}
817
#[cfg(feature = "poll_7_68_0")]
impl MultiWaker {
    /// Builds a waker around a weak reference to the raw multi handle.
    fn new(raw: std::sync::Weak<RawMulti>) -> Self {
        MultiWaker { raw }
    }

    /// Wakes up a thread that is blocked in [Multi::poll]. This method can be
    /// invoked from any thread.
    ///
    /// Returns an error if the RawMulti has already been dropped.
    ///
    /// Requires libcurl 7.68.0 or later.
    pub fn wakeup(&self) -> Result<(), MultiError> {
        match self.raw.upgrade() {
            // The upgraded `Arc` keeps the handle alive for the call.
            Some(raw) => cvt(unsafe { curl_sys::curl_multi_wakeup(raw.handle) }),
            // This happens if the RawMulti has already been dropped:
            None => Err(MultiError::new(curl_sys::CURLM_BAD_HANDLE)),
        }
    }
}
840
841fn cvt(code: curl_sys::CURLMcode) -> Result<(), MultiError> {
842 if code == curl_sys::CURLM_OK {
843 Ok(())
844 } else {
845 Err(MultiError::new(code))
846 }
847}
848
849impl fmt::Debug for Multi {
850 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
851 f.debug_struct("Multi").field(name:"raw", &self.raw).finish()
852 }
853}
854
// Generates `&mut self` getter methods that forward to the method of the same
// name on `self.easy`. Intended to be invoked with no arguments inside an
// `impl` block whose type has an `easy` field.
macro_rules! impl_easy_getters {
    // Entry point: expands to the complete list of forwarded getters.
    () => {
        impl_easy_getters! {
            time_condition_unmet -> bool,
            effective_url -> Option<&str>,
            effective_url_bytes -> Option<&[u8]>,
            response_code -> u32,
            http_connectcode -> u32,
            filetime -> Option<i64>,
            download_size -> f64,
            content_length_download -> f64,
            total_time -> Duration,
            namelookup_time -> Duration,
            connect_time -> Duration,
            appconnect_time -> Duration,
            pretransfer_time -> Duration,
            starttransfer_time -> Duration,
            redirect_time -> Duration,
            redirect_count -> u32,
            redirect_url -> Option<&str>,
            redirect_url_bytes -> Option<&[u8]>,
            header_size -> u64,
            request_size -> u64,
            content_type -> Option<&str>,
            content_type_bytes -> Option<&[u8]>,
            os_errno -> i32,
            primary_ip -> Option<&str>,
            primary_port -> u16,
            local_ip -> Option<&str>,
            local_port -> u16,
            cookies -> List,
        }
    };

    // List form: for every `name -> type,` entry, build the doc string that
    // links to the matching `Easy2` method and hand off to the single-method
    // arm below.
    ($($getter:ident -> $out:ty,)*) => {
        $(
            impl_easy_getters!($getter, $out, concat!(
                "Same as [`Easy2::",
                stringify!($getter),
                "`](../easy/struct.Easy2.html#method.",
                stringify!($getter),
                ")."
            ));
        )*
    };

    // Single-method form: emits one documented forwarding method.
    ($getter:ident, $out:ty, $docs:expr) => {
        #[doc = $docs]
        pub fn $getter(&mut self) -> Result<$out, Error> {
            self.easy.$getter()
        }
    };
}
908
909impl EasyHandle {
910 /// Sets an internal private token for this `EasyHandle`.
911 ///
912 /// This function will set the `CURLOPT_PRIVATE` field on the underlying
913 /// easy handle.
914 pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
915 unsafe {
916 crate::cvt(curl_sys::curl_easy_setopt(
917 self.easy.raw(),
918 curl_sys::CURLOPT_PRIVATE,
919 token,
920 ))
921 }
922 }
923
924 impl_easy_getters!();
925
926 /// Unpause reading on a connection.
927 ///
928 /// Using this function, you can explicitly unpause a connection that was
929 /// previously paused.
930 ///
931 /// A connection can be paused by letting the read or the write callbacks
932 /// return `ReadError::Pause` or `WriteError::Pause`.
933 ///
934 /// The chance is high that you will get your write callback called before
935 /// this function returns.
936 pub fn unpause_read(&self) -> Result<(), Error> {
937 self.easy.unpause_read()
938 }
939
940 /// Unpause writing on a connection.
941 ///
942 /// Using this function, you can explicitly unpause a connection that was
943 /// previously paused.
944 ///
945 /// A connection can be paused by letting the read or the write callbacks
946 /// return `ReadError::Pause` or `WriteError::Pause`. A write callback that
947 /// returns pause signals to the library that it couldn't take care of any
948 /// data at all, and that data will then be delivered again to the callback
949 /// when the writing is later unpaused.
950 pub fn unpause_write(&self) -> Result<(), Error> {
951 self.easy.unpause_write()
952 }
953
954 /// Get a pointer to the raw underlying CURL handle.
955 pub fn raw(&self) -> *mut curl_sys::CURL {
956 self.easy.raw()
957 }
958}
959
960impl fmt::Debug for EasyHandle {
961 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
962 self.easy.fmt(f)
963 }
964}
965
966impl<H> Easy2Handle<H> {
967 /// Acquires a reference to the underlying handler for events.
968 pub fn get_ref(&self) -> &H {
969 self.easy.get_ref()
970 }
971
972 /// Acquires a reference to the underlying handler for events.
973 pub fn get_mut(&mut self) -> &mut H {
974 self.easy.get_mut()
975 }
976
977 /// Same as `EasyHandle::set_token`
978 pub fn set_token(&mut self, token: usize) -> Result<(), Error> {
979 unsafe {
980 crate::cvt(curl_sys::curl_easy_setopt(
981 self.easy.raw(),
982 curl_sys::CURLOPT_PRIVATE,
983 token,
984 ))
985 }
986 }
987
988 impl_easy_getters!();
989
990 /// Unpause reading on a connection.
991 ///
992 /// Using this function, you can explicitly unpause a connection that was
993 /// previously paused.
994 ///
995 /// A connection can be paused by letting the read or the write callbacks
996 /// return `ReadError::Pause` or `WriteError::Pause`.
997 ///
998 /// The chance is high that you will get your write callback called before
999 /// this function returns.
1000 pub fn unpause_read(&self) -> Result<(), Error> {
1001 self.easy.unpause_read()
1002 }
1003
1004 /// Unpause writing on a connection.
1005 ///
1006 /// Using this function, you can explicitly unpause a connection that was
1007 /// previously paused.
1008 ///
1009 /// A connection can be paused by letting the read or the write callbacks
1010 /// return `ReadError::Pause` or `WriteError::Pause`. A write callback that
1011 /// returns pause signals to the library that it couldn't take care of any
1012 /// data at all, and that data will then be delivered again to the callback
1013 /// when the writing is later unpaused.
1014 pub fn unpause_write(&self) -> Result<(), Error> {
1015 self.easy.unpause_write()
1016 }
1017
1018 /// Get a pointer to the raw underlying CURL handle.
1019 pub fn raw(&self) -> *mut curl_sys::CURL {
1020 self.easy.raw()
1021 }
1022}
1023
1024impl<H: fmt::Debug> fmt::Debug for Easy2Handle<H> {
1025 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1026 self.easy.fmt(f)
1027 }
1028}
1029
1030impl DetachGuard {
1031 /// Detach the referenced easy handle from its multi handle manually.
1032 /// Subsequent calls to this method will have no effect.
1033 fn detach(&mut self) -> Result<(), MultiError> {
1034 if !self.easy.is_null() {
1035 unsafe {
1036 cvt(code:curl_sys::curl_multi_remove_handle(
1037 self.multi.handle,
1038 self.easy,
1039 ))?
1040 }
1041
1042 // Set easy to null to signify that the handle was removed.
1043 self.easy = ptr::null_mut();
1044 }
1045
1046 Ok(())
1047 }
1048}
1049
1050impl Drop for DetachGuard {
1051 fn drop(&mut self) {
1052 let _ = self.detach();
1053 }
1054}
1055
1056impl<'multi> Message<'multi> {
1057 /// If this message indicates that a transfer has finished, returns the
1058 /// result of the transfer in `Some`.
1059 ///
1060 /// If the message doesn't indicate that a transfer has finished, then
1061 /// `None` is returned.
1062 ///
1063 /// Note that the `result*_for` methods below should be preferred as they
1064 /// provide better error messages as the associated error data on the
1065 /// handle can be associated with the error type.
1066 pub fn result(&self) -> Option<Result<(), Error>> {
1067 unsafe {
1068 if (*self.ptr).msg == curl_sys::CURLMSG_DONE {
1069 Some(crate::cvt((*self.ptr).data as curl_sys::CURLcode))
1070 } else {
1071 None
1072 }
1073 }
1074 }
1075
1076 /// Same as `result`, except only returns `Some` for the specified handle.
1077 ///
1078 /// Note that this function produces better error messages than `result` as
1079 /// it uses `take_error_buf` to associate error information with the
1080 /// returned error.
1081 pub fn result_for(&self, handle: &EasyHandle) -> Option<Result<(), Error>> {
1082 if !self.is_for(handle) {
1083 return None;
1084 }
1085 let mut err = self.result();
1086 if let Some(Err(e)) = &mut err {
1087 if let Some(s) = handle.easy.take_error_buf() {
1088 e.set_extra(s);
1089 }
1090 }
1091 err
1092 }
1093
1094 /// Same as `result`, except only returns `Some` for the specified handle.
1095 ///
1096 /// Note that this function produces better error messages than `result` as
1097 /// it uses `take_error_buf` to associate error information with the
1098 /// returned error.
1099 pub fn result_for2<H>(&self, handle: &Easy2Handle<H>) -> Option<Result<(), Error>> {
1100 if !self.is_for2(handle) {
1101 return None;
1102 }
1103 let mut err = self.result();
1104 if let Some(Err(e)) = &mut err {
1105 if let Some(s) = handle.easy.take_error_buf() {
1106 e.set_extra(s);
1107 }
1108 }
1109 err
1110 }
1111
1112 /// Returns whether this easy message was for the specified easy handle or
1113 /// not.
1114 pub fn is_for(&self, handle: &EasyHandle) -> bool {
1115 unsafe { (*self.ptr).easy_handle == handle.easy.raw() }
1116 }
1117
1118 /// Same as `is_for`, but for `Easy2Handle`.
1119 pub fn is_for2<H>(&self, handle: &Easy2Handle<H>) -> bool {
1120 unsafe { (*self.ptr).easy_handle == handle.easy.raw() }
1121 }
1122
1123 /// Returns the token associated with the easy handle that this message
1124 /// represents a completion for.
1125 ///
1126 /// This function will return the token assigned with
1127 /// `EasyHandle::set_token`. This reads the `CURLINFO_PRIVATE` field of the
1128 /// underlying `*mut CURL`.
1129 pub fn token(&self) -> Result<usize, Error> {
1130 unsafe {
1131 let mut p = 0usize;
1132 crate::cvt(curl_sys::curl_easy_getinfo(
1133 (*self.ptr).easy_handle,
1134 curl_sys::CURLINFO_PRIVATE,
1135 &mut p,
1136 ))?;
1137 Ok(p)
1138 }
1139 }
1140}
1141
1142impl<'a> fmt::Debug for Message<'a> {
1143 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1144 f.debug_struct("Message").field(name:"ptr", &self.ptr).finish()
1145 }
1146}
1147
1148impl Events {
1149 /// Creates a new blank event bit mask.
1150 pub fn new() -> Events {
1151 Events { bits: 0 }
1152 }
1153
1154 /// Set or unset the whether these events indicate that input is ready.
1155 pub fn input(&mut self, val: bool) -> &mut Events {
1156 self.flag(curl_sys::CURL_CSELECT_IN, val)
1157 }
1158
1159 /// Set or unset the whether these events indicate that output is ready.
1160 pub fn output(&mut self, val: bool) -> &mut Events {
1161 self.flag(curl_sys::CURL_CSELECT_OUT, val)
1162 }
1163
1164 /// Set or unset the whether these events indicate that an error has
1165 /// happened.
1166 pub fn error(&mut self, val: bool) -> &mut Events {
1167 self.flag(curl_sys::CURL_CSELECT_ERR, val)
1168 }
1169
1170 fn flag(&mut self, flag: c_int, val: bool) -> &mut Events {
1171 if val {
1172 self.bits |= flag;
1173 } else {
1174 self.bits &= !flag;
1175 }
1176 self
1177 }
1178}
1179
1180impl fmt::Debug for Events {
1181 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1182 f&mut DebugStruct<'_, '_>.debug_struct("Events")
1183 .field("input", &(self.bits & curl_sys::CURL_CSELECT_IN != 0))
1184 .field("output", &(self.bits & curl_sys::CURL_CSELECT_OUT != 0))
1185 .field(name:"error", &(self.bits & curl_sys::CURL_CSELECT_ERR != 0))
1186 .finish()
1187 }
1188}
1189
1190impl SocketEvents {
1191 /// Wait for incoming data. For the socket to become readable.
1192 pub fn input(&self) -> bool {
1193 self.bits & curl_sys::CURL_POLL_IN == curl_sys::CURL_POLL_IN
1194 }
1195
1196 /// Wait for outgoing data. For the socket to become writable.
1197 pub fn output(&self) -> bool {
1198 self.bits & curl_sys::CURL_POLL_OUT == curl_sys::CURL_POLL_OUT
1199 }
1200
1201 /// Wait for incoming and outgoing data. For the socket to become readable
1202 /// or writable.
1203 pub fn input_and_output(&self) -> bool {
1204 self.bits & curl_sys::CURL_POLL_INOUT == curl_sys::CURL_POLL_INOUT
1205 }
1206
1207 /// The specified socket/file descriptor is no longer used by libcurl.
1208 pub fn remove(&self) -> bool {
1209 self.bits & curl_sys::CURL_POLL_REMOVE == curl_sys::CURL_POLL_REMOVE
1210 }
1211}
1212
1213impl fmt::Debug for SocketEvents {
1214 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1215 f&mut DebugStruct<'_, '_>.debug_struct("Events")
1216 .field("input", &self.input())
1217 .field("output", &self.output())
1218 .field(name:"remove", &self.remove())
1219 .finish()
1220 }
1221}
1222
1223impl WaitFd {
1224 /// Constructs an empty (invalid) WaitFd.
1225 pub fn new() -> WaitFd {
1226 WaitFd {
1227 inner: curl_sys::curl_waitfd {
1228 fd: 0,
1229 events: 0,
1230 revents: 0,
1231 },
1232 }
1233 }
1234
1235 /// Set the file descriptor to wait for.
1236 pub fn set_fd(&mut self, fd: Socket) {
1237 self.inner.fd = fd;
1238 }
1239
1240 /// Indicate that the socket should poll on read events such as new data
1241 /// received.
1242 ///
1243 /// Corresponds to `CURL_WAIT_POLLIN`.
1244 pub fn poll_on_read(&mut self, val: bool) -> &mut WaitFd {
1245 self.flag(curl_sys::CURL_WAIT_POLLIN, val)
1246 }
1247
1248 /// Indicate that the socket should poll on high priority read events such
1249 /// as out of band data.
1250 ///
1251 /// Corresponds to `CURL_WAIT_POLLPRI`.
1252 pub fn poll_on_priority_read(&mut self, val: bool) -> &mut WaitFd {
1253 self.flag(curl_sys::CURL_WAIT_POLLPRI, val)
1254 }
1255
1256 /// Indicate that the socket should poll on write events such as the socket
1257 /// being clear to write without blocking.
1258 ///
1259 /// Corresponds to `CURL_WAIT_POLLOUT`.
1260 pub fn poll_on_write(&mut self, val: bool) -> &mut WaitFd {
1261 self.flag(curl_sys::CURL_WAIT_POLLOUT, val)
1262 }
1263
1264 fn flag(&mut self, flag: c_short, val: bool) -> &mut WaitFd {
1265 if val {
1266 self.inner.events |= flag;
1267 } else {
1268 self.inner.events &= !flag;
1269 }
1270 self
1271 }
1272
1273 /// After a call to `wait`, returns `true` if `poll_on_read` was set and a
1274 /// read event occured.
1275 pub fn received_read(&self) -> bool {
1276 self.inner.revents & curl_sys::CURL_WAIT_POLLIN == curl_sys::CURL_WAIT_POLLIN
1277 }
1278
1279 /// After a call to `wait`, returns `true` if `poll_on_priority_read` was set and a
1280 /// priority read event occured.
1281 pub fn received_priority_read(&self) -> bool {
1282 self.inner.revents & curl_sys::CURL_WAIT_POLLPRI == curl_sys::CURL_WAIT_POLLPRI
1283 }
1284
1285 /// After a call to `wait`, returns `true` if `poll_on_write` was set and a
1286 /// write event occured.
1287 pub fn received_write(&self) -> bool {
1288 self.inner.revents & curl_sys::CURL_WAIT_POLLOUT == curl_sys::CURL_WAIT_POLLOUT
1289 }
1290}
1291
1292#[cfg(unix)]
1293impl From<pollfd> for WaitFd {
1294 fn from(pfd: pollfd) -> WaitFd {
1295 let mut events: i16 = 0;
1296 if pfd.events & POLLIN == POLLIN {
1297 events |= curl_sys::CURL_WAIT_POLLIN;
1298 }
1299 if pfd.events & POLLPRI == POLLPRI {
1300 events |= curl_sys::CURL_WAIT_POLLPRI;
1301 }
1302 if pfd.events & POLLOUT == POLLOUT {
1303 events |= curl_sys::CURL_WAIT_POLLOUT;
1304 }
1305 WaitFd {
1306 inner: curl_sys::curl_waitfd {
1307 fd: pfd.fd,
1308 events,
1309 revents: 0,
1310 },
1311 }
1312 }
1313}
1314
1315impl fmt::Debug for WaitFd {
1316 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1317 f&mut DebugStruct<'_, '_>.debug_struct("WaitFd")
1318 .field("fd", &self.inner.fd)
1319 .field("events", &self.inner.fd)
1320 .field(name:"revents", &self.inner.fd)
1321 .finish()
1322 }
1323}
1324