// neon/event/channel.rs

1use std::{
2    error, fmt,
3    sync::{
4        atomic::{AtomicUsize, Ordering},
5        Arc,
6    },
7};
8
9use crate::{
10    context::{internal::Env, Context, Cx},
11    result::{NeonResult, ResultExt, Throw},
12    sys::{self, tsfn::ThreadsafeFunction},
13};
14
15#[cfg(feature = "futures")]
16use {
17    std::future::Future,
18    std::pin::Pin,
19    std::task::{self, Poll},
20    tokio::sync::oneshot,
21};
22
/// Synchronous oneshot channel exposing the subset of the
/// `tokio::sync::oneshot` API that this file relies on.
///
/// Compiled only when the `futures` feature is disabled; it is backed by a
/// bounded `std::sync::mpsc` channel instead of `tokio`.
#[cfg(not(feature = "futures"))]
mod oneshot {
    use std::sync::mpsc;

    /// Mirrors the `tokio::sync::oneshot::error` module path so callers can
    /// name `oneshot::error::RecvError` regardless of the active feature set.
    pub(super) mod error {
        pub use super::mpsc::RecvError;
    }

    /// Receiving half of the channel; a thin wrapper around `mpsc::Receiver`.
    pub(super) struct Receiver<T>(mpsc::Receiver<T>);

    impl<T> Receiver<T> {
        /// Blocks the current thread until a value arrives.
        ///
        /// Returns `Err(RecvError)` if the sending half was dropped without
        /// sending a value.
        pub(super) fn blocking_recv(self) -> Result<T, mpsc::RecvError> {
            self.0.recv()
        }
    }

