1use std::{
2 ffi::CString,
3 os::unix::io::{AsFd, BorrowedFd, OwnedFd},
4 os::unix::{io::RawFd, net::UnixStream},
5 sync::Arc,
6};
7
8use crate::{
9 core_interfaces::{WL_CALLBACK_INTERFACE, WL_DISPLAY_INTERFACE, WL_REGISTRY_INTERFACE},
10 debug,
11 protocol::{
12 check_for_signature, same_interface, same_interface_or_anonymous, AllowNull, Argument,
13 ArgumentType, Interface, Message, ObjectInfo, ProtocolError, ANONYMOUS_INTERFACE,
14 INLINE_ARGS,
15 },
16 rs::map::SERVER_ID_LIMIT,
17 types::server::{DisconnectReason, InvalidId},
18};
19
20use smallvec::SmallVec;
21
22use crate::rs::{
23 map::{Object, ObjectMap},
24 socket::{BufferedSocket, Socket},
25 wire::MessageParseError,
26};
27
28use super::{
29 handle::PendingDestructor, registry::Registry, ClientData, ClientId, Credentials, Data,
30 DumbObjectData, GlobalHandler, InnerClientId, InnerGlobalId, InnerObjectId, ObjectData,
31 ObjectId, UninitObjectData,
32};
33
/// List of message arguments stored in a `SmallVec` with `INLINE_ARGS` inline
/// slots, where object arguments are full [`ObjectId`]s and file descriptors
/// are of type `Fd`.
type ArgSmallVec<Fd> = SmallVec<[Argument<ObjectId, Fd>; INLINE_ARGS]>;
35
/// Error codes posted on the `wl_display` object through
/// [`Client::post_display_error`].
///
/// The discriminant is cast to `u32` and sent to the client as-is, so the
/// numeric values must stay stable.
// NOTE(review): the values appear to mirror the `wl_display.error` enum of the
// core Wayland protocol — confirm against the protocol XML.
#[repr(u32)]
#[allow(dead_code)]
pub(crate) enum DisplayError {
    /// Used in this file for unknown object ids, rejected `new_id`s and invalid binds.
    InvalidObject = 0,
    /// Used in this file when a request opcode is not known for the interface.
    InvalidMethod = 1,
    /// Not used in this file.
    NoMemory = 2,
    /// Not used in this file.
    Implementation = 3,
}
44
/// State kept by the server for one connected client.
#[derive(Debug)]
pub(crate) struct Client<D: 'static> {
    /// Buffered socket used to exchange messages with the client.
    socket: BufferedSocket,
    /// Protocol objects of this client, indexed by protocol id.
    pub(crate) map: ObjectMap<Data<D>>,
    /// When set, sent and dispatched messages are printed through the `debug` module.
    debug: bool,
    /// Last serial handed out by `next_serial`; it is stored in each created object.
    last_serial: u32,
    /// Identifier of this client.
    pub(crate) id: InnerClientId,
    /// Set by `kill`; once set, `send_event` does nothing and `next_request` fails.
    pub(crate) killed: bool,
    /// User-provided data, notified on initialization and disconnection.
    pub(crate) data: Arc<dyn ClientData>,
}
55
56impl<D> Client<D> {
57 fn next_serial(&mut self) -> u32 {
58 self.last_serial = self.last_serial.wrapping_add(1);
59 self.last_serial
60 }
61}
62
63impl<D> Client<D> {
64 pub(crate) fn new(
65 stream: UnixStream,
66 id: InnerClientId,
67 debug: bool,
68 data: Arc<dyn ClientData>,
69 ) -> Self {
70 let socket = BufferedSocket::new(Socket::from(stream));
71 let mut map = ObjectMap::new();
72 map.insert_at(
73 1,
74 Object {
75 interface: &WL_DISPLAY_INTERFACE,
76 version: 1,
77 data: Data { user_data: Arc::new(DumbObjectData), serial: 0 },
78 },
79 )
80 .unwrap();
81
82 data.initialized(ClientId { id: id.clone() });
83
84 Self { socket, map, debug, id, killed: false, last_serial: 0, data }
85 }
86
87 pub(crate) fn create_object(
88 &mut self,
89 interface: &'static Interface,
90 version: u32,
91 user_data: Arc<dyn ObjectData<D>>,
92 ) -> InnerObjectId {
93 let serial = self.next_serial();
94 let id = self.map.server_insert_new(Object {
95 interface,
96 version,
97 data: Data { serial, user_data },
98 });
99 InnerObjectId { id, serial, client_id: self.id.clone(), interface }
100 }
101
102 pub(crate) fn object_info(&self, id: InnerObjectId) -> Result<ObjectInfo, InvalidId> {
103 let object = self.get_object(id.clone())?;
104 Ok(ObjectInfo { id: id.id, interface: object.interface, version: object.version })
105 }
106
107 pub(crate) fn send_event(
108 &mut self,
109 Message { sender_id: object_id, opcode, args }: Message<ObjectId, RawFd>,
110 pending_destructors: Option<&mut Vec<super::handle::PendingDestructor<D>>>,
111 ) -> Result<(), InvalidId> {
112 if self.killed {
113 return Ok(());
114 }
115 let object = self.get_object(object_id.id.clone())?;
116
117 let message_desc = match object.interface.events.get(opcode as usize) {
118 Some(msg) => msg,
119 None => {
120 panic!(
121 "Unknown opcode {} for object {}@{}.",
122 opcode, object.interface.name, object_id.id
123 );
124 }
125 };
126
127 if !check_for_signature(message_desc.signature, &args) {
128 panic!(
129 "Unexpected signature for event {}@{}.{}: expected {:?}, got {:?}.",
130 object.interface.name,
131 object_id.id,
132 message_desc.name,
133 message_desc.signature,
134 args
135 );
136 }
137
138 if self.debug {
139 debug::print_send_message(
140 object.interface.name,
141 object_id.id.id,
142 message_desc.name,
143 &args,
144 false,
145 );
146 }
147
148 let mut msg_args = SmallVec::with_capacity(args.len());
149 let mut arg_interfaces = message_desc.arg_interfaces.iter();
150 for (i, arg) in args.into_iter().enumerate() {
151 msg_args.push(match arg {
152 Argument::Array(a) => Argument::Array(a),
153 Argument::Int(i) => Argument::Int(i),
154 Argument::Uint(u) => Argument::Uint(u),
155 Argument::Str(s) => Argument::Str(s),
156 Argument::Fixed(f) => Argument::Fixed(f),
157 Argument::Fd(f) => Argument::Fd(f),
158 Argument::NewId(o) => {
159 if o.id.id != 0 {
160 if o.id.client_id != self.id {
161 panic!("Attempting to send an event with objects from wrong client.")
162 }
163 let object = self.get_object(o.id.clone())?;
164 let child_interface = match message_desc.child_interface {
165 Some(iface) => iface,
166 None => panic!("Trying to send event {}@{}.{} which creates an object without specifying its interface, this is unsupported.", object_id.id.interface.name, object_id.id, message_desc.name),
167 };
168 if !same_interface(child_interface, object.interface) {
169 panic!("Event {}@{}.{} expects a newid argument of interface {} but {} was provided instead.", object.interface.name, object_id.id, message_desc.name, child_interface.name, object.interface.name);
170 }
171 } else if !matches!(message_desc.signature[i], ArgumentType::NewId) {
172 panic!("Request {}@{}.{} expects an non-null newid argument.", object.interface.name, object_id.id, message_desc.name);
173 }
174 Argument::Object(o.id.id)
175 },
176 Argument::Object(o) => {
177 let next_interface = arg_interfaces.next().unwrap();
178 if o.id.id != 0 {
179 if o.id.client_id != self.id {
180 panic!("Attempting to send an event with objects from wrong client.")
181 }
182 let arg_object = self.get_object(o.id.clone())?;
183 if !same_interface_or_anonymous(next_interface, arg_object.interface) {
184 panic!("Event {}@{}.{} expects an object argument of interface {} but {} was provided instead.", object.interface.name, object_id.id, message_desc.name, next_interface.name, arg_object.interface.name);
185 }
186 } else if !matches!(message_desc.signature[i], ArgumentType::Object(AllowNull::Yes)) {
187 panic!("Request {}@{}.{} expects an non-null object argument.", object.interface.name, object_id.id, message_desc.name);
188 }
189 Argument::Object(o.id.id)
190 }
191 });
192 }
193
194 let msg = Message { sender_id: object_id.id.id, opcode, args: msg_args };
195
196 if self.socket.write_message(&msg).is_err() {
197 self.kill(DisconnectReason::ConnectionClosed);
198 }
199
200 // Handle destruction if relevant
201 if message_desc.is_destructor {
202 self.map.remove(object_id.id.id);
203 if let Some(vec) = pending_destructors {
204 vec.push((object.data.user_data.clone(), self.id.clone(), object_id.id.clone()));
205 }
206 self.send_delete_id(object_id.id);
207 }
208
209 Ok(())
210 }
211
212 pub(crate) fn send_delete_id(&mut self, object_id: InnerObjectId) {
213 // We should only send delete_id for objects in the client ID space
214 if object_id.id < SERVER_ID_LIMIT {
215 let msg = message!(1, 1, [Argument::Uint(object_id.id)]);
216 if self.socket.write_message(&msg).is_err() {
217 self.kill(DisconnectReason::ConnectionClosed);
218 }
219 }
220 self.map.remove(object_id.id);
221 }
222
223 pub(crate) fn get_object_data(
224 &self,
225 id: InnerObjectId,
226 ) -> Result<Arc<dyn ObjectData<D>>, InvalidId> {
227 let object = self.get_object(id)?;
228 Ok(object.data.user_data)
229 }
230
231 pub(crate) fn set_object_data(
232 &mut self,
233 id: InnerObjectId,
234 data: Arc<dyn ObjectData<D>>,
235 ) -> Result<(), InvalidId> {
236 self.map
237 .with(id.id, |objdata| {
238 if objdata.data.serial != id.serial {
239 Err(InvalidId)
240 } else {
241 objdata.data.user_data = data;
242 Ok(())
243 }
244 })
245 .unwrap_or(Err(InvalidId))
246 }
247
248 pub(crate) fn post_display_error(&mut self, code: DisplayError, message: CString) {
249 self.post_error(
250 InnerObjectId {
251 id: 1,
252 interface: &WL_DISPLAY_INTERFACE,
253 client_id: self.id.clone(),
254 serial: 0,
255 },
256 code as u32,
257 message,
258 )
259 }
260
261 pub(crate) fn post_error(
262 &mut self,
263 object_id: InnerObjectId,
264 error_code: u32,
265 message: CString,
266 ) {
267 let converted_message = message.to_string_lossy().into();
268 // errors are ignored, as the client will be killed anyway
269 let _ = self.send_event(
270 message!(
271 ObjectId {
272 id: InnerObjectId {
273 id: 1,
274 interface: &WL_DISPLAY_INTERFACE,
275 client_id: self.id.clone(),
276 serial: 0
277 }
278 },
279 0, // wl_display.error
280 [
281 Argument::Object(ObjectId { id: object_id.clone() }),
282 Argument::Uint(error_code),
283 Argument::Str(Some(Box::new(message))),
284 ],
285 ),
286 // wl_display.error is not a destructor, this argument will not be used
287 None,
288 );
289 let _ = self.flush();
290 self.kill(DisconnectReason::ProtocolError(ProtocolError {
291 code: error_code,
292 object_id: object_id.id,
293 object_interface: object_id.interface.name.into(),
294 message: converted_message,
295 }));
296 }
297
298 #[cfg(any(target_os = "linux", target_os = "android"))]
299 pub(crate) fn get_credentials(&self) -> Credentials {
300 let creds =
301 rustix::net::sockopt::get_socket_peercred(&self.socket).expect("getsockopt failed!?");
302 let pid = rustix::process::Pid::as_raw(Some(creds.pid));
303 Credentials { pid, uid: creds.uid.as_raw(), gid: creds.gid.as_raw() }
304 }
305
    /// Fallback for targets other than Linux and Android: real credentials are
    /// not queried there, so an all-zero `Credentials` is returned.
    #[cfg(not(any(target_os = "linux", target_os = "android")))]
    // for now this only works on linux
    pub(crate) fn get_credentials(&self) -> Credentials {
        Credentials { pid: 0, uid: 0, gid: 0 }
    }
311
    /// Marks the client as killed and reports `reason` to its [`ClientData`]
    /// through `disconnected`.
    ///
    /// The flag is set before the callback runs. The callback is invoked on
    /// every call, including when the client was already marked as killed.
    pub(crate) fn kill(&mut self, reason: DisconnectReason) {
        self.killed = true;
        self.data.disconnected(ClientId { id: self.id.clone() }, reason);
    }
316
    /// Flushes the client's buffered socket, returning the result of the
    /// underlying `flush` call unchanged.
    pub(crate) fn flush(&mut self) -> std::io::Result<()> {
        self.socket.flush()
    }
320
321 pub(crate) fn all_objects(&self) -> impl Iterator<Item = ObjectId> + '_ {
322 let client_id = self.id.clone();
323 self.map.all_objects().map(move |(id, obj)| ObjectId {
324 id: InnerObjectId {
325 id,
326 client_id: client_id.clone(),
327 interface: obj.interface,
328 serial: obj.data.serial,
329 },
330 })
331 }
332
333 #[allow(clippy::type_complexity)]
334 pub(crate) fn next_request(
335 &mut self,
336 ) -> std::io::Result<(Message<u32, OwnedFd>, Object<Data<D>>)> {
337 if self.killed {
338 return Err(rustix::io::Errno::PIPE.into());
339 }
340 loop {
341 let map = &self.map;
342 let msg = match self.socket.read_one_message(|id, opcode| {
343 map.find(id)
344 .and_then(|o| o.interface.requests.get(opcode as usize))
345 .map(|desc| desc.signature)
346 }) {
347 Ok(msg) => msg,
348 Err(MessageParseError::MissingData) | Err(MessageParseError::MissingFD) => {
349 // need to read more data
350 if let Err(e) = self.socket.fill_incoming_buffers() {
351 if e.kind() != std::io::ErrorKind::WouldBlock {
352 self.kill(DisconnectReason::ConnectionClosed);
353 }
354 return Err(e);
355 }
356 continue;
357 }
358 Err(MessageParseError::Malformed) => {
359 self.kill(DisconnectReason::ConnectionClosed);
360 return Err(rustix::io::Errno::PROTO.into());
361 }
362 };
363
364 let obj = self.map.find(msg.sender_id).unwrap();
365
366 if self.debug {
367 debug::print_dispatched_message(
368 obj.interface.name,
369 msg.sender_id,
370 obj.interface.requests.get(msg.opcode as usize).unwrap().name,
371 &msg.args,
372 );
373 }
374
375 return Ok((msg, obj));
376 }
377 }
378
379 fn get_object(&self, id: InnerObjectId) -> Result<Object<Data<D>>, InvalidId> {
380 let object = self.map.find(id.id).ok_or(InvalidId)?;
381 if object.data.serial != id.serial {
382 return Err(InvalidId);
383 }
384 Ok(object)
385 }
386
387 pub(crate) fn object_for_protocol_id(&self, pid: u32) -> Result<InnerObjectId, InvalidId> {
388 let object = self.map.find(pid).ok_or(InvalidId)?;
389 Ok(InnerObjectId {
390 id: pid,
391 client_id: self.id.clone(),
392 serial: object.data.serial,
393 interface: object.interface,
394 })
395 }
396
397 fn queue_all_destructors(&mut self, pending_destructors: &mut Vec<PendingDestructor<D>>) {
398 pending_destructors.extend(self.map.all_objects().map(|(id, obj)| {
399 (
400 obj.data.user_data.clone(),
401 self.id.clone(),
402 InnerObjectId {
403 id,
404 serial: obj.data.serial,
405 client_id: self.id.clone(),
406 interface: obj.interface,
407 },
408 )
409 }));
410 }
411
412 pub(crate) fn handle_display_request(
413 &mut self,
414 message: Message<u32, OwnedFd>,
415 registry: &mut Registry<D>,
416 ) {
417 match message.opcode {
418 // wl_display.sync(new id wl_callback)
419 0 => {
420 if let [Argument::NewId(new_id)] = message.args[..] {
421 let serial = self.next_serial();
422 let callback_obj = Object {
423 interface: &WL_CALLBACK_INTERFACE,
424 version: 1,
425 data: Data { user_data: Arc::new(DumbObjectData), serial },
426 };
427 if let Err(()) = self.map.insert_at(new_id, callback_obj) {
428 self.post_display_error(
429 DisplayError::InvalidObject,
430 CString::new(format!("Invalid new_id: {}.", new_id)).unwrap(),
431 );
432 return;
433 }
434 let cb_id = ObjectId {
435 id: InnerObjectId {
436 id: new_id,
437 client_id: self.id.clone(),
438 serial,
439 interface: &WL_CALLBACK_INTERFACE,
440 },
441 };
442 // send wl_callback.done(0) this callback does not have any meaningful destructor to run, we can ignore it
443 self.send_event(message!(cb_id, 0, [Argument::Uint(0)]), None).unwrap();
444 } else {
445 unreachable!()
446 }
447 }
448 // wl_display.get_registry(new id wl_registry)
449 1 => {
450 if let [Argument::NewId(new_id)] = message.args[..] {
451 let serial = self.next_serial();
452 let registry_obj = Object {
453 interface: &WL_REGISTRY_INTERFACE,
454 version: 1,
455 data: Data { user_data: Arc::new(DumbObjectData), serial },
456 };
457 let registry_id = InnerObjectId {
458 id: new_id,
459 serial,
460 client_id: self.id.clone(),
461 interface: &WL_REGISTRY_INTERFACE,
462 };
463 if let Err(()) = self.map.insert_at(new_id, registry_obj) {
464 self.post_display_error(
465 DisplayError::InvalidObject,
466 CString::new(format!("Invalid new_id: {}.", new_id)).unwrap(),
467 );
468 return;
469 }
470 let _ = registry.new_registry(registry_id, self);
471 } else {
472 unreachable!()
473 }
474 }
475 _ => {
476 // unkown opcode, kill the client
477 self.post_display_error(
478 DisplayError::InvalidMethod,
479 CString::new(format!(
480 "Unknown opcode {} for interface wl_display.",
481 message.opcode
482 ))
483 .unwrap(),
484 );
485 }
486 }
487 }
488
489 #[allow(clippy::type_complexity)]
490 pub(crate) fn handle_registry_request(
491 &mut self,
492 message: Message<u32, OwnedFd>,
493 registry: &mut Registry<D>,
494 ) -> Option<(InnerClientId, InnerGlobalId, InnerObjectId, Arc<dyn GlobalHandler<D>>)> {
495 match message.opcode {
496 // wl_registry.bind(uint name, str interface, uint version, new id)
497 0 => {
498 if let [Argument::Uint(name), Argument::Str(Some(ref interface_name)), Argument::Uint(version), Argument::NewId(new_id)] =
499 message.args[..]
500 {
501 if let Some((interface, global_id, handler)) =
502 registry.check_bind(self, name, interface_name, version)
503 {
504 let serial = self.next_serial();
505 let object = Object {
506 interface,
507 version,
508 data: Data { serial, user_data: Arc::new(UninitObjectData) },
509 };
510 if let Err(()) = self.map.insert_at(new_id, object) {
511 self.post_display_error(
512 DisplayError::InvalidObject,
513 CString::new(format!("Invalid new_id: {}.", new_id)).unwrap(),
514 );
515 return None;
516 }
517 Some((
518 self.id.clone(),
519 global_id,
520 InnerObjectId {
521 id: new_id,
522 client_id: self.id.clone(),
523 interface,
524 serial,
525 },
526 handler.clone(),
527 ))
528 } else {
529 self.post_display_error(
530 DisplayError::InvalidObject,
531 CString::new(format!(
532 "Invalid binding of {} version {} for global {}.",
533 interface_name.to_string_lossy(),
534 version,
535 name
536 ))
537 .unwrap(),
538 );
539 None
540 }
541 } else {
542 unreachable!()
543 }
544 }
545 _ => {
546 // unkown opcode, kill the client
547 self.post_display_error(
548 DisplayError::InvalidMethod,
549 CString::new(format!(
550 "Unknown opcode {} for interface wl_registry.",
551 message.opcode
552 ))
553 .unwrap(),
554 );
555 None
556 }
557 }
558 }
559
560 pub(crate) fn process_request(
561 &mut self,
562 object: &Object<Data<D>>,
563 message: Message<u32, OwnedFd>,
564 ) -> Option<(ArgSmallVec<OwnedFd>, bool, Option<InnerObjectId>)> {
565 let message_desc = object.interface.requests.get(message.opcode as usize).unwrap();
566 // Convert the arguments and create the new object if applicable
567 let mut new_args = SmallVec::with_capacity(message.args.len());
568 let mut arg_interfaces = message_desc.arg_interfaces.iter();
569 let mut created_id = None;
570 for (i, arg) in message.args.into_iter().enumerate() {
571 new_args.push(match arg {
572 Argument::Array(a) => Argument::Array(a),
573 Argument::Int(i) => Argument::Int(i),
574 Argument::Uint(u) => Argument::Uint(u),
575 Argument::Str(s) => Argument::Str(s),
576 Argument::Fixed(f) => Argument::Fixed(f),
577 Argument::Fd(f) => Argument::Fd(f),
578 Argument::Object(o) => {
579 let next_interface = arg_interfaces.next();
580 if o != 0 {
581 // Lookup the object to make the appropriate Id
582 let obj = match self.map.find(o) {
583 Some(o) => o,
584 None => {
585 self.post_display_error(
586 DisplayError::InvalidObject,
587 CString::new(format!("Unknown id: {}.", o)).unwrap()
588 );
589 return None;
590 }
591 };
592 if let Some(next_interface) = next_interface {
593 if !same_interface_or_anonymous(next_interface, obj.interface) {
594 self.post_display_error(
595 DisplayError::InvalidObject,
596 CString::new(format!(
597 "Invalid object {} in request {}.{}: expected {} but got {}.",
598 o,
599 object.interface.name,
600 message_desc.name,
601 next_interface.name,
602 obj.interface.name,
603 )).unwrap()
604 );
605 return None;
606 }
607 }
608 Argument::Object(ObjectId { id: InnerObjectId { id: o, client_id: self.id.clone(), serial: obj.data.serial, interface: obj.interface }})
609 } else if matches!(message_desc.signature[i], ArgumentType::Object(AllowNull::Yes)) {
610 Argument::Object(ObjectId { id: InnerObjectId { id: 0, client_id: self.id.clone(), serial: 0, interface: &ANONYMOUS_INTERFACE }})
611 } else {
612 self.post_display_error(
613 DisplayError::InvalidObject,
614 CString::new(format!(
615 "Invalid null object in request {}.{}.",
616 object.interface.name,
617 message_desc.name,
618 )).unwrap()
619 );
620 return None;
621 }
622 }
623 Argument::NewId(new_id) => {
624 // An object should be created
625 let child_interface = match message_desc.child_interface {
626 Some(iface) => iface,
627 None => panic!("Received request {}@{}.{} which creates an object without specifying its interface, this is unsupported.", object.interface.name, message.sender_id, message_desc.name),
628 };
629
630 let child_udata = Arc::new(UninitObjectData);
631
632 let child_obj = Object {
633 interface: child_interface,
634 version: object.version,
635 data: Data {
636 user_data: child_udata,
637 serial: self.next_serial(),
638 }
639 };
640
641 let child_id = InnerObjectId { id: new_id, client_id: self.id.clone(), serial: child_obj.data.serial, interface: child_obj.interface };
642 created_id = Some(child_id.clone());
643
644 if let Err(()) = self.map.insert_at(new_id, child_obj) {
645 // abort parsing, this is an unrecoverable error
646 self.post_display_error(
647 DisplayError::InvalidObject,
648 CString::new(format!("Invalid new_id: {}.", new_id)).unwrap()
649 );
650 return None;
651 }
652
653 Argument::NewId(ObjectId { id: child_id })
654 }
655 });
656 }
657 Some((new_args, message_desc.is_destructor, created_id))
658 }
659}
660
/// Exposes the file descriptor of the client's socket.
impl<D> AsFd for Client<D> {
    /// Borrows the file descriptor of the underlying buffered socket.
    fn as_fd(&self) -> BorrowedFd<'_> {
        self.socket.as_fd()
    }
}
666
/// Collection of all clients known to the server.
#[derive(Debug)]
pub(crate) struct ClientStore<D: 'static> {
    /// Client slots; the index of a slot is the `id` of the client stored in it,
    /// and `None` marks a free slot that `create_client` may reuse.
    clients: Vec<Option<Client<D>>>,
    /// Last serial handed out to a client id by `next_serial`.
    last_serial: u32,
    /// Debug flag passed on to every client created by this store.
    debug: bool,
}
673
674impl<D> ClientStore<D> {
675 pub(crate) fn new(debug: bool) -> Self {
676 Self { clients: Vec::new(), last_serial: 0, debug }
677 }
678
679 pub(crate) fn create_client(
680 &mut self,
681 stream: UnixStream,
682 data: Arc<dyn ClientData>,
683 ) -> InnerClientId {
684 let serial = self.next_serial();
685 // Find the next free place
686 let (id, place) = match self.clients.iter_mut().enumerate().find(|(_, c)| c.is_none()) {
687 Some((id, place)) => (id, place),
688 None => {
689 self.clients.push(None);
690 (self.clients.len() - 1, self.clients.last_mut().unwrap())
691 }
692 };
693
694 let id = InnerClientId { id: id as u32, serial };
695
696 *place = Some(Client::new(stream, id.clone(), self.debug, data));
697
698 id
699 }
700
701 pub(crate) fn get_client(&self, id: InnerClientId) -> Result<&Client<D>, InvalidId> {
702 match self.clients.get(id.id as usize) {
703 Some(Some(client)) if client.id == id => Ok(client),
704 _ => Err(InvalidId),
705 }
706 }
707
708 pub(crate) fn get_client_mut(
709 &mut self,
710 id: InnerClientId,
711 ) -> Result<&mut Client<D>, InvalidId> {
712 match self.clients.get_mut(id.id as usize) {
713 Some(&mut Some(ref mut client)) if client.id == id => Ok(client),
714 _ => Err(InvalidId),
715 }
716 }
717
718 pub(crate) fn cleanup(
719 &mut self,
720 pending_destructors: &mut Vec<PendingDestructor<D>>,
721 ) -> SmallVec<[ClientId; 1]> {
722 let mut cleaned = SmallVec::new();
723 for place in &mut self.clients {
724 if place.as_ref().map(|client| client.killed).unwrap_or(false) {
725 // Remove the client from the store and flush it one last time before dropping it
726 let mut client = place.take().unwrap();
727 client.queue_all_destructors(pending_destructors);
728 let _ = client.flush();
729 cleaned.push(ClientId { id: client.id });
730 }
731 }
732 cleaned
733 }
734
735 fn next_serial(&mut self) -> u32 {
736 self.last_serial = self.last_serial.wrapping_add(1);
737 self.last_serial
738 }
739
740 pub(crate) fn clients_mut(&mut self) -> impl Iterator<Item = &mut Client<D>> {
741 self.clients.iter_mut().flat_map(|o| o.as_mut()).filter(|c| !c.killed)
742 }
743
744 pub(crate) fn all_clients_id(&self) -> impl Iterator<Item = ClientId> + '_ {
745 self.clients.iter().flat_map(|opt| {
746 opt.as_ref().filter(|c| !c.killed).map(|client| ClientId { id: client.id.clone() })
747 })
748 }
749}
750