1use std::{
2 io::{self, BufReader},
3 net::TcpStream,
4 thread,
5};
6
7use crossbeam_channel::{bounded, Receiver, Sender};
8
9use crate::{
10 stdio::{make_io_threads, IoThreads},
11 Message,
12};
13
14pub(crate) fn socket_transport(
15 stream: TcpStream,
16) -> (Sender<Message>, Receiver<Message>, IoThreads) {
17 let (reader_receiver: Receiver, reader: JoinHandle>) = make_reader(stream:stream.try_clone().unwrap());
18 let (writer_sender: Sender, writer: JoinHandle>) = make_write(stream);
19 let io_threads: IoThreads = make_io_threads(reader, writer);
20 (writer_sender, reader_receiver, io_threads)
21}
22
23fn make_reader(stream: TcpStream) -> (Receiver<Message>, thread::JoinHandle<io::Result<()>>) {
24 let (reader_sender: Sender, reader_receiver: Receiver) = bounded::<Message>(cap:0);
25 let reader: JoinHandle> = thread::spawn(move || {
26 let mut buf_read: BufReader = BufReader::new(inner:stream);
27 while let Some(msg: Message) = Message::read(&mut buf_read).unwrap() {
28 let is_exit: bool = matches!(&msg, Message::Notification(n) if n.is_exit());
29 reader_sender.send(msg).unwrap();
30 if is_exit {
31 break;
32 }
33 }
34 Ok(())
35 });
36 (reader_receiver, reader)
37}
38
39fn make_write(mut stream: TcpStream) -> (Sender<Message>, thread::JoinHandle<io::Result<()>>) {
40 let (writer_sender: Sender, writer_receiver: Receiver) = bounded::<Message>(cap:0);
41 let writer: JoinHandle> = thread::spawn(move || {
42 writer_receiver.into_iter().try_for_each(|it: Message| it.write(&mut stream)).unwrap();
43 Ok(())
44 });
45 (writer_sender, writer)
46}
47