1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
11#define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
12
13#include <boost/beast/core/buffer_traits.hpp>
14#include <boost/beast/websocket/teardown.hpp>
15#include <boost/beast/websocket/detail/mask.hpp>
16#include <boost/beast/websocket/impl/stream_impl.hpp>
17#include <boost/beast/core/async_base.hpp>
18#include <boost/beast/core/buffers_prefix.hpp>
19#include <boost/beast/core/buffers_suffix.hpp>
20#include <boost/beast/core/flat_static_buffer.hpp>
21#include <boost/beast/core/read_size.hpp>
22#include <boost/beast/core/stream_traits.hpp>
23#include <boost/beast/core/detail/bind_continuation.hpp>
24#include <boost/beast/core/detail/buffer.hpp>
25#include <boost/beast/core/detail/clamp.hpp>
26#include <boost/beast/core/detail/config.hpp>
27#include <boost/asio/coroutine.hpp>
28#include <boost/assert.hpp>
29#include <boost/config.hpp>
30#include <boost/optional.hpp>
31#include <boost/throw_exception.hpp>
32#include <algorithm>
33#include <limits>
34#include <memory>
35
36namespace boost {
37namespace beast {
38namespace websocket {
39
40/* Read some message data into a buffer sequence.
41
42 Also reads and handles control frames.
43*/
44template<class NextLayer, bool deflateSupported>
45template<class Handler, class MutableBufferSequence>
46class stream<NextLayer, deflateSupported>::read_some_op
47 : public beast::async_base<
48 Handler, beast::executor_type<stream>>
49 , public asio::coroutine
50{
51 boost::weak_ptr<impl_type> wp_;
52 MutableBufferSequence bs_;
53 buffers_suffix<MutableBufferSequence> cb_;
54 std::size_t bytes_written_ = 0;
55 error_code result_;
56 close_code code_;
57 bool did_read_ = false;
58
59public:
60 static constexpr int id = 1; // for soft_mutex
61
62 template<class Handler_>
63 read_some_op(
64 Handler_&& h,
65 boost::shared_ptr<impl_type> const& sp,
66 MutableBufferSequence const& bs)
67 : async_base<
68 Handler, beast::executor_type<stream>>(
69 std::forward<Handler_>(h),
70 sp->stream().get_executor())
71 , wp_(sp)
72 , bs_(bs)
73 , cb_(bs)
74 , code_(close_code::none)
75 {
76 (*this)({}, 0, false);
77 }
78
79 void operator()(
80 error_code ec = {},
81 std::size_t bytes_transferred = 0,
82 bool cont = true)
83 {
84 using beast::detail::clamp;
85 auto sp = wp_.lock();
86 if(! sp)
87 {
88 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
89 bytes_written_ = 0;
90 return this->complete(cont, ec, bytes_written_);
91 }
92 auto& impl = *sp;
93 BOOST_ASIO_CORO_REENTER(*this)
94 {
95 impl.update_timer(this->get_executor());
96
97 acquire_read_lock:
98 // Acquire the read lock
99 if(! impl.rd_block.try_lock(this))
100 {
101 do_suspend:
102 BOOST_ASIO_CORO_YIELD
103 {
104 BOOST_ASIO_HANDLER_LOCATION((
105 __FILE__, __LINE__,
106 "websocket::async_read_some"));
107
108 this->set_allowed_cancellation(net::cancellation_type::all);
109 impl.op_r_rd.emplace(std::move(*this), net::cancellation_type::all);
110 }
111 if (ec)
112 return this->complete(cont, ec, bytes_written_);
113
114 this->set_allowed_cancellation(net::cancellation_type::terminal);
115
116 impl.rd_block.lock(this);
117 BOOST_ASIO_CORO_YIELD
118 {
119 BOOST_ASIO_HANDLER_LOCATION((
120 __FILE__, __LINE__,
121 "websocket::async_read_some"));
122
123 const auto ex = this->get_immediate_executor();
124 net::dispatch(ex, std::move(*this));
125 }
126 BOOST_ASSERT(impl.rd_block.is_locked(this));
127
128 BOOST_ASSERT(!ec);
129 if(impl.check_stop_now(ec))
130 {
131 // Issue 2264 - There is no guarantee that the next
132 // error will be operation_aborted.
133 // The error could be a result of the peer resetting the
134 // connection
135 // BOOST_ASSERT(ec == net::error::operation_aborted);
136 goto upcall;
137 }
138 // VFALCO Should never get here
139
140 // The only way to get read blocked is if
141 // a `close_op` wrote a close frame
142 BOOST_ASSERT(impl.wr_close);
143 BOOST_ASSERT(impl.status_ != status::open);
144 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
145 goto upcall;
146 }
147 else
148 {
149 // Make sure the stream is not closed
150 if( impl.status_ == status::closed ||
151 impl.status_ == status::failed)
152 {
153 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
154 goto upcall;
155 }
156 }
157
158 // if status_ == status::closing, we want to suspend
159 // the read operation until the close completes,
160 // then finish the read with operation_aborted.
161
162 loop:
163 BOOST_ASSERT(impl.rd_block.is_locked(this));
164 // See if we need to read a frame header. This
165 // condition is structured to give the decompressor
166 // a chance to emit the final empty deflate block
167 //
168 if(impl.rd_remain == 0 &&
169 (! impl.rd_fh.fin || impl.rd_done))
170 {
171 // Read frame header
172 while(! impl.parse_fh(
173 impl.rd_fh, impl.rd_buf, result_))
174 {
175 if(result_)
176 {
177 // _Fail the WebSocket Connection_
178 if(result_ == error::message_too_big)
179 code_ = close_code::too_big;
180 else
181 code_ = close_code::protocol_error;
182 goto close;
183 }
184 BOOST_ASSERT(impl.rd_block.is_locked(this));
185 BOOST_ASIO_CORO_YIELD
186 {
187 BOOST_ASIO_HANDLER_LOCATION((
188 __FILE__, __LINE__,
189 "websocket::async_read_some"));
190
191 impl.stream().async_read_some(
192 impl.rd_buf.prepare(read_size(
193 impl.rd_buf, impl.rd_buf.max_size())),
194 std::move(*this));
195 }
196 BOOST_ASSERT(impl.rd_block.is_locked(this));
197 impl.rd_buf.commit(bytes_transferred);
198 if(impl.check_stop_now(ec))
199 goto upcall;
200 impl.reset_idle();
201
202 // Allow a close operation
203 // to acquire the read block
204 impl.rd_block.unlock(this);
205 if( impl.op_r_close.maybe_invoke())
206 {
207 // Suspend
208 BOOST_ASSERT(impl.rd_block.is_locked());
209 goto do_suspend;
210 }
211 // Acquire read block
212 impl.rd_block.lock(this);
213 }
214 // Immediately apply the mask to the portion
215 // of the buffer holding payload data.
216 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
217 detail::mask_inplace(buffers_prefix(
218 clamp(impl.rd_fh.len),
219 impl.rd_buf.data()),
220 impl.rd_key);
221 if(detail::is_control(op: impl.rd_fh.op))
222 {
223 // Clear this otherwise the next
224 // frame will be considered final.
225 impl.rd_fh.fin = false;
226
227 // Handle ping frame
228 if(impl.rd_fh.op == detail::opcode::ping)
229 {
230 if(impl.ctrl_cb)
231 {
232 if(! cont)
233 {
234 BOOST_ASIO_CORO_YIELD
235 {
236 BOOST_ASIO_HANDLER_LOCATION((
237 __FILE__, __LINE__,
238 "websocket::async_read_some"));
239
240 const auto ex = this->get_immediate_executor();
241 net::dispatch(ex, std::move(*this));
242 }
243 BOOST_ASSERT(cont);
244 // VFALCO call check_stop_now() here?
245 }
246 }
247 {
248 auto const b = buffers_prefix(
249 clamp(impl.rd_fh.len),
250 impl.rd_buf.data());
251 auto const len = buffer_bytes(b);
252 BOOST_ASSERT(len == impl.rd_fh.len);
253 ping_data payload;
254 detail::read_ping(payload, b);
255 impl.rd_buf.consume(len);
256 // Ignore ping when closing
257 if(impl.status_ == status::closing)
258 goto loop;
259 if(impl.ctrl_cb)
260 impl.ctrl_cb(
261 frame_type::ping, to_string_view(s: payload));
262 impl.rd_fb.clear();
263 impl.template write_ping<
264 flat_static_buffer_base>(impl.rd_fb,
265 detail::opcode::pong, payload);
266 }
267
268 // Allow a close operation
269 // to acquire the read block
270 impl.rd_block.unlock(this);
271 impl.op_r_close.maybe_invoke();
272
273 // Acquire the write lock
274 if(! impl.wr_block.try_lock(this))
275 {
276 BOOST_ASIO_CORO_YIELD
277 {
278 BOOST_ASIO_HANDLER_LOCATION((
279 __FILE__, __LINE__,
280 "websocket::async_read_some"));
281
282 impl.op_rd.emplace(std::move(*this));
283 }
284 if (ec)
285 return this->complete(cont, ec, bytes_written_);
286
287 impl.wr_block.lock(this);
288 BOOST_ASIO_CORO_YIELD
289 {
290 BOOST_ASIO_HANDLER_LOCATION((
291 __FILE__, __LINE__,
292 "websocket::async_read_some"));
293
294 const auto ex = this->get_immediate_executor();
295 net::dispatch(ex, std::move(*this));
296 }
297 BOOST_ASSERT(impl.wr_block.is_locked(this));
298 if(impl.check_stop_now(ec))
299 goto upcall;
300 }
301
302 // Send pong
303 BOOST_ASSERT(impl.wr_block.is_locked(this));
304 BOOST_ASIO_CORO_YIELD
305 {
306 BOOST_ASIO_HANDLER_LOCATION((
307 __FILE__, __LINE__,
308 "websocket::async_read_some"));
309
310 net::async_write(
311 impl.stream(), net::const_buffer(impl.rd_fb.data()),
312 beast::detail::bind_continuation(std::move(*this)));
313 }
314 BOOST_ASSERT(impl.wr_block.is_locked(this));
315 if(impl.check_stop_now(ec))
316 goto upcall;
317 impl.wr_block.unlock(this);
318 impl.op_close.maybe_invoke()
319 || impl.op_idle_ping.maybe_invoke()
320 || impl.op_ping.maybe_invoke()
321 || impl.op_wr.maybe_invoke();
322 goto acquire_read_lock;
323 }
324
325 // Handle pong frame
326 if(impl.rd_fh.op == detail::opcode::pong)
327 {
328 // Ignore pong when closing
329 if(! impl.wr_close && impl.ctrl_cb)
330 {
331 if(! cont)
332 {
333 BOOST_ASIO_CORO_YIELD
334 {
335 BOOST_ASIO_HANDLER_LOCATION((
336 __FILE__, __LINE__,
337 "websocket::async_read_some"));
338
339 const auto ex = this->get_immediate_executor();
340 net::dispatch(ex, std::move(*this));
341 }
342 BOOST_ASSERT(cont);
343 }
344 }
345 auto const cb = buffers_prefix(clamp(
346 impl.rd_fh.len), impl.rd_buf.data());
347 auto const len = buffer_bytes(cb);
348 BOOST_ASSERT(len == impl.rd_fh.len);
349 ping_data payload;
350 detail::read_ping(payload, cb);
351 impl.rd_buf.consume(len);
352 // Ignore pong when closing
353 if(! impl.wr_close && impl.ctrl_cb)
354 impl.ctrl_cb(frame_type::pong, to_string_view(s: payload));
355 goto loop;
356 }
357
358 // Handle close frame
359 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
360 {
361 if(impl.ctrl_cb)
362 {
363 if(! cont)
364 {
365 BOOST_ASIO_CORO_YIELD
366 {
367 BOOST_ASIO_HANDLER_LOCATION((
368 __FILE__, __LINE__,
369 "websocket::async_read_some"));
370
371 const auto ex = this->get_immediate_executor();
372 net::dispatch(ex, std::move(*this));
373 }
374 BOOST_ASSERT(cont);
375 }
376 }
377 auto const cb = buffers_prefix(clamp(
378 impl.rd_fh.len), impl.rd_buf.data());
379 auto const len = buffer_bytes(cb);
380 BOOST_ASSERT(len == impl.rd_fh.len);
381 BOOST_ASSERT(! impl.rd_close);
382 impl.rd_close = true;
383 close_reason cr;
384 detail::read_close(cr, cb, result_);
385 if(result_)
386 {
387 // _Fail the WebSocket Connection_
388 code_ = close_code::protocol_error;
389 goto close;
390 }
391 impl.cr = cr;
392 impl.rd_buf.consume(len);
393 if(impl.ctrl_cb)
394 impl.ctrl_cb(frame_type::close,
395 to_string_view(impl.cr.reason));
396 // See if we are already closing
397 if(impl.status_ == status::closing)
398 {
399 // _Close the WebSocket Connection_
400 BOOST_ASSERT(impl.wr_close);
401 code_ = close_code::none;
402 result_ = error::closed;
403 goto close;
404 }
405 // _Start the WebSocket Closing Handshake_
406 code_ = cr.code == close_code::none ?
407 close_code::normal :
408 static_cast<close_code>(cr.code);
409 result_ = error::closed;
410 goto close;
411 }
412 }
413 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
414 {
415 // Empty non-final frame
416 goto loop;
417 }
418 impl.rd_done = false;
419 }
420 if(! impl.rd_deflated())
421 {
422 if(impl.rd_remain > 0)
423 {
424 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
425 (std::min)(clamp(impl.rd_remain),
426 buffer_bytes(cb_)))
427 {
428 // Fill the read buffer first, otherwise we
429 // get fewer bytes at the cost of one I/O.
430 BOOST_ASIO_CORO_YIELD
431 {
432 BOOST_ASIO_HANDLER_LOCATION((
433 __FILE__, __LINE__,
434 "websocket::async_read_some"));
435
436 impl.stream().async_read_some(
437 impl.rd_buf.prepare(read_size(
438 impl.rd_buf, impl.rd_buf.max_size())),
439 std::move(*this));
440 }
441 impl.rd_buf.commit(bytes_transferred);
442 if(impl.check_stop_now(ec))
443 goto upcall;
444 impl.reset_idle();
445 if(impl.rd_fh.mask)
446 detail::mask_inplace(buffers_prefix(clamp(
447 impl.rd_remain), impl.rd_buf.data()),
448 impl.rd_key);
449 }
450 if(impl.rd_buf.size() > 0)
451 {
452 // Copy from the read buffer.
453 // The mask was already applied.
454 bytes_transferred = net::buffer_copy(cb_,
455 impl.rd_buf.data(), clamp(impl.rd_remain));
456 auto const mb = buffers_prefix(
457 bytes_transferred, cb_);
458 impl.rd_remain -= bytes_transferred;
459 if(impl.rd_op == detail::opcode::text)
460 {
461 if(! impl.rd_utf8.write(mb) ||
462 (impl.rd_remain == 0 && impl.rd_fh.fin &&
463 ! impl.rd_utf8.finish()))
464 {
465 // _Fail the WebSocket Connection_
466 code_ = close_code::bad_payload;
467 result_ = error::bad_frame_payload;
468 goto close;
469 }
470 }
471 bytes_written_ += bytes_transferred;
472 impl.rd_size += bytes_transferred;
473 impl.rd_buf.consume(bytes_transferred);
474 }
475 else
476 {
477 // Read into caller's buffer
478 BOOST_ASSERT(impl.rd_remain > 0);
479 BOOST_ASSERT(buffer_bytes(cb_) > 0);
480 BOOST_ASSERT(buffer_bytes(buffers_prefix(
481 clamp(impl.rd_remain), cb_)) > 0);
482 BOOST_ASIO_CORO_YIELD
483 {
484 BOOST_ASIO_HANDLER_LOCATION((
485 __FILE__, __LINE__,
486 "websocket::async_read_some"));
487
488 impl.stream().async_read_some(buffers_prefix(
489 clamp(impl.rd_remain), cb_), std::move(*this));
490 }
491 if(impl.check_stop_now(ec))
492 goto upcall;
493 impl.reset_idle();
494 BOOST_ASSERT(bytes_transferred > 0);
495 auto const mb = buffers_prefix(
496 bytes_transferred, cb_);
497 impl.rd_remain -= bytes_transferred;
498 if(impl.rd_fh.mask)
499 detail::mask_inplace(mb, impl.rd_key);
500 if(impl.rd_op == detail::opcode::text)
501 {
502 if(! impl.rd_utf8.write(mb) ||
503 (impl.rd_remain == 0 && impl.rd_fh.fin &&
504 ! impl.rd_utf8.finish()))
505 {
506 // _Fail the WebSocket Connection_
507 code_ = close_code::bad_payload;
508 result_ = error::bad_frame_payload;
509 goto close;
510 }
511 }
512 bytes_written_ += bytes_transferred;
513 impl.rd_size += bytes_transferred;
514 }
515 }
516 BOOST_ASSERT( ! impl.rd_done );
517 if( impl.rd_remain == 0 && impl.rd_fh.fin )
518 impl.rd_done = true;
519 }
520 else
521 {
522 // Read compressed message frame payload:
523 // inflate even if rd_fh_.len == 0, otherwise we
524 // never emit the end-of-stream deflate block.
525 while(buffer_bytes(cb_) > 0)
526 {
527 if( impl.rd_remain > 0 &&
528 impl.rd_buf.size() == 0 &&
529 ! did_read_)
530 {
531 // read new
532 BOOST_ASIO_CORO_YIELD
533 {
534 BOOST_ASIO_HANDLER_LOCATION((
535 __FILE__, __LINE__,
536 "websocket::async_read_some"));
537
538 impl.stream().async_read_some(
539 impl.rd_buf.prepare(read_size(
540 impl.rd_buf, impl.rd_buf.max_size())),
541 std::move(*this));
542 }
543 if(impl.check_stop_now(ec))
544 goto upcall;
545 impl.reset_idle();
546 BOOST_ASSERT(bytes_transferred > 0);
547 impl.rd_buf.commit(bytes_transferred);
548 if(impl.rd_fh.mask)
549 detail::mask_inplace(
550 buffers_prefix(clamp(impl.rd_remain),
551 impl.rd_buf.data()), impl.rd_key);
552 did_read_ = true;
553 }
554 zlib::z_params zs;
555 {
556 auto const out = buffers_front(cb_);
557 zs.next_out = out.data();
558 zs.avail_out = out.size();
559 BOOST_ASSERT(zs.avail_out > 0);
560 }
561 // boolean to track the end of the message.
562 bool fin = false;
563 if(impl.rd_remain > 0)
564 {
565 if(impl.rd_buf.size() > 0)
566 {
567 // use what's there
568 auto const in = buffers_prefix(
569 clamp(impl.rd_remain), buffers_front(
570 impl.rd_buf.data()));
571 zs.avail_in = in.size();
572 zs.next_in = in.data();
573 }
574 else
575 {
576 break;
577 }
578 }
579 else if(impl.rd_fh.fin)
580 {
581 // append the empty block codes
582 static std::uint8_t constexpr
583 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
584 zs.next_in = empty_block;
585 zs.avail_in = sizeof(empty_block);
586 fin = true;
587 }
588 else
589 {
590 break;
591 }
592 impl.inflate(zs, zlib::Flush::sync, ec);
593 if(impl.check_stop_now(ec))
594 goto upcall;
595 if(fin && zs.total_out == 0) {
596 impl.do_context_takeover_read(impl.role);
597 impl.rd_done = true;
598 break;
599 }
600 if(impl.rd_msg_max && beast::detail::sum_exceeds(
601 impl.rd_size, zs.total_out, impl.rd_msg_max))
602 {
603 // _Fail the WebSocket Connection_
604 code_ = close_code::too_big;
605 result_ = error::message_too_big;
606 goto close;
607 }
608 cb_.consume(zs.total_out);
609 impl.rd_size += zs.total_out;
610 if (! fin) {
611 impl.rd_remain -= zs.total_in;
612 impl.rd_buf.consume(zs.total_in);
613 }
614 bytes_written_ += zs.total_out;
615 }
616 if(impl.rd_op == detail::opcode::text)
617 {
618 // check utf8
619 if(! impl.rd_utf8.write(
620 buffers_prefix(bytes_written_, bs_)) || (
621 impl.rd_done && ! impl.rd_utf8.finish()))
622 {
623 // _Fail the WebSocket Connection_
624 code_ = close_code::bad_payload;
625 result_ = error::bad_frame_payload;
626 goto close;
627 }
628 }
629 }
630 goto upcall;
631
632 close:
633 // Acquire the write lock
634 if(! impl.wr_block.try_lock(this))
635 {
636 BOOST_ASIO_CORO_YIELD
637 {
638 BOOST_ASIO_HANDLER_LOCATION((
639 __FILE__, __LINE__,
640 "websocket::async_read_some"));
641
642 impl.op_rd.emplace(std::move(*this));
643 }
644 if (ec)
645 return this->complete(cont, ec, bytes_written_);
646
647 impl.wr_block.lock(this);
648 BOOST_ASIO_CORO_YIELD
649 {
650 BOOST_ASIO_HANDLER_LOCATION((
651 __FILE__, __LINE__,
652 "websocket::async_read_some"));
653
654 const auto ex = this->get_immediate_executor();
655 net::dispatch(ex, std::move(*this));
656 }
657 BOOST_ASSERT(impl.wr_block.is_locked(this));
658 if(impl.check_stop_now(ec))
659 goto upcall;
660 }
661
662 impl.change_status(status::closing);
663
664 if(! impl.wr_close)
665 {
666 impl.wr_close = true;
667
668 // Serialize close frame
669 impl.rd_fb.clear();
670 impl.template write_close<
671 flat_static_buffer_base>(
672 impl.rd_fb, code_);
673
674 // Send close frame
675 BOOST_ASSERT(impl.wr_block.is_locked(this));
676 BOOST_ASIO_CORO_YIELD
677 {
678 BOOST_ASIO_HANDLER_LOCATION((
679 __FILE__, __LINE__,
680 "websocket::async_read_some"));
681
682 net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()),
683 beast::detail::bind_continuation(std::move(*this)));
684 }
685 BOOST_ASSERT(impl.wr_block.is_locked(this));
686 if(impl.check_stop_now(ec))
687 goto upcall;
688 }
689
690 // Teardown
691 using beast::websocket::async_teardown;
692 BOOST_ASSERT(impl.wr_block.is_locked(this));
693 BOOST_ASIO_CORO_YIELD
694 {
695 BOOST_ASIO_HANDLER_LOCATION((
696 __FILE__, __LINE__,
697 "websocket::async_read_some"));
698
699 async_teardown(impl.role, impl.stream(),
700 beast::detail::bind_continuation(std::move(*this)));
701 }
702 BOOST_ASSERT(impl.wr_block.is_locked(this));
703 if(ec == net::error::eof)
704 {
705 // Rationale:
706 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
707 ec = {};
708 }
709 if(! ec)
710 {
711 BOOST_BEAST_ASSIGN_EC(ec, result_);
712 }
713 if(ec && ec != error::closed)
714 impl.change_status(status::failed);
715 else
716 impl.change_status(status::closed);
717 impl.close();
718
719 upcall:
720 impl.rd_block.try_unlock(this);
721 impl.op_r_close.maybe_invoke();
722 if(impl.wr_block.try_unlock(this))
723 impl.op_close.maybe_invoke()
724 || impl.op_idle_ping.maybe_invoke()
725 || impl.op_ping.maybe_invoke()
726 || impl.op_wr.maybe_invoke();
727 this->complete(cont, ec, bytes_written_);
728 }
729 }
730};
731
732//------------------------------------------------------------------------------
733
734template<class NextLayer, bool deflateSupported>
735template<class Handler, class DynamicBuffer>
736class stream<NextLayer, deflateSupported>::read_op
737 : public beast::async_base<
738 Handler, beast::executor_type<stream>>
739 , public asio::coroutine
740{
741 boost::weak_ptr<impl_type> wp_;
742 DynamicBuffer& b_;
743 std::size_t limit_;
744 std::size_t bytes_written_ = 0;
745 bool some_;
746
747public:
748 template<class Handler_>
749 read_op(
750 Handler_&& h,
751 boost::shared_ptr<impl_type> const& sp,
752 DynamicBuffer& b,
753 std::size_t limit,
754 bool some)
755 : async_base<Handler,
756 beast::executor_type<stream>>(
757 std::forward<Handler_>(h),
758 sp->stream().get_executor())
759 , wp_(sp)
760 , b_(b)
761 , limit_(limit ? limit : (
762 std::numeric_limits<std::size_t>::max)())
763 , some_(some)
764 {
765 (*this)({}, 0, false);
766 }
767
768 void operator()(
769 error_code ec = {},
770 std::size_t bytes_transferred = 0,
771 bool cont = true)
772 {
773 using beast::detail::clamp;
774 auto sp = wp_.lock();
775 if(! sp)
776 {
777 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
778 bytes_written_ = 0;
779 return this->complete(cont, ec, bytes_written_);
780 }
781 auto& impl = *sp;
782 using mutable_buffers_type = typename
783 DynamicBuffer::mutable_buffers_type;
784 BOOST_ASIO_CORO_REENTER(*this)
785 {
786 do
787 {
788 // VFALCO TODO use boost::beast::bind_continuation
789 BOOST_ASIO_CORO_YIELD
790 {
791 auto mb = beast::detail::dynamic_buffer_prepare(b_,
792 clamp(impl.read_size_hint_db(b_), limit_),
793 ec, error::buffer_overflow);
794 if(impl.check_stop_now(ec))
795 goto upcall;
796
797 BOOST_ASIO_HANDLER_LOCATION((
798 __FILE__, __LINE__,
799 "websocket::async_read"));
800
801 read_some_op<read_op, mutable_buffers_type>(
802 std::move(*this), sp, *mb);
803 }
804
805 b_.commit(bytes_transferred);
806 bytes_written_ += bytes_transferred;
807 if(ec)
808 goto upcall;
809 }
810 while(! some_ && ! impl.rd_done);
811
812 upcall:
813 this->complete(cont, ec, bytes_written_);
814 }
815 }
816};
817
818template<class NextLayer, bool deflateSupported>
819struct stream<NextLayer, deflateSupported>::
820 run_read_some_op
821{
822 template<
823 class ReadHandler,
824 class MutableBufferSequence>
825 void
826 operator()(
827 ReadHandler&& h,
828 boost::shared_ptr<impl_type> const& sp,
829 MutableBufferSequence const& b)
830 {
831 // If you get an error on the following line it means
832 // that your handler does not meet the documented type
833 // requirements for the handler.
834
835 static_assert(
836 beast::detail::is_invocable<ReadHandler,
837 void(error_code, std::size_t)>::value,
838 "ReadHandler type requirements not met");
839
840 read_some_op<
841 typename std::decay<ReadHandler>::type,
842 MutableBufferSequence>(
843 std::forward<ReadHandler>(h),
844 sp,
845 b);
846 }
847};
848
849template<class NextLayer, bool deflateSupported>
850struct stream<NextLayer, deflateSupported>::
851 run_read_op
852{
853 template<
854 class ReadHandler,
855 class DynamicBuffer>
856 void
857 operator()(
858 ReadHandler&& h,
859 boost::shared_ptr<impl_type> const& sp,
860 DynamicBuffer* b,
861 std::size_t limit,
862 bool some)
863 {
864 // If you get an error on the following line it means
865 // that your handler does not meet the documented type
866 // requirements for the handler.
867
868 static_assert(
869 beast::detail::is_invocable<ReadHandler,
870 void(error_code, std::size_t)>::value,
871 "ReadHandler type requirements not met");
872
873 read_op<
874 typename std::decay<ReadHandler>::type,
875 DynamicBuffer>(
876 std::forward<ReadHandler>(h),
877 sp,
878 *b,
879 limit,
880 some);
881 }
882};
883
884//------------------------------------------------------------------------------
885
886template<class NextLayer, bool deflateSupported>
887template<class DynamicBuffer>
888std::size_t
889stream<NextLayer, deflateSupported>::
890read(DynamicBuffer& buffer)
891{
892 static_assert(is_sync_stream<next_layer_type>::value,
893 "SyncStream type requirements not met");
894 static_assert(
895 net::is_dynamic_buffer<DynamicBuffer>::value,
896 "DynamicBuffer type requirements not met");
897 error_code ec;
898 auto const bytes_written = read(buffer, ec);
899 if(ec)
900 BOOST_THROW_EXCEPTION(system_error{ec});
901 return bytes_written;
902}
903
904template<class NextLayer, bool deflateSupported>
905template<class DynamicBuffer>
906std::size_t
907stream<NextLayer, deflateSupported>::
908read(DynamicBuffer& buffer, error_code& ec)
909{
910 static_assert(is_sync_stream<next_layer_type>::value,
911 "SyncStream type requirements not met");
912 static_assert(
913 net::is_dynamic_buffer<DynamicBuffer>::value,
914 "DynamicBuffer type requirements not met");
915 std::size_t bytes_written = 0;
916 do
917 {
918 bytes_written += read_some(buffer, 0, ec);
919 if(ec)
920 return bytes_written;
921 }
922 while(! is_message_done());
923 return bytes_written;
924}
925
926template<class NextLayer, bool deflateSupported>
927template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
928BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
929stream<NextLayer, deflateSupported>::
930async_read(DynamicBuffer& buffer, ReadHandler&& handler)
931{
932 static_assert(is_async_stream<next_layer_type>::value,
933 "AsyncStream type requirements not met");
934 static_assert(
935 net::is_dynamic_buffer<DynamicBuffer>::value,
936 "DynamicBuffer type requirements not met");
937 return net::async_initiate<
938 ReadHandler,
939 void(error_code, std::size_t)>(
940 run_read_op{},
941 handler,
942 impl_,
943 &buffer,
944 0,
945 false);
946}
947
948//------------------------------------------------------------------------------
949
950template<class NextLayer, bool deflateSupported>
951template<class DynamicBuffer>
952std::size_t
953stream<NextLayer, deflateSupported>::
954read_some(
955 DynamicBuffer& buffer,
956 std::size_t limit)
957{
958 static_assert(is_sync_stream<next_layer_type>::value,
959 "SyncStream type requirements not met");
960 static_assert(
961 net::is_dynamic_buffer<DynamicBuffer>::value,
962 "DynamicBuffer type requirements not met");
963 error_code ec;
964 auto const bytes_written =
965 read_some(buffer, limit, ec);
966 if(ec)
967 BOOST_THROW_EXCEPTION(system_error{ec});
968 return bytes_written;
969}
970
971template<class NextLayer, bool deflateSupported>
972template<class DynamicBuffer>
973std::size_t
974stream<NextLayer, deflateSupported>::
975read_some(
976 DynamicBuffer& buffer,
977 std::size_t limit,
978 error_code& ec)
979{
980 static_assert(is_sync_stream<next_layer_type>::value,
981 "SyncStream type requirements not met");
982 static_assert(
983 net::is_dynamic_buffer<DynamicBuffer>::value,
984 "DynamicBuffer type requirements not met");
985 using beast::detail::clamp;
986 if(! limit)
987 limit = (std::numeric_limits<std::size_t>::max)();
988 auto const size =
989 clamp(read_size_hint(buffer), limit);
990 BOOST_ASSERT(size > 0);
991 auto mb = beast::detail::dynamic_buffer_prepare(
992 buffer, size, ec, error::buffer_overflow);
993 if(impl_->check_stop_now(ec))
994 return 0;
995 auto const bytes_written = read_some(*mb, ec);
996 buffer.commit(bytes_written);
997 return bytes_written;
998}
999
1000template<class NextLayer, bool deflateSupported>
1001template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1002BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1003stream<NextLayer, deflateSupported>::
1004async_read_some(
1005 DynamicBuffer& buffer,
1006 std::size_t limit,
1007 ReadHandler&& handler)
1008{
1009 static_assert(is_async_stream<next_layer_type>::value,
1010 "AsyncStream type requirements not met");
1011 static_assert(
1012 net::is_dynamic_buffer<DynamicBuffer>::value,
1013 "DynamicBuffer type requirements not met");
1014 return net::async_initiate<
1015 ReadHandler,
1016 void(error_code, std::size_t)>(
1017 run_read_op{},
1018 handler,
1019 impl_,
1020 &buffer,
1021 limit,
1022 true);
1023}
1024
1025//------------------------------------------------------------------------------
1026
1027template<class NextLayer, bool deflateSupported>
1028template<class MutableBufferSequence>
1029std::size_t
1030stream<NextLayer, deflateSupported>::
1031read_some(
1032 MutableBufferSequence const& buffers)
1033{
1034 static_assert(is_sync_stream<next_layer_type>::value,
1035 "SyncStream type requirements not met");
1036 static_assert(net::is_mutable_buffer_sequence<
1037 MutableBufferSequence>::value,
1038 "MutableBufferSequence type requirements not met");
1039 error_code ec;
1040 auto const bytes_written = read_some(buffers, ec);
1041 if(ec)
1042 BOOST_THROW_EXCEPTION(system_error{ec});
1043 return bytes_written;
1044}
1045
1046template<class NextLayer, bool deflateSupported>
1047template<class MutableBufferSequence>
1048std::size_t
1049stream<NextLayer, deflateSupported>::
1050read_some(
1051 MutableBufferSequence const& buffers,
1052 error_code& ec)
1053{
1054 static_assert(is_sync_stream<next_layer_type>::value,
1055 "SyncStream type requirements not met");
1056 static_assert(net::is_mutable_buffer_sequence<
1057 MutableBufferSequence>::value,
1058 "MutableBufferSequence type requirements not met");
1059 using beast::detail::clamp;
1060 auto& impl = *impl_;
1061 close_code code{};
1062 std::size_t bytes_written = 0;
1063 ec = {};
1064 // Make sure the stream is open
1065 if(impl.check_stop_now(ec))
1066 return bytes_written;
1067loop:
1068 // See if we need to read a frame header. This
1069 // condition is structured to give the decompressor
1070 // a chance to emit the final empty deflate block
1071 //
1072 if(impl.rd_remain == 0 && (
1073 ! impl.rd_fh.fin || impl.rd_done))
1074 {
1075 // Read frame header
1076 error_code result;
1077 while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
1078 {
1079 if(result)
1080 {
1081 // _Fail the WebSocket Connection_
1082 if(result == error::message_too_big)
1083 code = close_code::too_big;
1084 else
1085 code = close_code::protocol_error;
1086 do_fail(code, ev: result, ec);
1087 return bytes_written;
1088 }
1089 auto const bytes_transferred =
1090 impl.stream().read_some(
1091 impl.rd_buf.prepare(read_size(
1092 impl.rd_buf, impl.rd_buf.max_size())),
1093 ec);
1094 impl.rd_buf.commit(bytes_transferred);
1095 if(impl.check_stop_now(ec))
1096 return bytes_written;
1097 }
1098 // Immediately apply the mask to the portion
1099 // of the buffer holding payload data.
1100 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
1101 detail::mask_inplace(buffers_prefix(
1102 clamp(impl.rd_fh.len), impl.rd_buf.data()),
1103 impl.rd_key);
1104 if(detail::is_control(op: impl.rd_fh.op))
1105 {
1106 // Get control frame payload
1107 auto const b = buffers_prefix(
1108 clamp(impl.rd_fh.len), impl.rd_buf.data());
1109 auto const len = buffer_bytes(b);
1110 BOOST_ASSERT(len == impl.rd_fh.len);
1111
1112 // Clear this otherwise the next
1113 // frame will be considered final.
1114 impl.rd_fh.fin = false;
1115
1116 // Handle ping frame
1117 if(impl.rd_fh.op == detail::opcode::ping)
1118 {
1119 ping_data payload;
1120 detail::read_ping(payload, b);
1121 impl.rd_buf.consume(len);
1122 if(impl.wr_close)
1123 {
1124 // Ignore ping when closing
1125 goto loop;
1126 }
1127 if(impl.ctrl_cb)
1128 impl.ctrl_cb(frame_type::ping, to_string_view(s: payload));
1129 detail::frame_buffer fb;
1130 impl.template write_ping<flat_static_buffer_base>(fb,
1131 detail::opcode::pong, payload);
1132 net::write(impl.stream(), fb.data(), ec);
1133 if(impl.check_stop_now(ec))
1134 return bytes_written;
1135 goto loop;
1136 }
1137 // Handle pong frame
1138 if(impl.rd_fh.op == detail::opcode::pong)
1139 {
1140 ping_data payload;
1141 detail::read_ping(payload, b);
1142 impl.rd_buf.consume(len);
1143 if(impl.ctrl_cb)
1144 impl.ctrl_cb(frame_type::pong, to_string_view(s: payload));
1145 goto loop;
1146 }
1147 // Handle close frame
1148 BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
1149 {
1150 BOOST_ASSERT(! impl.rd_close);
1151 impl.rd_close = true;
1152 close_reason cr;
1153 detail::read_close(cr, b, result);
1154 if(result)
1155 {
1156 // _Fail the WebSocket Connection_
1157 do_fail(code: close_code::protocol_error,
1158 ev: result, ec);
1159 return bytes_written;
1160 }
1161 impl.cr = cr;
1162 impl.rd_buf.consume(len);
1163 if(impl.ctrl_cb)
1164 impl.ctrl_cb(frame_type::close, to_string_view(impl.cr.reason));
1165 BOOST_ASSERT(! impl.wr_close);
1166 // _Start the WebSocket Closing Handshake_
1167 do_fail(
1168 code: cr.code == close_code::none ?
1169 close_code::normal :
1170 static_cast<close_code>(cr.code),
1171 ev: error::closed, ec);
1172 return bytes_written;
1173 }
1174 }
1175 if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
1176 {
1177 // Empty non-final frame
1178 goto loop;
1179 }
1180 impl.rd_done = false;
1181 }
1182 else
1183 {
1184 ec = {};
1185 }
1186 if(! impl.rd_deflated())
1187 {
1188 if(impl.rd_remain > 0)
1189 {
1190 if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
1191 (std::min)(clamp(impl.rd_remain),
1192 buffer_bytes(buffers)))
1193 {
1194 // Fill the read buffer first, otherwise we
1195 // get fewer bytes at the cost of one I/O.
1196 impl.rd_buf.commit(impl.stream().read_some(
1197 impl.rd_buf.prepare(read_size(impl.rd_buf,
1198 impl.rd_buf.max_size())), ec));
1199 if(impl.check_stop_now(ec))
1200 return bytes_written;
1201 if(impl.rd_fh.mask)
1202 detail::mask_inplace(
1203 buffers_prefix(clamp(impl.rd_remain),
1204 impl.rd_buf.data()), impl.rd_key);
1205 }
1206 if(impl.rd_buf.size() > 0)
1207 {
1208 // Copy from the read buffer.
1209 // The mask was already applied.
1210 auto const bytes_transferred = net::buffer_copy(
1211 buffers, impl.rd_buf.data(),
1212 clamp(impl.rd_remain));
1213 auto const mb = buffers_prefix(
1214 bytes_transferred, buffers);
1215 impl.rd_remain -= bytes_transferred;
1216 if(impl.rd_op == detail::opcode::text)
1217 {
1218 if(! impl.rd_utf8.write(mb) ||
1219 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1220 ! impl.rd_utf8.finish()))
1221 {
1222 // _Fail the WebSocket Connection_
1223 do_fail(code: close_code::bad_payload,
1224 ev: error::bad_frame_payload, ec);
1225 return bytes_written;
1226 }
1227 }
1228 bytes_written += bytes_transferred;
1229 impl.rd_size += bytes_transferred;
1230 impl.rd_buf.consume(bytes_transferred);
1231 }
1232 else
1233 {
1234 // Read into caller's buffer
1235 BOOST_ASSERT(impl.rd_remain > 0);
1236 BOOST_ASSERT(buffer_bytes(buffers) > 0);
1237 BOOST_ASSERT(buffer_bytes(buffers_prefix(
1238 clamp(impl.rd_remain), buffers)) > 0);
1239 auto const bytes_transferred =
1240 impl.stream().read_some(buffers_prefix(
1241 clamp(impl.rd_remain), buffers), ec);
1242 // VFALCO What if some bytes were written?
1243 if(impl.check_stop_now(ec))
1244 return bytes_written;
1245 BOOST_ASSERT(bytes_transferred > 0);
1246 auto const mb = buffers_prefix(
1247 bytes_transferred, buffers);
1248 impl.rd_remain -= bytes_transferred;
1249 if(impl.rd_fh.mask)
1250 detail::mask_inplace(mb, impl.rd_key);
1251 if(impl.rd_op == detail::opcode::text)
1252 {
1253 if(! impl.rd_utf8.write(mb) ||
1254 (impl.rd_remain == 0 && impl.rd_fh.fin &&
1255 ! impl.rd_utf8.finish()))
1256 {
1257 // _Fail the WebSocket Connection_
1258 do_fail(code: close_code::bad_payload,
1259 ev: error::bad_frame_payload, ec);
1260 return bytes_written;
1261 }
1262 }
1263 bytes_written += bytes_transferred;
1264 impl.rd_size += bytes_transferred;
1265 }
1266 }
1267 BOOST_ASSERT( ! impl.rd_done );
1268 if( impl.rd_remain == 0 && impl.rd_fh.fin )
1269 impl.rd_done = true;
1270 }
1271 else
1272 {
1273 // Read compressed message frame payload:
1274 // inflate even if rd_fh_.len == 0, otherwise we
1275 // never emit the end-of-stream deflate block.
1276 //
1277 bool did_read = false;
1278 buffers_suffix<MutableBufferSequence> cb(buffers);
1279 while(buffer_bytes(cb) > 0)
1280 {
1281 zlib::z_params zs;
1282 {
1283 auto const out = beast::buffers_front(cb);
1284 zs.next_out = out.data();
1285 zs.avail_out = out.size();
1286 BOOST_ASSERT(zs.avail_out > 0);
1287 }
1288 // boolean to track the end of the message.
1289 bool fin = false;
1290 if(impl.rd_remain > 0)
1291 {
1292 if(impl.rd_buf.size() > 0)
1293 {
1294 // use what's there
1295 auto const in = buffers_prefix(
1296 clamp(impl.rd_remain), beast::buffers_front(
1297 impl.rd_buf.data()));
1298 zs.avail_in = in.size();
1299 zs.next_in = in.data();
1300 }
1301 else if(! did_read)
1302 {
1303 // read new
1304 auto const bytes_transferred =
1305 impl.stream().read_some(
1306 impl.rd_buf.prepare(read_size(
1307 impl.rd_buf, impl.rd_buf.max_size())),
1308 ec);
1309 if(impl.check_stop_now(ec))
1310 return bytes_written;
1311 BOOST_ASSERT(bytes_transferred > 0);
1312 impl.rd_buf.commit(bytes_transferred);
1313 if(impl.rd_fh.mask)
1314 detail::mask_inplace(
1315 buffers_prefix(clamp(impl.rd_remain),
1316 impl.rd_buf.data()), impl.rd_key);
1317 auto const in = buffers_prefix(
1318 clamp(impl.rd_remain), buffers_front(
1319 impl.rd_buf.data()));
1320 zs.avail_in = in.size();
1321 zs.next_in = in.data();
1322 did_read = true;
1323 }
1324 else
1325 {
1326 break;
1327 }
1328 }
1329 else if(impl.rd_fh.fin)
1330 {
1331 // append the empty block codes
1332 static std::uint8_t constexpr
1333 empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
1334 zs.next_in = empty_block;
1335 zs.avail_in = sizeof(empty_block);
1336 fin = true;
1337 }
1338 else
1339 {
1340 break;
1341 }
1342 impl.inflate(zs, zlib::Flush::sync, ec);
1343 if(impl.check_stop_now(ec))
1344 return bytes_written;
1345 if (fin && zs.total_out == 0) {
1346 impl.do_context_takeover_read(impl.role);
1347 impl.rd_done = true;
1348 break;
1349 }
1350 if(impl.rd_msg_max && beast::detail::sum_exceeds(
1351 impl.rd_size, zs.total_out, impl.rd_msg_max))
1352 {
1353 do_fail(code: close_code::too_big,
1354 ev: error::message_too_big, ec);
1355 return bytes_written;
1356 }
1357 cb.consume(zs.total_out);
1358 impl.rd_size += zs.total_out;
1359 if (! fin) {
1360 impl.rd_remain -= zs.total_in;
1361 impl.rd_buf.consume(zs.total_in);
1362 }
1363 bytes_written += zs.total_out;
1364 }
1365 if(impl.rd_op == detail::opcode::text)
1366 {
1367 // check utf8
1368 if(! impl.rd_utf8.write(beast::buffers_prefix(
1369 bytes_written, buffers)) || (
1370 impl.rd_done && ! impl.rd_utf8.finish()))
1371 {
1372 // _Fail the WebSocket Connection_
1373 do_fail(code: close_code::bad_payload,
1374 ev: error::bad_frame_payload, ec);
1375 return bytes_written;
1376 }
1377 }
1378 }
1379 return bytes_written;
1380}
1381
1382template<class NextLayer, bool deflateSupported>
1383template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
1384BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
1385stream<NextLayer, deflateSupported>::
1386async_read_some(
1387 MutableBufferSequence const& buffers,
1388 ReadHandler&& handler)
1389{
1390 static_assert(is_async_stream<next_layer_type>::value,
1391 "AsyncStream type requirements not met");
1392 static_assert(net::is_mutable_buffer_sequence<
1393 MutableBufferSequence>::value,
1394 "MutableBufferSequence type requirements not met");
1395 return net::async_initiate<
1396 ReadHandler,
1397 void(error_code, std::size_t)>(
1398 run_read_some_op{},
1399 handler,
1400 impl_,
1401 buffers);
1402}
1403
1404} // websocket
1405} // beast
1406} // boost
1407
1408#endif
1409

source code of boost/libs/beast/include/boost/beast/websocket/impl/read.hpp