1 | // Copyright 2023 The AccessKit Authors. All rights reserved. |
2 | // Licensed under the Apache License, Version 2.0 (found in |
3 | // the LICENSE-APACHE file) or the MIT license (found in |
4 | // the LICENSE-MIT file), at your option. |
5 | |
6 | use accesskit_atspi_common::{AppContext, Event}; |
7 | #[cfg (not(feature = "tokio" ))] |
8 | use async_channel::{Receiver, Sender}; |
9 | use atspi::proxy::bus::StatusProxy; |
10 | #[cfg (not(feature = "tokio" ))] |
11 | use futures_util::{pin_mut as pin, select, StreamExt}; |
12 | use once_cell::sync::{Lazy, OnceCell}; |
13 | use std::{ |
14 | sync::{Arc, RwLock}, |
15 | thread, |
16 | }; |
17 | #[cfg (feature = "tokio" )] |
18 | use tokio::{ |
19 | pin, select, |
20 | sync::mpsc::{UnboundedReceiver as Receiver, UnboundedSender as Sender}, |
21 | }; |
22 | #[cfg (feature = "tokio" )] |
23 | use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; |
24 | use zbus::{Connection, ConnectionBuilder}; |
25 | |
26 | use crate::{ |
27 | adapter::{LazyAdapter, Message}, |
28 | atspi::{map_or_ignoring_broken_pipe, Bus}, |
29 | executor::Executor, |
30 | util::block_on, |
31 | }; |
32 | |
33 | static APP_CONTEXT: OnceCell<Arc<RwLock<AppContext>>> = OnceCell::new(); |
34 | static MESSAGES: OnceCell<Sender<Message>> = OnceCell::new(); |
35 | |
36 | pub(crate) fn get_or_init_app_context<'a>() -> &'a Arc<RwLock<AppContext>> { |
37 | APP_CONTEXT.get_or_init(AppContext::new) |
38 | } |
39 | |
40 | pub(crate) fn get_or_init_messages() -> Sender<Message> { |
41 | MESSAGES&Sender |
42 | .get_or_init(|| { |
43 | #[cfg (not(feature = "tokio" ))] |
44 | let (tx: Sender, rx: Receiver) = async_channel::unbounded(); |
45 | #[cfg (feature = "tokio" )] |
46 | let (tx, rx) = tokio::sync::mpsc::unbounded_channel(); |
47 | |
48 | thread::spawn(|| { |
49 | let executor: Executor<'_> = Executor::new(); |
50 | block_on(future:executor.run(future:async { |
51 | if let Ok(session_bus: ConnectionBuilder<'_>) = ConnectionBuilder::session() { |
52 | if let Ok(session_bus: Connection) = session_bus.internal_executor(enabled:false).build().await |
53 | { |
54 | run_event_loop(&executor, session_bus, rx).await.unwrap(); |
55 | } |
56 | } |
57 | })) |
58 | }); |
59 | |
60 | tx |
61 | }) |
62 | .clone() |
63 | } |
64 | |
65 | async fn run_event_loop( |
66 | executor: &Executor<'_>, |
67 | session_bus: Connection, |
68 | rx: Receiver<Message>, |
69 | ) -> zbus::Result<()> { |
70 | let session_bus_copy = session_bus.clone(); |
71 | let _session_bus_task = executor.spawn( |
72 | async move { |
73 | loop { |
74 | session_bus_copy.executor().tick().await; |
75 | } |
76 | }, |
77 | "accesskit_session_bus_task" , |
78 | ); |
79 | |
80 | let status = StatusProxy::new(&session_bus).await?; |
81 | let changes = status.receive_is_enabled_changed().await.fuse(); |
82 | pin!(changes); |
83 | |
84 | #[cfg (not(feature = "tokio" ))] |
85 | let messages = rx.fuse(); |
86 | #[cfg (feature = "tokio" )] |
87 | let messages = UnboundedReceiverStream::new(rx).fuse(); |
88 | pin!(messages); |
89 | |
90 | let mut atspi_bus = None; |
91 | let mut adapters: Vec<(usize, LazyAdapter)> = Vec::new(); |
92 | |
93 | loop { |
94 | select! { |
95 | change = changes.next() => { |
96 | atspi_bus = None; |
97 | if let Some(change) = change { |
98 | if change.get().await? { |
99 | atspi_bus = map_or_ignoring_broken_pipe(Bus::new(&session_bus, executor).await, None, Some)?; |
100 | } |
101 | } |
102 | if atspi_bus.is_some() { |
103 | for (_, adapter) in &adapters { |
104 | Lazy::force(adapter); |
105 | } |
106 | } |
107 | } |
108 | message = messages.next() => { |
109 | if let Some(message) = message { |
110 | process_adapter_message(&atspi_bus, &mut adapters, message).await?; |
111 | } |
112 | } |
113 | } |
114 | } |
115 | } |
116 | |
117 | async fn process_adapter_message( |
118 | atspi_bus: &Option<Bus>, |
119 | adapters: &mut Vec<(usize, LazyAdapter)>, |
120 | message: Message, |
121 | ) -> zbus::Result<()> { |
122 | match message { |
123 | Message::AddAdapter { id, adapter } => { |
124 | adapters.push((id, adapter)); |
125 | if atspi_bus.is_some() { |
126 | let adapter = &adapters.last_mut().unwrap().1; |
127 | Lazy::force(adapter); |
128 | } |
129 | } |
130 | Message::RemoveAdapter { id } => { |
131 | if let Ok(index) = adapters.binary_search_by(|adapter| adapter.0.cmp(&id)) { |
132 | adapters.remove(index); |
133 | } |
134 | } |
135 | Message::RegisterInterfaces { node, interfaces } => { |
136 | if let Some(bus) = atspi_bus { |
137 | bus.register_interfaces(node, interfaces).await? |
138 | } |
139 | } |
140 | Message::UnregisterInterfaces { |
141 | adapter_id, |
142 | node_id, |
143 | interfaces, |
144 | } => { |
145 | if let Some(bus) = atspi_bus { |
146 | bus.unregister_interfaces(adapter_id, node_id, interfaces) |
147 | .await? |
148 | } |
149 | } |
150 | Message::EmitEvent { |
151 | adapter_id, |
152 | event: Event::Object { target, event }, |
153 | } => { |
154 | if let Some(bus) = atspi_bus { |
155 | bus.emit_object_event(adapter_id, target, event).await? |
156 | } |
157 | } |
158 | Message::EmitEvent { |
159 | adapter_id, |
160 | event: |
161 | Event::Window { |
162 | target, |
163 | name, |
164 | event, |
165 | }, |
166 | } => { |
167 | if let Some(bus) = atspi_bus { |
168 | bus.emit_window_event(adapter_id, target, name, event) |
169 | .await?; |
170 | } |
171 | } |
172 | } |
173 | |
174 | Ok(()) |
175 | } |
176 | |