1//! A language server scaffold, exposing a synchronous crossbeam-channel based API.
2//! This crate handles protocol handshaking and parsing messages, while you
3//! control the message dispatch loop yourself.
4//!
5//! Run with `RUST_LOG=lsp_server=debug` to see all the messages.
6
7#![warn(rust_2018_idioms, unused_lifetimes)]
8
9mod msg;
10mod stdio;
11mod error;
12mod socket;
13mod req_queue;
14
15use std::{
16 io,
17 net::{TcpListener, TcpStream, ToSocketAddrs},
18};
19
20use crossbeam_channel::{Receiver, RecvError, RecvTimeoutError, Sender};
21
22pub use crate::{
23 error::{ExtractError, ProtocolError},
24 msg::{ErrorCode, Message, Notification, Request, RequestId, Response, ResponseError},
25 req_queue::{Incoming, Outgoing, ReqQueue},
26 stdio::IoThreads,
27};
28
29/// Connection is just a pair of channels of LSP messages.
30pub struct Connection {
31 pub sender: Sender<Message>,
32 pub receiver: Receiver<Message>,
33}
34
35impl Connection {
36 /// Create connection over standard in/standard out.
37 ///
38 /// Use this to create a real language server.
39 pub fn stdio() -> (Connection, IoThreads) {
40 let (sender, receiver, io_threads) = stdio::stdio_transport();
41 (Connection { sender, receiver }, io_threads)
42 }
43
44 /// Open a connection over tcp.
45 /// This call blocks until a connection is established.
46 ///
47 /// Use this to create a real language server.
48 pub fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
49 let stream = TcpStream::connect(addr)?;
50 let (sender, receiver, io_threads) = socket::socket_transport(stream);
51 Ok((Connection { sender, receiver }, io_threads))
52 }
53
54 /// Listen for a connection over tcp.
55 /// This call blocks until a connection is established.
56 ///
57 /// Use this to create a real language server.
58 pub fn listen<A: ToSocketAddrs>(addr: A) -> io::Result<(Connection, IoThreads)> {
59 let listener = TcpListener::bind(addr)?;
60 let (stream, _) = listener.accept()?;
61 let (sender, receiver, io_threads) = socket::socket_transport(stream);
62 Ok((Connection { sender, receiver }, io_threads))
63 }
64
65 /// Creates a pair of connected connections.
66 ///
67 /// Use this for testing.
68 pub fn memory() -> (Connection, Connection) {
69 let (s1, r1) = crossbeam_channel::unbounded();
70 let (s2, r2) = crossbeam_channel::unbounded();
71 (Connection { sender: s1, receiver: r2 }, Connection { sender: s2, receiver: r1 })
72 }
73
74 /// Starts the initialization process by waiting for an initialize
75 /// request from the client. Use this for more advanced customization than
76 /// `initialize` can provide.
77 ///
78 /// Returns the request id and serialized `InitializeParams` from the client.
79 ///
80 /// # Example
81 ///
82 /// ```no_run
83 /// use std::error::Error;
84 /// use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
85 ///
86 /// use lsp_server::{Connection, Message, Request, RequestId, Response};
87 ///
88 /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
89 /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
90 /// // also be implemented to use sockets or HTTP.
91 /// let (connection, io_threads) = Connection::stdio();
92 ///
93 /// // Run the server
94 /// let (id, params) = connection.initialize_start()?;
95 ///
96 /// let init_params: InitializeParams = serde_json::from_value(params).unwrap();
97 /// let client_capabilities: ClientCapabilities = init_params.capabilities;
98 /// let server_capabilities = ServerCapabilities::default();
99 ///
100 /// let initialize_data = serde_json::json!({
101 /// "capabilities": server_capabilities,
102 /// "serverInfo": {
103 /// "name": "lsp-server-test",
104 /// "version": "0.1"
105 /// }
106 /// });
107 ///
108 /// connection.initialize_finish(id, initialize_data)?;
109 ///
110 /// // ... Run main loop ...
111 ///
112 /// Ok(())
113 /// }
114 /// ```
115 pub fn initialize_start(&self) -> Result<(RequestId, serde_json::Value), ProtocolError> {
116 self.initialize_start_while(|| true)
117 }
118
119 /// Starts the initialization process by waiting for an initialize as described in
120 /// [`Self::initialize_start`] as long as `running` returns
121 /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
122 ///
123 /// # Example
124 ///
125 /// ```rust
126 /// use std::sync::atomic::{AtomicBool, Ordering};
127 /// use std::sync::Arc;
128 /// # use std::error::Error;
129 /// # use lsp_types::{ClientCapabilities, InitializeParams, ServerCapabilities};
130 /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
131 /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
132 /// let running = Arc::new(AtomicBool::new(true));
133 /// # running.store(true, Ordering::SeqCst);
134 /// let r = running.clone();
135 ///
136 /// ctrlc::set_handler(move || {
137 /// r.store(false, Ordering::SeqCst);
138 /// }).expect("Error setting Ctrl-C handler");
139 ///
140 /// let (connection, io_threads) = Connection::stdio();
141 ///
142 /// let res = connection.initialize_start_while(|| running.load(Ordering::SeqCst));
143 /// # assert!(res.is_err());
144 ///
145 /// # Ok(())
146 /// # }
147 /// ```
148 pub fn initialize_start_while<C>(
149 &self,
150 running: C,
151 ) -> Result<(RequestId, serde_json::Value), ProtocolError>
152 where
153 C: Fn() -> bool,
154 {
155 while running() {
156 let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
157 Ok(msg) => msg,
158 Err(RecvTimeoutError::Timeout) => {
159 continue;
160 }
161 Err(RecvTimeoutError::Disconnected) => return Err(ProtocolError::disconnected()),
162 };
163
164 match msg {
165 Message::Request(req) if req.is_initialize() => return Ok((req.id, req.params)),
166 // Respond to non-initialize requests with ServerNotInitialized
167 Message::Request(req) => {
168 let resp = Response::new_err(
169 req.id.clone(),
170 ErrorCode::ServerNotInitialized as i32,
171 format!("expected initialize request, got {req:?}"),
172 );
173 self.sender.send(resp.into()).unwrap();
174 continue;
175 }
176 Message::Notification(n) if !n.is_exit() => {
177 continue;
178 }
179 msg => {
180 return Err(ProtocolError::new(format!(
181 "expected initialize request, got {msg:?}"
182 )));
183 }
184 };
185 }
186
187 return Err(ProtocolError::new(String::from(
188 "Initialization has been aborted during initialization",
189 )));
190 }
191
192 /// Finishes the initialization process by sending an `InitializeResult` to the client
193 pub fn initialize_finish(
194 &self,
195 initialize_id: RequestId,
196 initialize_result: serde_json::Value,
197 ) -> Result<(), ProtocolError> {
198 let resp = Response::new_ok(initialize_id, initialize_result);
199 self.sender.send(resp.into()).unwrap();
200 match &self.receiver.recv() {
201 Ok(Message::Notification(n)) if n.is_initialized() => Ok(()),
202 Ok(msg) => Err(ProtocolError::new(format!(
203 r#"expected initialized notification, got: {msg:?}"#
204 ))),
205 Err(RecvError) => Err(ProtocolError::disconnected()),
206 }
207 }
208
209 /// Finishes the initialization process as described in [`Self::initialize_finish`] as
210 /// long as `running` returns `true` while the return value can be changed through a sig
211 /// handler such as `CTRL + C`.
212 pub fn initialize_finish_while<C>(
213 &self,
214 initialize_id: RequestId,
215 initialize_result: serde_json::Value,
216 running: C,
217 ) -> Result<(), ProtocolError>
218 where
219 C: Fn() -> bool,
220 {
221 let resp = Response::new_ok(initialize_id, initialize_result);
222 self.sender.send(resp.into()).unwrap();
223
224 while running() {
225 let msg = match self.receiver.recv_timeout(std::time::Duration::from_secs(1)) {
226 Ok(msg) => msg,
227 Err(RecvTimeoutError::Timeout) => {
228 continue;
229 }
230 Err(RecvTimeoutError::Disconnected) => {
231 return Err(ProtocolError::disconnected());
232 }
233 };
234
235 match msg {
236 Message::Notification(n) if n.is_initialized() => {
237 return Ok(());
238 }
239 msg => {
240 return Err(ProtocolError::new(format!(
241 r#"expected initialized notification, got: {msg:?}"#
242 )));
243 }
244 }
245 }
246
247 return Err(ProtocolError::new(String::from(
248 "Initialization has been aborted during initialization",
249 )));
250 }
251
252 /// Initialize the connection. Sends the server capabilities
253 /// to the client and returns the serialized client capabilities
254 /// on success. If more fine-grained initialization is required use
255 /// `initialize_start`/`initialize_finish`.
256 ///
257 /// # Example
258 ///
259 /// ```no_run
260 /// use std::error::Error;
261 /// use lsp_types::ServerCapabilities;
262 ///
263 /// use lsp_server::{Connection, Message, Request, RequestId, Response};
264 ///
265 /// fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
266 /// // Create the transport. Includes the stdio (stdin and stdout) versions but this could
267 /// // also be implemented to use sockets or HTTP.
268 /// let (connection, io_threads) = Connection::stdio();
269 ///
270 /// // Run the server
271 /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
272 /// let initialization_params = connection.initialize(server_capabilities)?;
273 ///
274 /// // ... Run main loop ...
275 ///
276 /// Ok(())
277 /// }
278 /// ```
279 pub fn initialize(
280 &self,
281 server_capabilities: serde_json::Value,
282 ) -> Result<serde_json::Value, ProtocolError> {
283 let (id, params) = self.initialize_start()?;
284
285 let initialize_data = serde_json::json!({
286 "capabilities": server_capabilities,
287 });
288
289 self.initialize_finish(id, initialize_data)?;
290
291 Ok(params)
292 }
293
294 /// Initialize the connection as described in [`Self::initialize`] as long as `running` returns
295 /// `true` while the return value can be changed through a sig handler such as `CTRL + C`.
296 ///
297 /// # Example
298 ///
299 /// ```rust
300 /// use std::sync::atomic::{AtomicBool, Ordering};
301 /// use std::sync::Arc;
302 /// # use std::error::Error;
303 /// # use lsp_types::ServerCapabilities;
304 /// # use lsp_server::{Connection, Message, Request, RequestId, Response};
305 ///
306 /// # fn main() -> Result<(), Box<dyn Error + Sync + Send>> {
307 /// let running = Arc::new(AtomicBool::new(true));
308 /// # running.store(true, Ordering::SeqCst);
309 /// let r = running.clone();
310 ///
311 /// ctrlc::set_handler(move || {
312 /// r.store(false, Ordering::SeqCst);
313 /// }).expect("Error setting Ctrl-C handler");
314 ///
315 /// let (connection, io_threads) = Connection::stdio();
316 ///
317 /// let server_capabilities = serde_json::to_value(&ServerCapabilities::default()).unwrap();
318 /// let initialization_params = connection.initialize_while(
319 /// server_capabilities,
320 /// || running.load(Ordering::SeqCst)
321 /// );
322 ///
323 /// # assert!(initialization_params.is_err());
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub fn initialize_while<C>(
328 &self,
329 server_capabilities: serde_json::Value,
330 running: C,
331 ) -> Result<serde_json::Value, ProtocolError>
332 where
333 C: Fn() -> bool,
334 {
335 let (id, params) = self.initialize_start_while(&running)?;
336
337 let initialize_data = serde_json::json!({
338 "capabilities": server_capabilities,
339 });
340
341 self.initialize_finish_while(id, initialize_data, running)?;
342
343 Ok(params)
344 }
345
346 /// If `req` is `Shutdown`, respond to it and return `true`, otherwise return `false`
347 pub fn handle_shutdown(&self, req: &Request) -> Result<bool, ProtocolError> {
348 if !req.is_shutdown() {
349 return Ok(false);
350 }
351 let resp = Response::new_ok(req.id.clone(), ());
352 let _ = self.sender.send(resp.into());
353 match &self.receiver.recv_timeout(std::time::Duration::from_secs(30)) {
354 Ok(Message::Notification(n)) if n.is_exit() => (),
355 Ok(msg) => {
356 return Err(ProtocolError::new(format!(
357 "unexpected message during shutdown: {msg:?}"
358 )))
359 }
360 Err(RecvTimeoutError::Timeout) => {
361 return Err(ProtocolError::new(format!("timed out waiting for exit notification")))
362 }
363 Err(RecvTimeoutError::Disconnected) => {
364 return Err(ProtocolError::new(format!(
365 "channel disconnected waiting for exit notification"
366 )))
367 }
368 }
369 Ok(true)
370 }
371}
372
#[cfg(test)]
mod tests {
    use crossbeam_channel::unbounded;
    use lsp_types::notification::{Exit, Initialized, Notification};
    use lsp_types::request::{Initialize, Request};
    use lsp_types::{InitializeParams, InitializedParams};
    use serde_json::to_value;

