1use std::ffi::CString;
2use std::mem;
3use std::os::raw::c_void;
4use std::ptr;
5use std::rc::Rc;
6use std::sync::atomic::{AtomicU8, Ordering};
7
8use crate::{
9 bindgen_runtime::ToNapiValue, check_status, js_values::NapiValue, sys, Env, JsError, JsObject,
10 Result, Task,
11};
12
13struct AsyncWork<T: Task> {
14 inner_task: T,
15 deferred: sys::napi_deferred,
16 value: Result<mem::MaybeUninit<T::Output>>,
17 napi_async_work: sys::napi_async_work,
18 status: Rc<AtomicU8>,
19}
20
21pub struct AsyncWorkPromise {
22 pub(crate) napi_async_work: sys::napi_async_work,
23 raw_promise: sys::napi_value,
24 pub(crate) deferred: sys::napi_deferred,
25 env: sys::napi_env,
26 /// share with AsyncWork
27 /// 0: not started
28 /// 1: completed
29 /// 2: canceled
30 pub(crate) status: Rc<AtomicU8>,
31}
32
33impl AsyncWorkPromise {
34 pub fn promise_object(&self) -> JsObject {
35 unsafe { JsObject::from_raw_unchecked(self.env, self.raw_promise) }
36 }
37
38 pub fn cancel(&self) -> Result<()> {
39 // must be happened in the main thread, relaxed is enough
40 self.status.store(val:2, order:Ordering::Relaxed);
41 check_status!(unsafe { sys::napi_cancel_async_work(self.env, self.napi_async_work) })
42 }
43}
44
45pub fn run<T: Task>(
46 env: sys::napi_env,
47 task: T,
48 abort_status: Option<Rc<AtomicU8>>,
49) -> Result<AsyncWorkPromise> {
50 let mut raw_resource = ptr::null_mut();
51 check_status!(unsafe { sys::napi_create_object(env, &mut raw_resource) })?;
52 let mut raw_promise = ptr::null_mut();
53 let mut deferred = ptr::null_mut();
54 check_status!(unsafe { sys::napi_create_promise(env, &mut deferred, &mut raw_promise) })?;
55 let task_status = abort_status.unwrap_or_else(|| Rc::new(AtomicU8::new(0)));
56 let result = Box::leak(Box::new(AsyncWork {
57 inner_task: task,
58 deferred,
59 value: Ok(mem::MaybeUninit::zeroed()),
60 napi_async_work: ptr::null_mut(),
61 status: task_status.clone(),
62 }));
63 let mut async_work_name = ptr::null_mut();
64 let s = "napi_rs_async_work";
65 let len = s.len();
66 let s = CString::new(s)?;
67 check_status!(unsafe {
68 sys::napi_create_string_utf8(env, s.as_ptr(), len, &mut async_work_name)
69 })?;
70 check_status!(unsafe {
71 sys::napi_create_async_work(
72 env,
73 raw_resource,
74 async_work_name,
75 Some(execute::<T>),
76 Some(complete::<T>),
77 (result as *mut AsyncWork<T>).cast(),
78 &mut result.napi_async_work,
79 )
80 })?;
81 check_status!(unsafe { sys::napi_queue_async_work(env, result.napi_async_work) })?;
82 Ok(AsyncWorkPromise {
83 napi_async_work: result.napi_async_work,
84 raw_promise,
85 deferred,
86 env,
87 status: task_status,
88 })
89}
90
91unsafe impl<T: Task + Send> Send for AsyncWork<T> {}
92unsafe impl<T: Task + Sync> Sync for AsyncWork<T> {}
93
94/// env here is the same with the one in `CallContext`.
95/// So it actually could do nothing here, because `execute` function is called in the other thread mostly.
96unsafe extern "C" fn execute<T: Task>(_env: sys::napi_env, data: *mut c_void) {
97 let mut work: Box> = unsafe { Box::from_raw(data as *mut AsyncWork<T>) };
98 let _ = mem::replace(
99 &mut work.value,
100 src:work.inner_task.compute().map(op:mem::MaybeUninit::new),
101 );
102 Box::leak(work);
103}
104
105unsafe extern "C" fn complete<T: Task>(
106 env: sys::napi_env,
107 status: sys::napi_status,
108 data: *mut c_void,
109) {
110 let mut work = unsafe { Box::from_raw(data as *mut AsyncWork<T>) };
111 let value_ptr = mem::replace(&mut work.value, Ok(mem::MaybeUninit::zeroed()));
112 let deferred = mem::replace(&mut work.deferred, ptr::null_mut());
113 let napi_async_work = mem::replace(&mut work.napi_async_work, ptr::null_mut());
114 let value = match value_ptr {
115 Ok(v) => {
116 let output = unsafe { v.assume_init() };
117 work
118 .inner_task
119 .resolve(unsafe { Env::from_raw(env) }, output)
120 }
121 Err(e) => work.inner_task.reject(unsafe { Env::from_raw(env) }, e),
122 };
123 if status != sys::Status::napi_cancelled && work.status.load(Ordering::Relaxed) != 2 {
124 match check_status!(status)
125 .and_then(move |_| value)
126 .and_then(|v| unsafe { ToNapiValue::to_napi_value(env, v) })
127 {
128 Ok(v) => {
129 let status = unsafe { sys::napi_resolve_deferred(env, deferred, v) };
130 debug_assert!(
131 status == sys::Status::napi_ok,
132 "Resolve promise failed, status: {:?}",
133 crate::Status::from(status)
134 );
135 }
136 Err(e) => {
137 let status =
138 unsafe { sys::napi_reject_deferred(env, deferred, JsError::from(e).into_value(env)) };
139 debug_assert!(
140 status == sys::Status::napi_ok,
141 "Reject promise failed, status: {:?}",
142 crate::Status::from(status)
143 );
144 }
145 };
146 }
147 if let Err(e) = work.inner_task.finally(unsafe { Env::from_raw(env) }) {
148 debug_assert!(false, "Panic in Task finally fn: {:?}", e);
149 }
150 let delete_status = unsafe { sys::napi_delete_async_work(env, napi_async_work) };
151 debug_assert!(
152 delete_status == sys::Status::napi_ok,
153 "Delete async work failed, status {:?}",
154 crate::Status::from(delete_status)
155 );
156 work.status.store(1, Ordering::Relaxed);
157}
158