    /// Creates a sender/receiver pair backed by a `sync_channel` with a
    /// buffer of one message.
    pub(super) fn channel<T>() -> (mpsc::SyncSender<T>, Receiver<T>) {
        let (tx, rx) = mpsc::sync_channel(1);

        (tx, Receiver(rx))
    }
}
46
47type Callback = Box<dyn FnOnce(sys::Env) + Send + 'static>;
48
49/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
50///
51/// Cloning a `Channel` will create a new channel that shares a backing queue for
52/// events.
53///
54/// # Example
55///
56/// The following example spawns a standard Rust thread to complete a computation
57/// and calls back to a JavaScript function asynchronously with the result.
58///
59/// ```
60/// # use neon::prelude::*;
61/// # fn fibonacci(_: f64) -> f64 { todo!() }
62/// fn async_fibonacci(mut cx: FunctionContext) -> JsResult<JsUndefined> {
63///     // These types (`f64`, `Root<JsFunction>`, `Channel`) may all be sent
64///     // across threads.
65///     let n = cx.argument::<JsNumber>(0)?.value(&mut cx);
66///     let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
67///     let channel = cx.channel();
68///
69///     // Spawn a thread to complete the execution. This will _not_ block the
70///     // JavaScript event loop.
71///     std::thread::spawn(move || {
72///         let result = fibonacci(n);
73///
74///         // Send a closure as a task to be executed by the JavaScript event
75///         // loop. This _will_ block the event loop while executing.
76///         channel.send(move |mut cx| {
77///             let callback = callback.into_inner(&mut cx);
78///
79///             callback
80///                 .bind(&mut cx)
81///                 .args(((), result))?
82///                 .exec()?;
83///
84///             Ok(())
85///         });
86///     });
87///
88///     Ok(cx.undefined())
89/// }
90/// ```
91#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
92pub struct Channel {
93    state: Arc<ChannelState>,
94    has_ref: bool,
95}
96
97impl fmt::Debug for Channel {
98    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99        f.write_str("Channel")
100    }
101}
102
103impl Channel {
104    /// Creates an unbounded channel for scheduling closures on the JavaScript
105    /// main thread
106    pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
107        Self {
108            state: Arc::new(ChannelState::new(cx)),
109            has_ref: true,
110        }
111    }
112
113    /// Allow the Node event loop to exit while this `Channel` exists.
114    /// _Idempotent_
115    pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
116        // Already unreferenced
117        if !self.has_ref {
118            return self;
119        }
120
121        self.has_ref = false;
122        self.state.unref(cx);
123        self
124    }
125
126    /// Prevent the Node event loop from exiting while this `Channel` exists. (Default)
127    /// _Idempotent_
128    pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
129        // Already referenced
130        if self.has_ref {
131            return self;
132        }
133
134        self.has_ref = true;
135        self.state.reference(cx);
136        self
137    }
138
139    /// Schedules a closure to execute on the JavaScript thread that created this Channel
140    /// Panics if there is a libuv error
141    pub fn send<T, F>(&self, f: F) -> JoinHandle<T>
142    where
143        T: Send + 'static,
144        F: FnOnce(Cx) -> NeonResult<T> + Send + 'static,
145    {
146        self.try_send(f).unwrap()
147    }
148
149    /// Schedules a closure to execute on the JavaScript thread that created this Channel
150    /// Returns an `Error` if the task could not be scheduled.
151    ///
152    /// See [`SendError`] for additional details on failure causes.
153    pub fn try_send<T, F>(&self, f: F) -> Result<JoinHandle<T>, SendError>
154    where
155        T: Send + 'static,
156        F: FnOnce(Cx) -> NeonResult<T> + Send + 'static,
157    {
158        let (tx, rx) = oneshot::channel();
159        let callback = Box::new(move |env| {
160            let env = Env::from(env);
161
162            // Note: It is sufficient to use `Cx` because
163            // N-API creates a `HandleScope` before calling the callback.
164            Cx::with_context(env, move |cx| {
165                // Error can be ignored; it only means the user didn't join
166                let _ = tx.send(f(cx).map_err(Into::into));
167            });
168        });
169
170        self.state
171            .tsfn
172            .call(callback, None)
173            .map_err(|_| SendError)?;
174
175        Ok(JoinHandle { rx })
176    }
177
178    /// Returns a boolean indicating if this `Channel` will prevent the Node event
179    /// loop from exiting.
180    pub fn has_ref(&self) -> bool {
181        self.has_ref
182    }
183}
184
185impl Clone for Channel {
186    /// Returns a clone of the Channel instance that shares the internal
187    /// unbounded queue with the original channel. Scheduling callbacks on the
188    /// same queue is faster than using separate channels, but might lead to
189    /// starvation if one of the threads posts significantly more callbacks on
190    /// the channel than the other one.
191    ///
192    /// Cloned and referenced Channel instances might trigger additional
193    /// event-loop tick when dropped. Channel can be wrapped into an Arc and
194    /// shared between different threads/callers to avoid this.
195    fn clone(&self) -> Self {
196        // Not referenced, we can simply clone the fields
197        if !self.has_ref {
198            return Self {
199                state: self.state.clone(),
200                has_ref: false,
201            };
202        }
203
204        let state = Arc::clone(&self.state);
205
206        // Only need to increase the ref count since the tsfn is already referenced
207        state.ref_count.fetch_add(1, Ordering::Relaxed);
208
209        Self {
210            state,
211            has_ref: true,
212        }
213    }
214}
215
216impl Drop for Channel {
217    fn drop(&mut self) {
218        // Not a referenced event queue
219        if !self.has_ref {
220            return;
221        }
222
223        // It was only us who kept the `ChannelState` alive. No need to unref
224        // the `tsfn`, because it is going to be dropped once this function
225        // returns.
226        if Arc::strong_count(&self.state) == 1 {
227            return;
228        }
229
230        // The ChannelState is dropped on a worker thread. We have to `unref`
231        // the tsfn on the UV thread after all pending closures. Note that in
232        // the most of scenarios the optimization in N-API layer would coalesce
233        // `send()` with a user-supplied closure and the unref send here into a
234        // single UV tick.
235        //
236        // If this ever has to be optimized a second `Arc` could be used to wrap
237        // the `state` and it could be cloned in `try_send` and unref'ed on the
238        // UV thread if strong reference count goes to 0.
239        let state = Arc::clone(&self.state);
240
241        // `Channel::try_send` will only fail if the environment has shutdown.
242        // In that case, the teardown will perform clean-up.
243        let _ = self.try_send(move |mut cx| {
244            state.unref(&mut cx);
245            Ok(())
246        });
247    }
248}
249
250/// An owned permission to join on the result of a closure sent to the JavaScript main
251/// thread with [`Channel::send`].
252pub struct JoinHandle<T> {
253    // `Err` is always `Throw`, but `Throw` cannot be sent across threads
254    rx: oneshot::Receiver<Result<T, SendThrow>>,
255}
256
257impl<T> JoinHandle<T> {
258    /// Waits for the associated closure to finish executing
259    ///
260    /// If the closure panics or throws an exception, `Err` is returned
261    ///
262    /// # Panics
263    ///
264    /// This function panics if called within an asynchronous execution context.
265    pub fn join(self) -> Result<T, JoinError> {
266        Ok(self.rx.blocking_recv()??)
267    }
268}
269
/// Allows awaiting the closure's result instead of blocking with
/// [`JoinHandle::join`].
#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<T> Future for JoinHandle<T> {
    type Output = Result<T, JoinError>;

    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        // Flatten `Result<Result<T, SendThrow>, RecvError>` into
        // `Result<T, JoinError>` once the receiver is ready. The explicit
        // closure return type tells both `?` operators which error type to
        // convert into.
        Pin::new(&mut self.rx)
            .poll(cx)
            .map(|result| -> Result<T, JoinError> { Ok(result??) })
    }
}
290
/// Error returned by [`JoinHandle::join`] indicating the associated closure panicked
/// or threw an exception.
#[derive(Debug)]
pub struct JoinError(JoinErrorType);