    use crate::{Connection, Message, ProtocolError, RequestId};

    /// One scenario for `Connection::initialize_start`.
    struct TestCase {
        /// Messages queued on the connection's receiver before the call.
        test_messages: Vec<Message>,
        /// What `initialize_start` is expected to return.
        expected_resp: Result<(RequestId, serde_json::Value), ProtocolError>,
    }

    /// Feeds the scenario's messages to a fresh connection, runs
    /// `initialize_start`, and checks both its result and that nothing was
    /// written back to the peer.
    fn initialize_start_test(test_case: TestCase) {
        let (reader_sender, reader_receiver) = unbounded::<Message>();
        let (writer_sender, writer_receiver) = unbounded::<Message>();
        let conn = Connection { sender: writer_sender, receiver: reader_receiver };

        for msg in test_case.test_messages {
            assert!(reader_sender.send(msg).is_ok());
        }

        let resp = conn.initialize_start();
        assert_eq!(test_case.expected_resp, resp);

        // `initialize_start` sends synchronously, so anything it wrote is
        // already queued; a non-blocking check avoids a one-second wait.
        assert!(writer_receiver.try_recv().is_err());
    }

    /// A non-`exit` notification ahead of `initialize` is skipped.
    #[test]
    fn not_exit_notification() {
        let notification = crate::Notification {
            method: Initialized::METHOD.to_string(),
            params: to_value(InitializedParams {}).unwrap(),
        };

        let params_as_value = to_value(InitializeParams::default()).unwrap();
        let req_id = RequestId::from(234);
        let request = crate::Request {
            id: req_id.clone(),
            method: Initialize::METHOD.to_string(),
            params: params_as_value.clone(),
        };

        initialize_start_test(TestCase {
            test_messages: vec![notification.into(), request.into()],
            expected_resp: Ok((req_id, params_as_value)),
        });
    }

    /// An `exit` notification ahead of `initialize` aborts with an error.
    #[test]
    fn exit_notification() {
        let notification =
            crate::Notification { method: Exit::METHOD.to_string(), params: to_value(()).unwrap() };
        let notification_msg = Message::from(notification);

        initialize_start_test(TestCase {
            test_messages: vec![notification_msg.clone()],
            expected_resp: Err(ProtocolError::new(format!(
                "expected initialize request, got {notification_msg:?}"
            ))),
        });
    }
}
439