1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
11#define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
12
13#include <boost/beast/websocket/teardown.hpp>
14#include <boost/beast/websocket/detail/mask.hpp>
15#include <boost/beast/websocket/impl/stream_impl.hpp>
16#include <boost/beast/core/async_base.hpp>
17#include <boost/beast/core/flat_static_buffer.hpp>
18#include <boost/beast/core/stream_traits.hpp>
19#include <boost/beast/core/detail/bind_continuation.hpp>
20#include <boost/asio/coroutine.hpp>
21#include <boost/asio/dispatch.hpp>
22#include <boost/throw_exception.hpp>
23#include <memory>
24
25namespace boost {
26namespace beast {
27namespace websocket {
28
29/* Close the WebSocket Connection
30
31 This composed operation sends the close frame if it hasn't already
32 been sent, then reads and discards frames until receiving a close
33 frame. Finally it invokes the teardown operation to shut down the
34 underlying connection.
35*/
36template<class NextLayer, bool deflateSupported>
37template<class Handler>
38class stream<NextLayer, deflateSupported>::close_op
39 : public beast::stable_async_base<
40 Handler, beast::executor_type<stream>>
41 , public asio::coroutine
42{
43 boost::weak_ptr<impl_type> wp_;
44 error_code ev_;
45 detail::frame_buffer& fb_;
46
47public:
48 static constexpr int id = 5; // for soft_mutex
49
50 template<class Handler_>
51 close_op(
52 Handler_&& h,
53 boost::shared_ptr<impl_type> const& sp,
54 close_reason const& cr)
55 : stable_async_base<Handler,
56 beast::executor_type<stream>>(
57 std::forward<Handler_>(h),
58 sp->stream().get_executor())
59 , wp_(sp)
60 , fb_(beast::allocate_stable<
61 detail::frame_buffer>(*this))
62 {
63 // Serialize the close frame
64 sp->template write_close<
65 flat_static_buffer_base>(fb_, cr);
66 (*this)({}, 0, false);
67 }
68
69 void
70 operator()(
71 error_code ec = {},
72 std::size_t bytes_transferred = 0,
73 bool cont = true)
74 {
75 using beast::detail::clamp;
76 auto sp = wp_.lock();
77 if(! sp)
78 {
79 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
80 return this->complete(cont, ec);
81 }
82 auto& impl = *sp;
83 BOOST_ASIO_CORO_REENTER(*this)
84 {
85 // Acquire the write lock
86 if(! impl.wr_block.try_lock(this))
87 {
88 BOOST_ASIO_CORO_YIELD
89 {
90 BOOST_ASIO_HANDLER_LOCATION((
91 __FILE__, __LINE__,
92 "websocket::async_close"));
93 this->set_allowed_cancellation(net::cancellation_type::all);
94 impl.op_close.emplace(std::move(*this),
95 net::cancellation_type::all);
96 }
97 // cancel fired before we could do anything.
98 if (ec == net::error::operation_aborted)
99 return this->complete(cont, ec);
100 this->set_allowed_cancellation(net::cancellation_type::terminal);
101
102 impl.wr_block.lock(this);
103 BOOST_ASIO_CORO_YIELD
104 {
105 BOOST_ASIO_HANDLER_LOCATION((
106 __FILE__, __LINE__,
107 "websocket::async_close"));
108
109 const auto ex = this->get_immediate_executor();
110 net::dispatch(ex, std::move(*this));
111 }
112 BOOST_ASSERT(impl.wr_block.is_locked(this));
113 }
114 if(impl.check_stop_now(ec))
115 goto upcall;
116
117 // Can't call close twice
118 // TODO return a custom error code
119 BOOST_ASSERT(! impl.wr_close);
120
121 // Send close frame
122 impl.wr_close = true;
123 impl.change_status(status::closing);
124 impl.update_timer(this->get_executor());
125 BOOST_ASIO_CORO_YIELD
126 {
127 BOOST_ASIO_HANDLER_LOCATION((
128 __FILE__, __LINE__,
129 "websocket::async_close"));
130
131 net::async_write(impl.stream(), fb_.data(),
132 beast::detail::bind_continuation(std::move(*this)));
133 }
134 if(impl.check_stop_now(ec))
135 goto upcall;
136
137 if(impl.rd_close)
138 {
139 // This happens when the read_op gets a close frame
140 // at the same time close_op is sending the close frame.
141 // The read_op will be suspended on the write block.
142 goto teardown;
143 }
144
145 // Acquire the read lock
146 if(! impl.rd_block.try_lock(this))
147 {
148 BOOST_ASIO_CORO_YIELD
149 {
150 BOOST_ASIO_HANDLER_LOCATION((
151 __FILE__, __LINE__,
152 "websocket::async_close"));
153 // terminal only, that's the default
154 impl.op_r_close.emplace(std::move(*this));
155 }
156 if (ec == net::error::operation_aborted)
157 {
158 // if a cancellation fires here, we do a dirty shutdown
159 impl.change_status(status::closed);
160 close_socket(get_lowest_layer(impl.stream()));
161 return this->complete(cont, ec);
162 }
163
164 impl.rd_block.lock(this);
165 BOOST_ASIO_CORO_YIELD
166 {
167 BOOST_ASIO_HANDLER_LOCATION((
168 __FILE__, __LINE__,
169 "websocket::async_close"));
170
171 const auto ex = this->get_immediate_executor();
172 net::dispatch(ex, std::move(*this));
173 }
174 BOOST_ASSERT(impl.rd_block.is_locked(this));
175 if(impl.check_stop_now(ec))
176 goto upcall;
177 BOOST_ASSERT(! impl.rd_close);
178 }
179
180 // Read until a receiving a close frame
181 // TODO There should be a timeout on this
182 if(impl.rd_remain > 0)
183 goto read_payload;
184 for(;;)
185 {
186 // Read frame header
187 while(! impl.parse_fh(
188 impl.rd_fh, impl.rd_buf, ev_))
189 {
190 if(ev_)
191 goto teardown;
192 BOOST_ASIO_CORO_YIELD
193 {
194 BOOST_ASIO_HANDLER_LOCATION((
195 __FILE__, __LINE__,
196 "websocket::async_close"));
197
198 impl.stream().async_read_some(
199 impl.rd_buf.prepare(read_size(
200 impl.rd_buf, impl.rd_buf.max_size())),
201 beast::detail::bind_continuation(std::move(*this)));
202 }
203 impl.rd_buf.commit(bytes_transferred);
204 if(impl.check_stop_now(ec)) //< this catches cancellation
205 goto upcall;
206 }
207 if(detail::is_control(op: impl.rd_fh.op))
208 {
209 // Discard ping or pong frame
210 if(impl.rd_fh.op != detail::opcode::close)
211 {
212 impl.rd_buf.consume(clamp(impl.rd_fh.len));
213 continue;
214 }
215
216 // Process close frame
217 // TODO Should we invoke the control callback?
218 BOOST_ASSERT(! impl.rd_close);
219 impl.rd_close = true;
220 auto const mb = buffers_prefix(
221 clamp(impl.rd_fh.len),
222 impl.rd_buf.data());
223 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
224 detail::mask_inplace(mb, impl.rd_key);
225 detail::read_close(impl.cr, mb, ev_);
226 if(ev_)
227 goto teardown;
228 impl.rd_buf.consume(clamp(impl.rd_fh.len));
229 goto teardown;
230 }
231
232 read_payload:
233 // Discard message frame
234 while(impl.rd_buf.size() < impl.rd_remain)
235 {
236 impl.rd_remain -= impl.rd_buf.size();
237 impl.rd_buf.consume(impl.rd_buf.size());
238 BOOST_ASIO_CORO_YIELD
239 {
240 BOOST_ASIO_HANDLER_LOCATION((
241 __FILE__, __LINE__,
242 "websocket::async_close"));
243
244 impl.stream().async_read_some(
245 impl.rd_buf.prepare(read_size(
246 impl.rd_buf, impl.rd_buf.max_size())),
247 beast::detail::bind_continuation(std::move(*this)));
248 }
249 impl.rd_buf.commit(bytes_transferred);
250 if(impl.check_stop_now(ec))
251 goto upcall;
252 }
253 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
254 impl.rd_buf.consume(clamp(impl.rd_remain));
255 impl.rd_remain = 0;
256 }
257
258 teardown:
259 // Teardown
260 BOOST_ASSERT(impl.wr_block.is_locked(this));
261 using beast::websocket::async_teardown;
262 BOOST_ASIO_CORO_YIELD
263 {
264 BOOST_ASIO_HANDLER_LOCATION((
265 __FILE__, __LINE__,
266 "websocket::async_close"));
267
268 async_teardown(impl.role, impl.stream(),
269 beast::detail::bind_continuation(std::move(*this)));
270 }
271 BOOST_ASSERT(impl.wr_block.is_locked(this));
272 if(ec == net::error::eof)
273 {
274 // Rationale:
275 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
276 ec = {};
277 }
278 if(! ec)
279 {
280 BOOST_BEAST_ASSIGN_EC(ec, ev_);
281 }
282 if(ec)
283 impl.change_status(status::failed);
284 else
285 impl.change_status(status::closed);
286 impl.close();
287
288 upcall:
289 impl.wr_block.unlock(this);
290 impl.rd_block.try_unlock(this)
291 && impl.op_r_rd.maybe_invoke();
292 impl.op_rd.maybe_invoke()
293 || impl.op_idle_ping.maybe_invoke()
294 || impl.op_ping.maybe_invoke()
295 || impl.op_wr.maybe_invoke();
296 this->complete(cont, ec);
297 }
298 }
299};
300
301template<class NextLayer, bool deflateSupported>
302struct stream<NextLayer, deflateSupported>::
303 run_close_op
304{
305 template<class CloseHandler>
306 void
307 operator()(
308 CloseHandler&& h,
309 boost::shared_ptr<impl_type> const& sp,
310 close_reason const& cr)
311 {
312 // If you get an error on the following line it means
313 // that your handler does not meet the documented type
314 // requirements for the handler.
315
316 static_assert(
317 beast::detail::is_invocable<CloseHandler,
318 void(error_code)>::value,
319 "CloseHandler type requirements not met");
320
321 close_op<
322 typename std::decay<CloseHandler>::type>(
323 std::forward<CloseHandler>(h),
324 sp,
325 cr);
326 }
327};
328
329//------------------------------------------------------------------------------
330
331template<class NextLayer, bool deflateSupported>
332void
333stream<NextLayer, deflateSupported>::
334close(close_reason const& cr)
335{
336 static_assert(is_sync_stream<next_layer_type>::value,
337 "SyncStream type requirements not met");
338 error_code ec;
339 close(cr, ec);
340 if(ec)
341 BOOST_THROW_EXCEPTION(system_error{ec});
342}
343
344template<class NextLayer, bool deflateSupported>
345void
346stream<NextLayer, deflateSupported>::
347close(close_reason const& cr, error_code& ec)
348{
349 static_assert(is_sync_stream<next_layer_type>::value,
350 "SyncStream type requirements not met");
351 using beast::detail::clamp;
352 auto& impl = *impl_;
353 ec = {};
354 if(impl.check_stop_now(ec))
355 return;
356 BOOST_ASSERT(! impl.rd_close);
357
358 // Can't call close twice
359 // TODO return a custom error code
360 BOOST_ASSERT(! impl.wr_close);
361
362 // Send close frame
363 {
364 impl.wr_close = true;
365 impl.change_status(status::closing);
366 detail::frame_buffer fb;
367 impl.template write_close<flat_static_buffer_base>(fb, cr);
368 net::write(impl.stream(), fb.data(), ec);
369 if(impl.check_stop_now(ec))
370 return;
371 }
372
373 // Read until a receiving a close frame
374 error_code ev;
375 if(impl.rd_remain > 0)
376 goto read_payload;
377 for(;;)
378 {
379 // Read frame header
380 while(! impl.parse_fh(
381 impl.rd_fh, impl.rd_buf, ev))
382 {
383 if(ev)
384 {
385 // Protocol violation
386 return do_fail(code: close_code::none, ev, ec);
387 }
388 impl.rd_buf.commit(impl.stream().read_some(
389 impl.rd_buf.prepare(read_size(
390 impl.rd_buf, impl.rd_buf.max_size())), ec));
391 if(impl.check_stop_now(ec))
392 return;
393 }
394
395 if(detail::is_control(op: impl.rd_fh.op))
396 {
397 // Discard ping/pong frame
398 if(impl.rd_fh.op != detail::opcode::close)
399 {
400 impl.rd_buf.consume(clamp(impl.rd_fh.len));
401 continue;
402 }
403
404 // Handle close frame
405 // TODO Should we invoke the control callback?
406 BOOST_ASSERT(! impl.rd_close);
407 impl.rd_close = true;
408 auto const mb = buffers_prefix(
409 clamp(impl.rd_fh.len),
410 impl.rd_buf.data());
411 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
412 detail::mask_inplace(mb, impl.rd_key);
413 detail::read_close(impl.cr, mb, ev);
414 if(ev)
415 {
416 // Protocol violation
417 return do_fail(code: close_code::none, ev, ec);
418 }
419 impl.rd_buf.consume(clamp(impl.rd_fh.len));
420 break;
421 }
422
423 read_payload:
424 // Discard message frame
425 while(impl.rd_buf.size() < impl.rd_remain)
426 {
427 impl.rd_remain -= impl.rd_buf.size();
428 impl.rd_buf.consume(impl.rd_buf.size());
429 impl.rd_buf.commit(
430 impl.stream().read_some(
431 impl.rd_buf.prepare(
432 read_size(
433 impl.rd_buf,
434 impl.rd_buf.max_size())),
435 ec));
436 if(impl.check_stop_now(ec))
437 return;
438 }
439 BOOST_ASSERT(
440 impl.rd_buf.size() >= impl.rd_remain);
441 impl.rd_buf.consume(clamp(impl.rd_remain));
442 impl.rd_remain = 0;
443 }
444 // _Close the WebSocket Connection_
445 do_fail(code: close_code::none, ev: error::closed, ec);
446 if(ec == error::closed)
447 ec = {};
448}
449
450template<class NextLayer, bool deflateSupported>
451template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
452BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
453stream<NextLayer, deflateSupported>::
454async_close(close_reason const& cr, CloseHandler&& handler)
455{
456 static_assert(is_async_stream<next_layer_type>::value,
457 "AsyncStream type requirements not met");
458 return net::async_initiate<
459 CloseHandler,
460 void(error_code)>(
461 run_close_op{},
462 handler,
463 impl_,
464 cr);
465}
466
467} // websocket
468} // beast
469} // boost
470
471#endif
472

source code of boost/libs/beast/include/boost/beast/websocket/impl/close.hpp