1// Copyright 2023 The AccessKit Authors. All rights reserved.
2// Licensed under the Apache License, Version 2.0 (found in
3// the LICENSE-APACHE file) or the MIT license (found in
4// the LICENSE-MIT file), at your option.
5
6use accesskit_atspi_common::{AppContext, Event};
7#[cfg(not(feature = "tokio"))]
8use async_channel::{Receiver, Sender};
9use atspi::proxy::bus::StatusProxy;
10#[cfg(not(feature = "tokio"))]
11use futures_util::{pin_mut as pin, select, StreamExt};
12use once_cell::sync::{Lazy, OnceCell};
13use std::{
14 sync::{Arc, RwLock},
15 thread,
16};
17#[cfg(feature = "tokio")]
18use tokio::{
19 pin, select,
20 sync::mpsc::{UnboundedReceiver as Receiver, UnboundedSender as Sender},
21};
22#[cfg(feature = "tokio")]
23use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt};
24use zbus::{Connection, ConnectionBuilder};
25
26use crate::{
27 adapter::{LazyAdapter, Message},
28 atspi::{map_or_ignoring_broken_pipe, Bus},
29 executor::Executor,
30 util::block_on,
31};
32
33static APP_CONTEXT: OnceCell<Arc<RwLock<AppContext>>> = OnceCell::new();
34static MESSAGES: OnceCell<Sender<Message>> = OnceCell::new();
35
36pub(crate) fn get_or_init_app_context<'a>() -> &'a Arc<RwLock<AppContext>> {
37 APP_CONTEXT.get_or_init(AppContext::new)
38}
39
40pub(crate) fn get_or_init_messages() -> Sender<Message> {
41 MESSAGES&Sender
42 .get_or_init(|| {
43 #[cfg(not(feature = "tokio"))]
44 let (tx: Sender, rx: Receiver) = async_channel::unbounded();
45 #[cfg(feature = "tokio")]
46 let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
47
48 thread::spawn(|| {
49 let executor: Executor<'_> = Executor::new();
50 block_on(future:executor.run(future:async {
51 if let Ok(session_bus: ConnectionBuilder<'_>) = ConnectionBuilder::session() {
52 if let Ok(session_bus: Connection) = session_bus.internal_executor(enabled:false).build().await
53 {
54 run_event_loop(&executor, session_bus, rx).await.unwrap();
55 }
56 }
57 }))
58 });
59
60 tx
61 })
62 .clone()
63}
64
65async fn run_event_loop(
66 executor: &Executor<'_>,
67 session_bus: Connection,
68 rx: Receiver<Message>,
69) -> zbus::Result<()> {
70 let session_bus_copy = session_bus.clone();
71 let _session_bus_task = executor.spawn(
72 async move {
73 loop {
74 session_bus_copy.executor().tick().await;
75 }
76 },
77 "accesskit_session_bus_task",
78 );
79
80 let status = StatusProxy::new(&session_bus).await?;
81 let changes = status.receive_is_enabled_changed().await.fuse();
82 pin!(changes);
83
84 #[cfg(not(feature = "tokio"))]
85 let messages = rx.fuse();
86 #[cfg(feature = "tokio")]
87 let messages = UnboundedReceiverStream::new(rx).fuse();
88 pin!(messages);
89
90 let mut atspi_bus = None;
91 let mut adapters: Vec<(usize, LazyAdapter)> = Vec::new();
92
93 loop {
94 select! {
95 change = changes.next() => {
96 atspi_bus = None;
97 if let Some(change) = change {
98 if change.get().await? {
99 atspi_bus = map_or_ignoring_broken_pipe(Bus::new(&session_bus, executor).await, None, Some)?;
100 }
101 }
102 if atspi_bus.is_some() {
103 for (_, adapter) in &adapters {
104 Lazy::force(adapter);
105 }
106 }
107 }
108 message = messages.next() => {
109 if let Some(message) = message {
110 process_adapter_message(&atspi_bus, &mut adapters, message).await?;
111 }
112 }
113 }
114 }
115}
116
117async fn process_adapter_message(
118 atspi_bus: &Option<Bus>,
119 adapters: &mut Vec<(usize, LazyAdapter)>,
120 message: Message,
121) -> zbus::Result<()> {
122 match message {
123 Message::AddAdapter { id, adapter } => {
124 adapters.push((id, adapter));
125 if atspi_bus.is_some() {
126 let adapter = &adapters.last_mut().unwrap().1;
127 Lazy::force(adapter);
128 }
129 }
130 Message::RemoveAdapter { id } => {
131 if let Ok(index) = adapters.binary_search_by(|adapter| adapter.0.cmp(&id)) {
132 adapters.remove(index);
133 }
134 }
135 Message::RegisterInterfaces { node, interfaces } => {
136 if let Some(bus) = atspi_bus {
137 bus.register_interfaces(node, interfaces).await?
138 }
139 }
140 Message::UnregisterInterfaces {
141 adapter_id,
142 node_id,
143 interfaces,
144 } => {
145 if let Some(bus) = atspi_bus {
146 bus.unregister_interfaces(adapter_id, node_id, interfaces)
147 .await?
148 }
149 }
150 Message::EmitEvent {
151 adapter_id,
152 event: Event::Object { target, event },
153 } => {
154 if let Some(bus) = atspi_bus {
155 bus.emit_object_event(adapter_id, target, event).await?
156 }
157 }
158 Message::EmitEvent {
159 adapter_id,
160 event:
161 Event::Window {
162 target,
163 name,
164 event,
165 },
166 } => {
167 if let Some(bus) = atspi_bus {
168 bus.emit_window_event(adapter_id, target, name, event)
169 .await?;
170 }
171 }
172 }
173
174 Ok(())
175}
176