/// Private discriminator for the two ways a joined closure can fail.
#[derive(Debug)]
enum JoinErrorType {
    /// No result was received from the closure.
    Panic,
    /// The closure returned an error (a JavaScript exception was thrown).
    Throw,
}

impl JoinError {
    /// Returns the human-readable message for this error.
    fn as_str(&self) -> &'static str {
        match &self.0 {
            JoinErrorType::Panic => "Closure panicked before returning",
            JoinErrorType::Throw => "Closure threw an exception",
        }
    }
}

impl fmt::Display for JoinError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl error::Error for JoinError {}
318
319impl From<oneshot::error::RecvError> for JoinError {
320    fn from(_: oneshot::error::RecvError) -> Self {
321        JoinError(JoinErrorType::Panic)
322    }
323}
324
325// Marker that a `Throw` occurred that can be sent across threads for use in `JoinError`
326pub(crate) struct SendThrow(());
327
328impl From<SendThrow> for JoinError {
329    fn from(_: SendThrow) -> Self {
330        JoinError(JoinErrorType::Throw)
331    }
332}
333
334impl From<Throw> for SendThrow {
335    fn from(_: Throw) -> SendThrow {
336        SendThrow(())
337    }
338}
339
340impl<T> ResultExt<T> for Result<T, JoinError> {
341    fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
342        self.or_else(|err| cx.throw_error(err.as_str()))
343    }
344}
345
/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
/// The most likely cause of a failure is that Node is shutting down. This may occur if the
/// process is forcefully exiting even if the channel is referenced. For example, by calling
/// `process.exit()`.
//
// NOTE: These docs will need to be updated to include `QueueFull` if bounded queues are
// implemented.
#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
pub struct SendError;

impl fmt::Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("SendError")
    }
}

// `Debug` intentionally mirrors `Display` so both print the same text.
impl fmt::Debug for SendError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(self, f)
    }
}

impl error::Error for SendError {}
370
371struct ChannelState {
372    tsfn: ThreadsafeFunction<Callback>,
373    ref_count: AtomicUsize,
374}
375
376impl ChannelState {
377    fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
378        let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
379        Self {
380            tsfn,
381            ref_count: AtomicUsize::new(1),
382        }
383    }
384
385    fn reference<'a, C: Context<'a>>(&self, cx: &mut C) {
386        // We can use relaxed ordering because `reference()` can only be called
387        // on the Event-Loop thread.
388        if self.ref_count.fetch_add(1, Ordering::Relaxed) != 0 {
389            return;
390        }
391
392        unsafe {
393            self.tsfn.reference(cx.env().to_raw());
394        }
395    }
396
397    fn unref<'a, C: Context<'a>>(&self, cx: &mut C) {
398        // We can use relaxed ordering because `unref()` can only be called
399        // on the Event-Loop thread.
400        if self.ref_count.fetch_sub(1, Ordering::Relaxed) != 1 {
401            return;
402        }
403
404        unsafe {
405            self.tsfn.unref(cx.env().to_raw());
406        }
407    }
408
409    // Monomorphized trampoline funciton for calling the user provided closure
410    fn callback(env: Option<sys::Env>, callback: Callback) {
411        if let Some(env) = env {
412            callback(env);
413        } else {
414            crate::context::internal::IS_RUNNING.with(|v| {
415                *v.borrow_mut() = false;
416            });
417        }
418    }
419}