1 | #![allow (clippy::single_component_path_imports)] |
2 | |
3 | use std::convert::Into; |
4 | use std::ffi::CString; |
5 | use std::marker::PhantomData; |
6 | use std::os::raw::c_void; |
7 | use std::ptr::{self, null_mut}; |
8 | use std::sync::atomic::{AtomicBool, AtomicPtr, Ordering}; |
9 | use std::sync::{Arc, RwLock, RwLockWriteGuard, Weak}; |
10 | |
11 | use crate::bindgen_runtime::{ |
12 | FromNapiValue, JsValuesTupleIntoVec, ToNapiValue, TypeName, ValidateNapiValue, |
13 | }; |
14 | use crate::{check_status, sys, Env, JsError, JsUnknown, Result, Status}; |
15 | |
/// Context object handed to the Rust callback of a `ThreadsafeFunction`.
///
/// One value of this type is built for every queued call, right before the
/// callback that produces the JavaScript arguments is invoked.
pub struct ThreadSafeCallContext<T: 'static> {
  /// Environment in which the JavaScript function is about to be called.
  pub env: Env,
  /// The value that was passed to the `call` method.
  pub value: T,
}
22 | |
/// Call mode used when queueing a call on a threadsafe function.
///
/// Each variant is converted into the matching raw
/// `napi_threadsafe_function_call_mode` constant before it reaches N-API; see
/// the Node-API documentation for the exact queueing semantics of each mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ThreadsafeFunctionCallMode {
  /// Converted to the raw `nonblocking` call mode.
  NonBlocking = 0,
  /// Converted to the raw `blocking` call mode.
  Blocking = 1,
}
29 | |
30 | impl From<ThreadsafeFunctionCallMode> for sys::napi_threadsafe_function_call_mode { |
31 | fn from(value: ThreadsafeFunctionCallMode) -> Self { |
32 | match value { |
33 | ThreadsafeFunctionCallMode::Blocking => sys::ThreadsafeFunctionCallMode::blocking, |
34 | ThreadsafeFunctionCallMode::NonBlocking => sys::ThreadsafeFunctionCallMode::nonblocking, |
35 | } |
36 | } |
37 | } |
38 | |
type_level_enum! {
  /// Type-level `enum` to express how to feed [`ThreadsafeFunction`] errors to
  /// the inner [`JsFunction`].
  ///
  /// ### Context
  ///
  /// For callbacks that expect a `Result`-like kind of input, the convention is
  /// to have the callback take an `error` parameter as its first parameter.
  ///
  /// This way receiving a `Result<Args…>` can be modelled as follows:
  ///
  ///   - In case of `Err(error)`, feed that `error` entity as the first parameter
  ///     of the callback;
  ///
  ///   - Otherwise (in case of `Ok(_)`), feed `null` instead.
  ///
  /// In pseudo-code:
  ///
  /// ```rust,ignore
  /// match result_args {
  ///     Ok(args) => {
  ///         let js_null = /* … */;
  ///         callback.call(
  ///             // this
  ///             None,
  ///             // args…
  ///             &iter::once(js_null).chain(args).collect::<Vec<_>>(),
  ///         )
  ///     },
  ///     Err(err) => callback.call(None, &[JsError::from(err)]),
  /// }
  /// ```
  ///
  /// **Note that the `Err` case can stem from a failed conversion from native
  /// values to js values when calling the callback!**
  ///
  /// That's why:
  ///
  /// > **[This][`ErrorStrategy::CalleeHandled`] is the default error strategy**.
  ///
  /// In order to opt-out of it, [`ThreadsafeFunction`] has an optional second
  /// generic parameter (of "kind" [`ErrorStrategy::T`]) that defines whether
  /// this behavior ([`ErrorStrategy::CalleeHandled`]) or a non-`Result` one
  /// ([`ErrorStrategy::Fatal`]) is desired.
  pub enum ErrorStrategy {
    /// Input errors (including conversion errors) are left for the callee to
    /// handle:
    ///
    /// The callee receives an extra `error` parameter (the first one), which is
    /// `null` if no error occurred, and the error payload otherwise.
    ///
    /// When an error occurred, the error payload is the only argument the
    /// callee receives.
    CalleeHandled,

    /// Input errors (including conversion errors) are deemed fatal:
    ///
    /// they can thus cause a `panic!` or abort the process.
    ///
    /// Such errors are reported through `napi_fatal_exception` instead of
    /// being handed to the callee.
    ///
    /// The callee thus is not expected to have to deal with [that extra `error`
    /// parameter][CalleeHandled], which is thus not added.
    Fatal,
  }
}
100 | |
/// State shared (through an `Arc`) by every clone of a `ThreadsafeFunction`.
struct ThreadsafeFunctionHandle {
  /// Raw N-API threadsafe function pointer. It is null for handles built with
  /// `null()` until `set_raw` stores the real pointer.
  raw: AtomicPtr<sys::napi_threadsafe_function__>,
  /// Set to `true` by `abort` and by the finalizer; every operation checks it
  /// under the lock before touching `raw`.
  aborted: RwLock<bool>,
  /// Tracks whether the function is currently referenced; toggled by `refer`
  /// and `unref`. Starts out `true`.
  referred: AtomicBool,
}
106 | |
107 | impl ThreadsafeFunctionHandle { |
108 | /// create a Arc to hold the `ThreadsafeFunctionHandle` |
109 | fn new(raw: sys::napi_threadsafe_function) -> Arc<Self> { |
110 | Arc::new(Self { |
111 | raw: AtomicPtr::new(raw), |
112 | aborted: RwLock::new(false), |
113 | referred: AtomicBool::new(true), |
114 | }) |
115 | } |
116 | |
117 | /// Lock `aborted` with read access, call `f` with the value of `aborted`, then unlock it |
118 | fn with_read_aborted<RT, F>(&self, f: F) -> RT |
119 | where |
120 | F: FnOnce(bool) -> RT, |
121 | { |
122 | let aborted_guard = self |
123 | .aborted |
124 | .read() |
125 | .expect("Threadsafe Function aborted lock failed" ); |
126 | f(*aborted_guard) |
127 | } |
128 | |
129 | /// Lock `aborted` with write access, call `f` with the `RwLockWriteGuard`, then unlock it |
130 | fn with_write_aborted<RT, F>(&self, f: F) -> RT |
131 | where |
132 | F: FnOnce(RwLockWriteGuard<bool>) -> RT, |
133 | { |
134 | let aborted_guard = self |
135 | .aborted |
136 | .write() |
137 | .expect("Threadsafe Function aborted lock failed" ); |
138 | f(aborted_guard) |
139 | } |
140 | |
141 | #[allow (clippy::arc_with_non_send_sync)] |
142 | fn null() -> Arc<Self> { |
143 | Self::new(null_mut()) |
144 | } |
145 | |
146 | fn get_raw(&self) -> sys::napi_threadsafe_function { |
147 | self.raw.load(Ordering::SeqCst) |
148 | } |
149 | |
150 | fn set_raw(&self, raw: sys::napi_threadsafe_function) { |
151 | self.raw.store(raw, Ordering::SeqCst) |
152 | } |
153 | } |
154 | |
155 | impl Drop for ThreadsafeFunctionHandle { |
156 | fn drop(&mut self) { |
157 | self.with_read_aborted(|aborted: bool| { |
158 | if !aborted { |
159 | let release_status: i32 = unsafe { |
160 | sys::napi_release_threadsafe_function( |
161 | self.get_raw(), |
162 | mode:sys::ThreadsafeFunctionReleaseMode::release, |
163 | ) |
164 | }; |
165 | assert!( |
166 | release_status == sys::Status::napi_ok, |
167 | "Threadsafe Function release failed {}" , |
168 | Status::from(release_status) |
169 | ); |
170 | } |
171 | }) |
172 | } |
173 | } |
174 | |
/// Tells the JavaScript-side trampoline whether the per-call callback has to
/// be run after the JavaScript function returned.
#[repr(u8)]
enum ThreadsafeFunctionCallVariant {
  /// Plain call: the per-call callback is never invoked.
  Direct = 0,
  /// The per-call callback receives the return value (or the thrown
  /// exception) of the JavaScript function.
  WithCallback = 1,
}
180 | |
/// Payload boxed by the `call*` methods and unboxed again by `call_js_cb`.
struct ThreadsafeFunctionCallJsBackData<T> {
  /// The value handed to the user callback through `ThreadSafeCallContext::value`.
  data: T,
  /// Whether `callback` must be run once the JavaScript function returned.
  call_variant: ThreadsafeFunctionCallVariant,
  /// Receives the return value of the JavaScript function, or the exception
  /// it threw; only invoked for `ThreadsafeFunctionCallVariant::WithCallback`.
  callback: Box<dyn FnOnce(Result<JsUnknown>) -> Result<()>>,
}
186 | |
/// Communicate with the addon's main thread by invoking a JavaScript function from other threads.
///
/// Cloning a `ThreadsafeFunction` shares the same underlying handle, so every
/// clone can be moved to its own thread.
///
/// ## Example
/// An example of using `ThreadsafeFunction`:
///
/// ```rust
/// #[macro_use]
/// extern crate napi_derive;
///
/// use std::thread;
///
/// use napi::{
///     threadsafe_function::{ThreadSafeCallContext, ThreadsafeFunctionCallMode},
///     CallContext, Error, JsFunction, JsNumber, JsUndefined, Result, Status,
/// };
///
/// #[js_function(1)]
/// pub fn test_threadsafe_function(ctx: CallContext) -> Result<JsUndefined> {
///   let func = ctx.get::<JsFunction>(0)?;
///
///   let tsfn =
///       ctx
///           .env
///           .create_threadsafe_function(&func, 0, |ctx: ThreadSafeCallContext<Vec<u32>>| {
///             ctx.value
///                 .iter()
///                 .map(|v| ctx.env.create_uint32(*v))
///                 .collect::<Result<Vec<JsNumber>>>()
///           })?;
///
///   let tsfn_cloned = tsfn.clone();
///
///   thread::spawn(move || {
///       let output: Vec<u32> = vec![0, 1, 2, 3];
///       // It's okay to call a threadsafe function multiple times.
///       tsfn.call(Ok(output.clone()), ThreadsafeFunctionCallMode::Blocking);
///   });
///
///   thread::spawn(move || {
///       let output: Vec<u32> = vec![3, 2, 1, 0];
///       // It's okay to call a threadsafe function multiple times.
///       tsfn_cloned.call(Ok(output.clone()), ThreadsafeFunctionCallMode::NonBlocking);
///   });
///
///   ctx.env.get_undefined()
/// }
/// ```
pub struct ThreadsafeFunction<T: 'static, ES: ErrorStrategy::T = ErrorStrategy::CalleeHandled> {
  // Shared state: raw pointer, `aborted` flag and reference flag.
  handle: Arc<ThreadsafeFunctionHandle>,
  // Ties the payload type and the error strategy to the value without storing either.
  _phantom: PhantomData<(T, ES)>,
}
240 | |
// NOTE(review): `Send` and `Sync` are asserted for every `T`, without a
// `T: Send` bound. The struct itself only stores an `Arc` of the shared handle
// and a `PhantomData`, but values of `T` are boxed by the `call*` methods and
// consumed in `call_js_cb` — confirm this is sound for payloads that are not
// `Send`.
unsafe impl<T: 'static, ES: ErrorStrategy::T> Send for ThreadsafeFunction<T, ES> {}
unsafe impl<T: 'static, ES: ErrorStrategy::T> Sync for ThreadsafeFunction<T, ES> {}
243 | |
244 | impl<T: 'static, ES: ErrorStrategy::T> Clone for ThreadsafeFunction<T, ES> { |
245 | fn clone(&self) -> Self { |
246 | self.handle.with_read_aborted(|aborted: bool| { |
247 | if aborted { |
248 | panic!("ThreadsafeFunction was aborted, can not clone it" ); |
249 | }; |
250 | |
251 | Self { |
252 | handle: self.handle.clone(), |
253 | _phantom: PhantomData, |
254 | } |
255 | }) |
256 | } |
257 | } |
258 | |
259 | impl<T: ToNapiValue> JsValuesTupleIntoVec for T { |
260 | #[allow (clippy::not_unsafe_ptr_arg_deref)] |
261 | fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> { |
262 | Ok(vec![unsafe { |
263 | <T as ToNapiValue>::to_napi_value(env, self)? |
264 | }]) |
265 | } |
266 | } |
267 | |
// Implements `JsValuesTupleIntoVec` for the tuple made of the listed type
// parameters. Elements are converted left to right and the first conversion
// error is returned.
macro_rules! impl_js_value_tuple_to_vec {
  ($($elem:ident),*) => {
    impl<$($elem: ToNapiValue),*> JsValuesTupleIntoVec for ($($elem,)*) {
      #[allow(clippy::not_unsafe_ptr_arg_deref)]
      // The type parameter names double as binding names for the tuple fields.
      #[allow(non_snake_case)]
      fn into_vec(self, env: sys::napi_env) -> Result<Vec<sys::napi_value>> {
        let ($($elem,)*) = self;
        $(
          let $elem = unsafe { <$elem as ToNapiValue>::to_napi_value(env, $elem) }?;
        )*
        Ok(vec![$($elem),*])
      }
    }
  };
}
280 | |
// `JsValuesTupleIntoVec` for tuples of 1 up to 26 elements (`A` through `Z`).
impl_js_value_tuple_to_vec!(A);
impl_js_value_tuple_to_vec!(A, B);
impl_js_value_tuple_to_vec!(A, B, C);
impl_js_value_tuple_to_vec!(A, B, C, D);
impl_js_value_tuple_to_vec!(A, B, C, D, E);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W);
impl_js_value_tuple_to_vec!(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X);
impl_js_value_tuple_to_vec!(
  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y
);
impl_js_value_tuple_to_vec!(
  A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z
);
311 | |
312 | impl<T: JsValuesTupleIntoVec + 'static, ES: ErrorStrategy::T> FromNapiValue |
313 | for ThreadsafeFunction<T, ES> |
314 | { |
315 | unsafe fn from_napi_value(env: sys::napi_env, napi_val: sys::napi_value) -> Result<Self> { |
316 | Self::create(env, func:napi_val, max_queue_size:0, |ctx: ThreadSafeCallContext| ctx.value.into_vec(env:ctx.env.0)) |
317 | } |
318 | } |
319 | |
320 | impl<T: 'static, ES: ErrorStrategy::T> ThreadsafeFunction<T, ES> { |
321 | /// See [napi_create_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_create_threadsafe_function) |
322 | /// for more information. |
323 | pub(crate) fn create< |
324 | V: ToNapiValue, |
325 | R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>, |
326 | >( |
327 | env: sys::napi_env, |
328 | func: sys::napi_value, |
329 | max_queue_size: usize, |
330 | callback: R, |
331 | ) -> Result<Self> { |
332 | let mut async_resource_name = ptr::null_mut(); |
333 | let s = "napi_rs_threadsafe_function" ; |
334 | let len = s.len(); |
335 | let s = CString::new(s)?; |
336 | check_status!(unsafe { |
337 | sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_resource_name) |
338 | })?; |
339 | |
340 | let mut raw_tsfn = ptr::null_mut(); |
341 | let callback_ptr = Box::into_raw(Box::new(callback)); |
342 | let handle = ThreadsafeFunctionHandle::null(); |
343 | check_status!(unsafe { |
344 | sys::napi_create_threadsafe_function( |
345 | env, |
346 | func, |
347 | ptr::null_mut(), |
348 | async_resource_name, |
349 | max_queue_size, |
350 | 1, |
351 | Arc::downgrade(&handle).into_raw() as *mut c_void, // pass handler to thread_finalize_cb |
352 | Some(thread_finalize_cb::<T, V, R>), |
353 | callback_ptr.cast(), |
354 | Some(call_js_cb::<T, V, R, ES>), |
355 | &mut raw_tsfn, |
356 | ) |
357 | })?; |
358 | handle.set_raw(raw_tsfn); |
359 | |
360 | Ok(ThreadsafeFunction { |
361 | handle, |
362 | _phantom: PhantomData, |
363 | }) |
364 | } |
365 | |
366 | /// See [napi_ref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_ref_threadsafe_function) |
367 | /// for more information. |
368 | /// |
369 | /// "ref" is a keyword so that we use "refer" here. |
370 | pub fn refer(&mut self, env: &Env) -> Result<()> { |
371 | self.handle.with_read_aborted(|aborted| { |
372 | if !aborted && !self.handle.referred.load(Ordering::Relaxed) { |
373 | check_status!(unsafe { sys::napi_ref_threadsafe_function(env.0, self.handle.get_raw()) })?; |
374 | self.handle.referred.store(true, Ordering::Relaxed); |
375 | } |
376 | Ok(()) |
377 | }) |
378 | } |
379 | |
380 | /// See [napi_unref_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_unref_threadsafe_function) |
381 | /// for more information. |
382 | pub fn unref(&mut self, env: &Env) -> Result<()> { |
383 | self.handle.with_read_aborted(|aborted| { |
384 | if !aborted && self.handle.referred.load(Ordering::Relaxed) { |
385 | check_status!(unsafe { |
386 | sys::napi_unref_threadsafe_function(env.0, self.handle.get_raw()) |
387 | })?; |
388 | self.handle.referred.store(false, Ordering::Relaxed); |
389 | } |
390 | Ok(()) |
391 | }) |
392 | } |
393 | |
394 | pub fn aborted(&self) -> bool { |
395 | self.handle.with_read_aborted(|aborted| aborted) |
396 | } |
397 | |
398 | pub fn abort(self) -> Result<()> { |
399 | self.handle.with_write_aborted(|mut aborted_guard| { |
400 | if !*aborted_guard { |
401 | check_status!(unsafe { |
402 | sys::napi_release_threadsafe_function( |
403 | self.handle.get_raw(), |
404 | sys::ThreadsafeFunctionReleaseMode::abort, |
405 | ) |
406 | })?; |
407 | *aborted_guard = true; |
408 | } |
409 | Ok(()) |
410 | }) |
411 | } |
412 | |
413 | /// Get the raw `ThreadSafeFunction` pointer |
414 | pub fn raw(&self) -> sys::napi_threadsafe_function { |
415 | self.handle.get_raw() |
416 | } |
417 | } |
418 | |
419 | impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::CalleeHandled> { |
420 | /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) |
421 | /// for more information. |
422 | pub fn call(&self, value: Result<T>, mode: ThreadsafeFunctionCallMode) -> Status { |
423 | self.handle.with_read_aborted(|aborted| { |
424 | if aborted { |
425 | return Status::Closing; |
426 | } |
427 | |
428 | unsafe { |
429 | sys::napi_call_threadsafe_function( |
430 | self.handle.get_raw(), |
431 | Box::into_raw(Box::new(value.map(|data| { |
432 | ThreadsafeFunctionCallJsBackData { |
433 | data, |
434 | call_variant: ThreadsafeFunctionCallVariant::Direct, |
435 | callback: Box::new(|_d: Result<JsUnknown>| Ok(())), |
436 | } |
437 | }))) |
438 | .cast(), |
439 | mode.into(), |
440 | ) |
441 | } |
442 | .into() |
443 | }) |
444 | } |
445 | |
446 | pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>( |
447 | &self, |
448 | value: Result<T>, |
449 | mode: ThreadsafeFunctionCallMode, |
450 | cb: F, |
451 | ) -> Status { |
452 | self.handle.with_read_aborted(|aborted| { |
453 | if aborted { |
454 | return Status::Closing; |
455 | } |
456 | |
457 | unsafe { |
458 | sys::napi_call_threadsafe_function( |
459 | self.handle.get_raw(), |
460 | Box::into_raw(Box::new(value.map(|data| { |
461 | ThreadsafeFunctionCallJsBackData { |
462 | data, |
463 | call_variant: ThreadsafeFunctionCallVariant::WithCallback, |
464 | callback: Box::new(move |d: Result<JsUnknown>| { |
465 | d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb)) |
466 | }), |
467 | } |
468 | }))) |
469 | .cast(), |
470 | mode.into(), |
471 | ) |
472 | } |
473 | .into() |
474 | }) |
475 | } |
476 | |
477 | #[cfg (feature = "tokio_rt" )] |
478 | pub async fn call_async<D: 'static + FromNapiValue>(&self, value: Result<T>) -> Result<D> { |
479 | let (sender, receiver) = tokio::sync::oneshot::channel::<Result<D>>(); |
480 | |
481 | self.handle.with_read_aborted(|aborted| { |
482 | if aborted { |
483 | return Err(crate::Error::from_status(Status::Closing)); |
484 | } |
485 | |
486 | check_status!( |
487 | unsafe { |
488 | sys::napi_call_threadsafe_function( |
489 | self.handle.get_raw(), |
490 | Box::into_raw(Box::new(value.map(|data| { |
491 | ThreadsafeFunctionCallJsBackData { |
492 | data, |
493 | call_variant: ThreadsafeFunctionCallVariant::WithCallback, |
494 | callback: Box::new(move |d: Result<JsUnknown>| { |
495 | sender |
496 | .send(d.and_then(|d| D::from_napi_value(d.0.env, d.0.value))) |
497 | // The only reason for send to return Err is if the receiver isn't listening |
498 | // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead. |
499 | .or(Ok(())) |
500 | }), |
501 | } |
502 | }))) |
503 | .cast(), |
504 | ThreadsafeFunctionCallMode::NonBlocking.into(), |
505 | ) |
506 | }, |
507 | "Threadsafe function call_async failed" |
508 | ) |
509 | })?; |
510 | receiver |
511 | .await |
512 | .map_err(|_| { |
513 | crate::Error::new( |
514 | Status::GenericFailure, |
515 | "Receive value from threadsafe function sender failed" , |
516 | ) |
517 | }) |
518 | .and_then(|ret| ret) |
519 | } |
520 | } |
521 | |
522 | impl<T: 'static> ThreadsafeFunction<T, ErrorStrategy::Fatal> { |
523 | /// See [napi_call_threadsafe_function](https://nodejs.org/api/n-api.html#n_api_napi_call_threadsafe_function) |
524 | /// for more information. |
525 | pub fn call(&self, value: T, mode: ThreadsafeFunctionCallMode) -> Status { |
526 | self.handle.with_read_aborted(|aborted| { |
527 | if aborted { |
528 | return Status::Closing; |
529 | } |
530 | |
531 | unsafe { |
532 | sys::napi_call_threadsafe_function( |
533 | self.handle.get_raw(), |
534 | Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { |
535 | data: value, |
536 | call_variant: ThreadsafeFunctionCallVariant::Direct, |
537 | callback: Box::new(|_d: Result<JsUnknown>| Ok(())), |
538 | })) |
539 | .cast(), |
540 | mode.into(), |
541 | ) |
542 | } |
543 | .into() |
544 | }) |
545 | } |
546 | |
547 | pub fn call_with_return_value<D: FromNapiValue, F: 'static + FnOnce(D) -> Result<()>>( |
548 | &self, |
549 | value: T, |
550 | mode: ThreadsafeFunctionCallMode, |
551 | cb: F, |
552 | ) -> Status { |
553 | self.handle.with_read_aborted(|aborted| { |
554 | if aborted { |
555 | return Status::Closing; |
556 | } |
557 | |
558 | unsafe { |
559 | sys::napi_call_threadsafe_function( |
560 | self.handle.get_raw(), |
561 | Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { |
562 | data: value, |
563 | call_variant: ThreadsafeFunctionCallVariant::WithCallback, |
564 | callback: Box::new(move |d: Result<JsUnknown>| { |
565 | d.and_then(|d| D::from_napi_value(d.0.env, d.0.value).and_then(cb)) |
566 | }), |
567 | })) |
568 | .cast(), |
569 | mode.into(), |
570 | ) |
571 | } |
572 | .into() |
573 | }) |
574 | } |
575 | |
576 | #[cfg (feature = "tokio_rt" )] |
577 | pub async fn call_async<D: 'static + FromNapiValue>(&self, value: T) -> Result<D> { |
578 | let (sender, receiver) = tokio::sync::oneshot::channel::<D>(); |
579 | |
580 | self.handle.with_read_aborted(|aborted| { |
581 | if aborted { |
582 | return Err(crate::Error::from_status(Status::Closing)); |
583 | } |
584 | |
585 | check_status!(unsafe { |
586 | sys::napi_call_threadsafe_function( |
587 | self.handle.get_raw(), |
588 | Box::into_raw(Box::new(ThreadsafeFunctionCallJsBackData { |
589 | data: value, |
590 | call_variant: ThreadsafeFunctionCallVariant::WithCallback, |
591 | callback: Box::new(move |d: Result<JsUnknown>| { |
592 | d.and_then(|d| { |
593 | D::from_napi_value(d.0.env, d.0.value).and_then(move |d| { |
594 | sender |
595 | .send(d) |
596 | // The only reason for send to return Err is if the receiver isn't listening |
597 | // Not hiding the error would result in a napi_fatal_error call, it's safe to ignore it instead. |
598 | .or(Ok(())) |
599 | }) |
600 | }) |
601 | }), |
602 | })) |
603 | .cast(), |
604 | ThreadsafeFunctionCallMode::NonBlocking.into(), |
605 | ) |
606 | }) |
607 | })?; |
608 | |
609 | receiver |
610 | .await |
611 | .map_err(|err| crate::Error::new(Status::GenericFailure, format!(" {}" , err))) |
612 | } |
613 | } |
614 | |
615 | #[allow (unused_variables)] |
616 | unsafe extern "C" fn thread_finalize_cb<T: 'static, V: ToNapiValue, R>( |
617 | env: sys::napi_env, |
618 | finalize_data: *mut c_void, |
619 | finalize_hint: *mut c_void, |
620 | ) where |
621 | R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>, |
622 | { |
623 | let handle_option: Option> = |
624 | unsafe { Weak::from_raw(ptr:finalize_data.cast::<ThreadsafeFunctionHandle>()).upgrade() }; |
625 | |
626 | if let Some(handle: Arc) = handle_option { |
627 | handle.with_write_aborted(|mut aborted_guard: RwLockWriteGuard<'_, bool>| { |
628 | if !*aborted_guard { |
629 | *aborted_guard = true; |
630 | } |
631 | }); |
632 | } |
633 | |
634 | // cleanup |
635 | drop(unsafe { Box::<R>::from_raw(finalize_hint.cast()) }); |
636 | } |
637 | |
/// Trampoline registered as the `call_js_cb` of the threadsafe function.
///
/// `context` is the boxed user callback (`R`) handed over in `create`, and
/// `data` is the payload boxed by one of the `call*` methods. The user
/// callback turns the payload into arguments, the JavaScript function is
/// called with them, and for `WithCallback` calls the per-call callback
/// receives the outcome. Any status other than `napi_ok` ends up in
/// `napi_fatal_exception`.
unsafe extern "C" fn call_js_cb<T: 'static, V: ToNapiValue, R, ES>(
  raw_env: sys::napi_env,
  js_callback: sys::napi_value,
  context: *mut c_void,
  data: *mut c_void,
) where
  R: 'static + Send + FnMut(ThreadSafeCallContext<T>) -> Result<Vec<V>>,
  ES: ErrorStrategy::T,
{
  // env and/or callback can be null when shutting down
  // NOTE(review): this path returns before `data` is unboxed, so the payload
  // looks like it is never freed here — confirm whether that is intended.
  if raw_env.is_null() || js_callback.is_null() {
    return;
  }

  // Borrow the user callback without taking ownership: `Box::leak` keeps the
  // allocation alive, it is freed in `thread_finalize_cb`.
  let ctx: &mut R = unsafe { Box::leak(Box::from_raw(context.cast())) };
  // Take ownership of the payload. Its boxed type depends on the error
  // strategy and has to match what the `call*` methods of that strategy box.
  let val = unsafe {
    match ES::VALUE {
      ErrorStrategy::CalleeHandled::VALUE => {
        *Box::<Result<ThreadsafeFunctionCallJsBackData<T>>>::from_raw(data.cast())
      }
      ErrorStrategy::Fatal::VALUE => Ok(*Box::<ThreadsafeFunctionCallJsBackData<T>>::from_raw(
        data.cast(),
      )),
    }
  };

  // `undefined` is used as the `this` value of every call below.
  // NOTE(review): the returned status is ignored.
  let mut recv = ptr::null_mut();
  unsafe { sys::napi_get_undefined(raw_env, &mut recv) };

  // Run the user callback to obtain the values to pass to JavaScript.
  let ret = val.and_then(|v| {
    (ctx)(ThreadSafeCallContext {
      env: unsafe { Env::from_raw(raw_env) },
      value: v.data,
    })
    .map(|ret| (ret, v.call_variant, v.callback))
  });

  // Follow async callback conventions: https://nodejs.org/en/knowledge/errors/what-are-the-error-conventions/
  // Check if the Result is okay, if so, pass a null as the first (error) argument automatically.
  // If the Result is an error, pass that as the first argument.
  let status = match ret {
    Ok((values, call_variant, callback)) => {
      // Lazily convert every value; the first failure short-circuits `collect`.
      let values = values
        .into_iter()
        .map(|v| unsafe { ToNapiValue::to_napi_value(raw_env, v) });
      let args: Result<Vec<sys::napi_value>> = if ES::VALUE == ErrorStrategy::CalleeHandled::VALUE {
        let mut js_null = ptr::null_mut();
        // NOTE(review): the returned status is ignored.
        unsafe { sys::napi_get_null(raw_env, &mut js_null) };
        ::core::iter::once(Ok(js_null)).chain(values).collect()
      } else {
        values.collect()
      };
      let mut return_value = ptr::null_mut();
      let mut status = match args {
        Ok(args) => unsafe {
          sys::napi_call_function(
            raw_env,
            recv,
            js_callback,
            args.len(),
            args.as_ptr(),
            &mut return_value,
          )
        },
        // A value could not be converted: report it according to the strategy.
        Err(e) => match ES::VALUE {
          ErrorStrategy::Fatal::VALUE => unsafe {
            sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
          },
          ErrorStrategy::CalleeHandled::VALUE => unsafe {
            sys::napi_call_function(
              raw_env,
              recv,
              js_callback,
              1,
              [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
              &mut return_value,
            )
          },
        },
      };
      if let ThreadsafeFunctionCallVariant::WithCallback = call_variant {
        // throw Error in JavaScript callback
        // The pending exception is cleared and forwarded to the per-call
        // callback; `status` then becomes the status of clearing it.
        let callback_arg = if status == sys::Status::napi_pending_exception {
          let mut exception = ptr::null_mut();
          status = unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut exception) };
          Err(
            JsUnknown(crate::Value {
              env: raw_env,
              value: exception,
              value_type: crate::ValueType::Unknown,
            })
            .into(),
          )
        } else {
          Ok(JsUnknown(crate::Value {
            env: raw_env,
            value: return_value,
            value_type: crate::ValueType::Unknown,
          }))
        };
        // An error returned by the per-call callback is treated as fatal.
        if let Err(err) = callback(callback_arg) {
          let message = format!(
            "Failed to convert return value in ThreadsafeFunction callback into Rust value: {}",
            err
          );
          let message_length = message.len();
          let c_message = CString::new(message).unwrap();
          unsafe {
            sys::napi_fatal_error(
              // Only the first 26 bytes of the location string are passed on.
              "threadsafe_function.rs:749 \0".as_ptr().cast(),
              26,
              c_message.as_ptr(),
              message_length,
            )
          };
        }
      }
      status
    }
    // The payload was an error, or the user callback failed.
    Err(e) if ES::VALUE == ErrorStrategy::Fatal::VALUE => unsafe {
      sys::napi_fatal_exception(raw_env, JsError::from(e).into_value(raw_env))
    },
    Err(e) => unsafe {
      sys::napi_call_function(
        raw_env,
        recv,
        js_callback,
        1,
        [JsError::from(e).into_value(raw_env)].as_mut_ptr(),
        ptr::null_mut(),
      )
    },
  };
  if status == sys::Status::napi_ok {
    return;
  }
  if status == sys::Status::napi_pending_exception {
    // Re-raise the exception left pending by the call as an uncaught exception.
    let mut error_result = ptr::null_mut();
    assert_eq!(
      unsafe { sys::napi_get_and_clear_last_exception(raw_env, &mut error_result) },
      sys::Status::napi_ok
    );

    // When shutting down, napi_fatal_exception sometimes returns another exception
    let stat = unsafe { sys::napi_fatal_exception(raw_env, error_result) };
    assert!(stat == sys::Status::napi_ok || stat == sys::Status::napi_pending_exception);
  } else {
    // Any other failure: build a JavaScript error whose code is the debug
    // representation of the status and raise it as an uncaught exception.
    let error_code: Status = status.into();
    let error_code_string = format!(" {:?}", error_code);
    let mut error_code_value = ptr::null_mut();
    assert_eq!(
      unsafe {
        sys::napi_create_string_utf8(
          raw_env,
          error_code_string.as_ptr() as *const _,
          error_code_string.len(),
          &mut error_code_value,
        )
      },
      sys::Status::napi_ok,
    );
    let error_msg = "Call JavaScript callback failed in threadsafe function";
    let mut error_msg_value = ptr::null_mut();
    assert_eq!(
      unsafe {
        sys::napi_create_string_utf8(
          raw_env,
          error_msg.as_ptr() as *const _,
          error_msg.len(),
          &mut error_msg_value,
        )
      },
      sys::Status::napi_ok,
    );
    let mut error_value = ptr::null_mut();
    assert_eq!(
      unsafe {
        sys::napi_create_error(raw_env, error_code_value, error_msg_value, &mut error_value)
      },
      sys::Status::napi_ok,
    );
    assert_eq!(
      unsafe { sys::napi_fatal_exception(raw_env, error_value) },
      sys::Status::napi_ok
    );
  }
}
825 | |
/// Helper that expands an `enum`-looking declaration into a "type-level enum":
/// a module named after the enum that holds one uninhabited type per variant,
/// a sealed trait implemented by exactly those types, and a value-level enum
/// reachable through the `VALUE` associated constant.
macro_rules! type_level_enum {(
  $( #[doc = $doc:tt] )*
  $pub:vis
  enum $EnumName:ident {
    $(
      $( #[doc = $doc_variant:tt] )*
      $Variant:ident
    ),* $(,)?
  }
// First arm: the user-facing syntax. It recurses into the second arm
// (`with_docs! { … } item`) to attach the generated documentation.
) => (type_level_enum! { // This requires the macro to be in scope when called.
  with_docs! {
    $( #[doc = $doc] )*
    ///
    /// ### Type-level `enum`
    ///
    /// Until `const_generics` can handle custom `enum`s, this pattern must be
    /// implemented at the type level.
    ///
    /// We thus end up with:
    ///
    /// ```rust,ignore
    /// #[type_level_enum]
    #[doc = ::core::concat!(
      " enum ", ::core::stringify!($EnumName), " {",
    )]
    $(
      #[doc = ::core::concat!(
        " ", ::core::stringify!($Variant), ",",
      )]
    )*
    #[doc = " }"]
    /// ```
    ///
    #[doc = ::core::concat!(
      "With [`", ::core::stringify!($EnumName), "::T`](#reexports) \
      being the type-level \"enum type \":",
    )]
    ///
    /// ```rust,ignore
    #[doc = ::core::concat!(
      "<Param: ", ::core::stringify!($EnumName), "::T>"
    )]
    /// ```
  }
  #[allow(warnings)]
  $pub mod $EnumName {
    // `EnumName::T` is the trait to use as a bound on generic parameters.
    #[doc(no_inline)]
    pub use $EnumName as T;

    super::type_level_enum! {
      with_docs! {
        #[doc = ::core::concat!(
          "See [`", ::core::stringify!($EnumName), "`]\
          [super::", ::core::stringify!($EnumName), "]"
        )]
      }
      pub trait $EnumName : __sealed::$EnumName + ::core::marker::Sized + 'static {
        const VALUE: __value::$EnumName;
      }
    }

    // Private supertrait: only the variant types generated below can implement the trait.
    mod __sealed { pub trait $EnumName {} }

    // Value-level counterpart, used to compare strategies through `VALUE`.
    mod __value {
      #[derive(Debug, PartialEq, Eq)]
      pub enum $EnumName { $( $Variant ),* }
    }

    $(
      $( #[doc = $doc_variant] )*
      pub enum $Variant {}
      impl __sealed::$EnumName for $Variant {}
      impl $EnumName for $Variant {
        const VALUE: __value::$EnumName = __value::$EnumName::$Variant;
      }
      // Inherent copy of the constant, so `Variant::VALUE` works without importing the trait.
      impl $Variant {
        pub const VALUE: __value::$EnumName = __value::$EnumName::$Variant;
      }
    )*
  }
// Second arm: attaches the collected `#[doc]` attributes to an arbitrary item.
});(
  with_docs! {
    $( #[doc = $doc:expr] )*
  }
  $item:item
) => (
  $( #[doc = $doc] )*
  $item
)}
916 | |
917 | use type_level_enum; |
918 | |
919 | pub struct UnknownReturnValue; |
920 | |
921 | impl TypeName for UnknownReturnValue { |
922 | fn type_name() -> &'static str { |
923 | "UnknownReturnValue" |
924 | } |
925 | |
926 | fn value_type() -> crate::ValueType { |
927 | crate::ValueType::Unknown |
928 | } |
929 | } |
930 | |
931 | impl ValidateNapiValue for UnknownReturnValue {} |
932 | |
933 | impl FromNapiValue for UnknownReturnValue { |
934 | unsafe fn from_napi_value(_env: sys::napi_env, _napi_val: sys::napi_value) -> Result<Self> { |
935 | Ok(UnknownReturnValue) |
936 | } |
937 | } |
938 | |