1//
2// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3//
4// Distributed under the Boost Software License, Version 1.0. (See accompanying
5// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6//
7// Official repository: https://github.com/boostorg/beast
8//
9
10#ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
11#define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP
12
13#include <boost/beast/websocket/detail/mask.hpp>
14#include <boost/beast/core/async_base.hpp>
15#include <boost/beast/core/buffer_traits.hpp>
16#include <boost/beast/core/buffers_cat.hpp>
17#include <boost/beast/core/buffers_prefix.hpp>
18#include <boost/beast/core/buffers_range.hpp>
19#include <boost/beast/core/buffers_suffix.hpp>
20#include <boost/beast/core/flat_static_buffer.hpp>
21#include <boost/beast/core/stream_traits.hpp>
22#include <boost/beast/core/detail/bind_continuation.hpp>
23#include <boost/beast/core/detail/clamp.hpp>
24#include <boost/beast/core/detail/config.hpp>
25#include <boost/beast/websocket/detail/frame.hpp>
26#include <boost/beast/websocket/impl/stream_impl.hpp>
27#include <boost/asio/coroutine.hpp>
28#include <boost/assert.hpp>
29#include <boost/config.hpp>
30#include <boost/throw_exception.hpp>
31#include <algorithm>
32#include <memory>
33
34namespace boost {
35namespace beast {
36namespace websocket {
37
38template<class NextLayer, bool deflateSupported>
39template<class Handler, class Buffers>
40class stream<NextLayer, deflateSupported>::write_some_op
41 : public beast::async_base<
42 Handler, beast::executor_type<stream>>
43 , public asio::coroutine
44{
45 enum
46 {
47 do_nomask_nofrag,
48 do_nomask_frag,
49 do_mask_nofrag,
50 do_mask_frag,
51 do_deflate
52 };
53
54 boost::weak_ptr<impl_type> wp_;
55 buffers_suffix<Buffers> cb_;
56 detail::frame_header fh_;
57 detail::prepared_key key_;
58 std::size_t bytes_transferred_ = 0;
59 std::size_t remain_;
60 std::size_t in_;
61 int how_;
62 bool fin_;
63 bool more_ = false; // for ubsan
64 bool cont_ = false;
65
66public:
67 static constexpr int id = 2; // for soft_mutex
68
69 template<class Handler_>
70 write_some_op(
71 Handler_&& h,
72 boost::shared_ptr<impl_type> const& sp,
73 bool fin,
74 Buffers const& bs)
75 : beast::async_base<Handler,
76 beast::executor_type<stream>>(
77 std::forward<Handler_>(h),
78 sp->stream().get_executor())
79 , wp_(sp)
80 , cb_(bs)
81 , fin_(fin)
82 {
83 auto& impl = *sp;
84
85 // Set up the outgoing frame header
86 if(! impl.wr_cont)
87 {
88 impl.begin_msg(beast::buffer_bytes(bs));
89 fh_.rsv1 = impl.wr_compress;
90 }
91 else
92 {
93 fh_.rsv1 = false;
94 }
95 fh_.rsv2 = false;
96 fh_.rsv3 = false;
97 fh_.op = impl.wr_cont ?
98 detail::opcode::cont : impl.wr_opcode;
99 fh_.mask =
100 impl.role == role_type::client;
101
102 // Choose a write algorithm
103 if(impl.wr_compress)
104 {
105 how_ = do_deflate;
106 }
107 else if(! fh_.mask)
108 {
109 if(! impl.wr_frag)
110 {
111 how_ = do_nomask_nofrag;
112 }
113 else
114 {
115 BOOST_ASSERT(impl.wr_buf_size != 0);
116 remain_ = beast::buffer_bytes(cb_);
117 if(remain_ > impl.wr_buf_size)
118 how_ = do_nomask_frag;
119 else
120 how_ = do_nomask_nofrag;
121 }
122 }
123 else
124 {
125 if(! impl.wr_frag)
126 {
127 how_ = do_mask_nofrag;
128 }
129 else
130 {
131 BOOST_ASSERT(impl.wr_buf_size != 0);
132 remain_ = beast::buffer_bytes(cb_);
133 if(remain_ > impl.wr_buf_size)
134 how_ = do_mask_frag;
135 else
136 how_ = do_mask_nofrag;
137 }
138 }
139 (*this)({}, 0, false);
140 }
141
142 void operator()(
143 error_code ec = {},
144 std::size_t bytes_transferred = 0,
145 bool cont = true);
146};
147
148template<class NextLayer, bool deflateSupported>
149template<class Handler, class Buffers>
150void
151stream<NextLayer, deflateSupported>::
152write_some_op<Handler, Buffers>::
153operator()(
154 error_code ec,
155 std::size_t bytes_transferred,
156 bool cont)
157{
158 using beast::detail::clamp;
159 std::size_t n;
160 net::mutable_buffer b;
161 auto sp = wp_.lock();
162 if(! sp)
163 {
164 BOOST_BEAST_ASSIGN_EC(ec, net::error::operation_aborted);
165 bytes_transferred_ = 0;
166 return this->complete(cont, ec, bytes_transferred_);
167 }
168 auto& impl = *sp;
169 BOOST_ASIO_CORO_REENTER(*this)
170 {
171 // Acquire the write lock
172 if(! impl.wr_block.try_lock(this))
173 {
174 do_suspend:
175 BOOST_ASIO_CORO_YIELD
176 {
177 BOOST_ASIO_HANDLER_LOCATION((
178 __FILE__, __LINE__,
179 fin_ ?
180 "websocket::async_write" :
181 "websocket::async_write_some"
182 ));
183 this->set_allowed_cancellation(net::cancellation_type::all);
184 impl.op_wr.emplace(std::move(*this),
185 net::cancellation_type::all);
186 }
187 if (ec)
188 return this->complete(cont, ec, bytes_transferred_);
189
190 this->set_allowed_cancellation(net::cancellation_type::terminal);
191 impl.wr_block.lock(this);
192 BOOST_ASIO_CORO_YIELD
193 {
194 BOOST_ASIO_HANDLER_LOCATION((
195 __FILE__, __LINE__,
196 fin_ ?
197 "websocket::async_write" :
198 "websocket::async_write_some"
199 ));
200
201 const auto ex = this->get_immediate_executor();
202 net::dispatch(ex, std::move(*this));
203 }
204 BOOST_ASSERT(impl.wr_block.is_locked(this));
205 }
206 if(impl.check_stop_now(ec))
207 goto upcall;
208
209 //------------------------------------------------------------------
210
211 if(how_ == do_nomask_nofrag)
212 {
213 // send a single frame
214 fh_.fin = fin_;
215 fh_.len = beast::buffer_bytes(cb_);
216 impl.wr_fb.clear();
217 detail::write<flat_static_buffer_base>(
218 impl.wr_fb, fh_);
219 impl.wr_cont = ! fin_;
220 BOOST_ASIO_CORO_YIELD
221 {
222 BOOST_ASIO_HANDLER_LOCATION((
223 __FILE__, __LINE__,
224 fin_ ?
225 "websocket::async_write" :
226 "websocket::async_write_some"
227 ));
228
229 net::async_write(impl.stream(),
230 buffers_cat(
231 net::const_buffer(impl.wr_fb.data()),
232 net::const_buffer(0, 0),
233 cb_,
234 buffers_prefix(0, cb_)
235 ),
236 beast::detail::bind_continuation(std::move(*this)));
237 }
238 bytes_transferred_ += clamp(fh_.len);
239 if(impl.check_stop_now(ec))
240 goto upcall;
241 goto upcall;
242 }
243
244 //------------------------------------------------------------------
245
246 if(how_ == do_nomask_frag)
247 {
248 // send multiple frames
249 for(;;)
250 {
251 n = clamp(remain_, impl.wr_buf_size);
252 fh_.len = n;
253 remain_ -= n;
254 fh_.fin = fin_ ? remain_ == 0 : false;
255 impl.wr_fb.clear();
256 detail::write<flat_static_buffer_base>(
257 impl.wr_fb, fh_);
258 impl.wr_cont = ! fin_;
259 // Send frame
260 BOOST_ASIO_CORO_YIELD
261 {
262 BOOST_ASIO_HANDLER_LOCATION((
263 __FILE__, __LINE__,
264 fin_ ?
265 "websocket::async_write" :
266 "websocket::async_write_some"
267 ));
268
269 buffers_suffix<Buffers> empty_cb(cb_);
270 empty_cb.consume(~std::size_t(0));
271
272 net::async_write(impl.stream(),
273 buffers_cat(
274 net::const_buffer(impl.wr_fb.data()),
275 net::const_buffer(0, 0),
276 empty_cb,
277 buffers_prefix(clamp(fh_.len), cb_)
278 ),
279 beast::detail::bind_continuation(std::move(*this)));
280 }
281 n = clamp(fh_.len); // restore `n` on yield
282 bytes_transferred_ += n;
283 if(impl.check_stop_now(ec))
284 goto upcall;
285 if(remain_ == 0)
286 break;
287 cb_.consume(n);
288 fh_.op = detail::opcode::cont;
289
290 // Give up the write lock in between each frame
291 // so that outgoing control frames might be sent.
292 impl.wr_block.unlock(this);
293 if( impl.op_close.maybe_invoke()
294 || impl.op_idle_ping.maybe_invoke()
295 || impl.op_rd.maybe_invoke()
296 || impl.op_ping.maybe_invoke())
297 {
298 BOOST_ASSERT(impl.wr_block.is_locked());
299 goto do_suspend;
300 }
301 impl.wr_block.lock(this);
302 }
303 goto upcall;
304 }
305
306 //------------------------------------------------------------------
307
308 if(how_ == do_mask_nofrag)
309 {
310 // send a single frame using multiple writes
311 remain_ = beast::buffer_bytes(cb_);
312 fh_.fin = fin_;
313 fh_.len = remain_;
314 fh_.key = impl.create_mask();
315 detail::prepare_key(prepared&: key_, key: fh_.key);
316 impl.wr_fb.clear();
317 detail::write<flat_static_buffer_base>(
318 impl.wr_fb, fh_);
319 n = clamp(remain_, impl.wr_buf_size);
320 net::buffer_copy(net::buffer(
321 impl.wr_buf.get(), n), cb_);
322 detail::mask_inplace(net::buffer(
323 impl.wr_buf.get(), n), key_);
324 remain_ -= n;
325 impl.wr_cont = ! fin_;
326 // write frame header and some payload
327 BOOST_ASIO_CORO_YIELD
328 {
329 BOOST_ASIO_HANDLER_LOCATION((
330 __FILE__, __LINE__,
331 fin_ ?
332 "websocket::async_write" :
333 "websocket::async_write_some"
334 ));
335
336 buffers_suffix<Buffers> empty_cb(cb_);
337 empty_cb.consume(~std::size_t(0));
338
339 net::async_write(impl.stream(),
340 buffers_cat(
341 net::const_buffer(impl.wr_fb.data()),
342 net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
343 empty_cb,
344 buffers_prefix(0, empty_cb)
345 ),
346 beast::detail::bind_continuation(std::move(*this)));
347 }
348 // VFALCO What about consuming the buffer on error?
349 bytes_transferred_ +=
350 bytes_transferred - impl.wr_fb.size();
351 if(impl.check_stop_now(ec))
352 goto upcall;
353 while(remain_ > 0)
354 {
355 cb_.consume(impl.wr_buf_size);
356 n = clamp(remain_, impl.wr_buf_size);
357 net::buffer_copy(net::buffer(
358 impl.wr_buf.get(), n), cb_);
359 detail::mask_inplace(net::buffer(
360 impl.wr_buf.get(), n), key_);
361 remain_ -= n;
362 // write more payload
363 BOOST_ASIO_CORO_YIELD
364 {
365 BOOST_ASIO_HANDLER_LOCATION((
366 __FILE__, __LINE__,
367 fin_ ?
368 "websocket::async_write" :
369 "websocket::async_write_some"
370 ));
371
372 buffers_suffix<Buffers> empty_cb(cb_);
373 empty_cb.consume(~std::size_t(0));
374
375 net::async_write(impl.stream(),
376 buffers_cat(
377 net::const_buffer(0, 0),
378 net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
379 empty_cb,
380 buffers_prefix(0, empty_cb)
381 ),
382 beast::detail::bind_continuation(std::move(*this)));
383 }
384 bytes_transferred_ += bytes_transferred;
385 if(impl.check_stop_now(ec))
386 goto upcall;
387 }
388 goto upcall;
389 }
390
391 //------------------------------------------------------------------
392
393 if(how_ == do_mask_frag)
394 {
395 // send multiple frames
396 for(;;)
397 {
398 n = clamp(remain_, impl.wr_buf_size);
399 remain_ -= n;
400 fh_.len = n;
401 fh_.key = impl.create_mask();
402 fh_.fin = fin_ ? remain_ == 0 : false;
403 detail::prepare_key(prepared&: key_, key: fh_.key);
404 net::buffer_copy(net::buffer(
405 impl.wr_buf.get(), n), cb_);
406 detail::mask_inplace(net::buffer(
407 impl.wr_buf.get(), n), key_);
408 impl.wr_fb.clear();
409 detail::write<flat_static_buffer_base>(
410 impl.wr_fb, fh_);
411 impl.wr_cont = ! fin_;
412 // Send frame
413 BOOST_ASIO_CORO_YIELD
414 {
415 BOOST_ASIO_HANDLER_LOCATION((
416 __FILE__, __LINE__,
417 fin_ ?
418 "websocket::async_write" :
419 "websocket::async_write_some"
420 ));
421
422 buffers_suffix<Buffers> empty_cb(cb_);
423 empty_cb.consume(~std::size_t(0));
424
425 net::async_write(impl.stream(),
426 buffers_cat(
427 net::const_buffer(impl.wr_fb.data()),
428 net::const_buffer(net::buffer(impl.wr_buf.get(), n)),
429 empty_cb,
430 buffers_prefix(0, empty_cb)
431 ),
432 beast::detail::bind_continuation(std::move(*this)));
433 }
434 n = bytes_transferred - impl.wr_fb.size();
435 bytes_transferred_ += n;
436 if(impl.check_stop_now(ec))
437 goto upcall;
438 if(remain_ == 0)
439 break;
440 cb_.consume(n);
441 fh_.op = detail::opcode::cont;
442 // Give up the write lock in between each frame
443 // so that outgoing control frames might be sent.
444 impl.wr_block.unlock(this);
445 if( impl.op_close.maybe_invoke()
446 || impl.op_idle_ping.maybe_invoke()
447 || impl.op_rd.maybe_invoke()
448 || impl.op_ping.maybe_invoke())
449 {
450 BOOST_ASSERT(impl.wr_block.is_locked());
451 goto do_suspend;
452 }
453 impl.wr_block.lock(this);
454 }
455 goto upcall;
456 }
457
458 //------------------------------------------------------------------
459
460 if(how_ == do_deflate)
461 {
462 // send compressed frames
463 for(;;)
464 {
465 b = net::buffer(impl.wr_buf.get(),
466 impl.wr_buf_size);
467 more_ = impl.deflate(b, cb_, fin_, in_, ec);
468 if(impl.check_stop_now(ec))
469 goto upcall;
470 n = beast::buffer_bytes(b);
471 if(n == 0)
472 {
473 // The input was consumed, but there is
474 // no output due to compression latency.
475 BOOST_ASSERT(! fin_);
476 BOOST_ASSERT(beast::buffer_bytes(cb_) == 0);
477 goto upcall;
478 }
479 if(fh_.mask)
480 {
481 fh_.key = impl.create_mask();
482 detail::prepared_key key;
483 detail::prepare_key(prepared&: key, key: fh_.key);
484 detail::mask_inplace(b, key);
485 }
486 fh_.fin = ! more_;
487 fh_.len = n;
488 impl.wr_fb.clear();
489 detail::write<
490 flat_static_buffer_base>(impl.wr_fb, fh_);
491 impl.wr_cont = ! fin_;
492 // Send frame
493 BOOST_ASIO_CORO_YIELD
494 {
495 BOOST_ASIO_HANDLER_LOCATION((
496 __FILE__, __LINE__,
497 fin_ ?
498 "websocket::async_write" :
499 "websocket::async_write_some"
500 ));
501
502 buffers_suffix<Buffers> empty_cb(cb_);
503 empty_cb.consume(~std::size_t(0));
504
505 net::async_write(impl.stream(),
506 buffers_cat(
507 net::const_buffer(impl.wr_fb.data()),
508 net::const_buffer(b),
509 empty_cb,
510 buffers_prefix(0, empty_cb)
511 ),
512 beast::detail::bind_continuation(std::move(*this)));
513 }
514 bytes_transferred_ += in_;
515 if(impl.check_stop_now(ec))
516 goto upcall;
517 if(more_)
518 {
519 fh_.op = detail::opcode::cont;
520 fh_.rsv1 = false;
521 // Give up the write lock in between each frame
522 // so that outgoing control frames might be sent.
523 impl.wr_block.unlock(this);
524 if( impl.op_close.maybe_invoke()
525 || impl.op_idle_ping.maybe_invoke()
526 || impl.op_rd.maybe_invoke()
527 || impl.op_ping.maybe_invoke())
528 {
529 BOOST_ASSERT(impl.wr_block.is_locked());
530 goto do_suspend;
531 }
532 impl.wr_block.lock(this);
533 }
534 else
535 {
536 if(fh_.fin)
537 impl.do_context_takeover_write(impl.role);
538 goto upcall;
539 }
540 }
541 }
542
543 //--------------------------------------------------------------------------
544
545 upcall:
546 impl.wr_block.unlock(this);
547 impl.op_close.maybe_invoke()
548 || impl.op_idle_ping.maybe_invoke()
549 || impl.op_rd.maybe_invoke()
550 || impl.op_ping.maybe_invoke();
551 this->complete(cont, ec, bytes_transferred_);
552 }
553}
554
555template<class NextLayer, bool deflateSupported>
556struct stream<NextLayer, deflateSupported>::
557 run_write_some_op
558{
559 template<
560 class WriteHandler,
561 class ConstBufferSequence>
562 void
563 operator()(
564 WriteHandler&& h,
565 boost::shared_ptr<impl_type> const& sp,
566 bool fin,
567 ConstBufferSequence const& b)
568 {
569 // If you get an error on the following line it means
570 // that your handler does not meet the documented type
571 // requirements for the handler.
572
573 static_assert(
574 beast::detail::is_invocable<WriteHandler,
575 void(error_code, std::size_t)>::value,
576 "WriteHandler type requirements not met");
577
578 write_some_op<
579 typename std::decay<WriteHandler>::type,
580 ConstBufferSequence>(
581 std::forward<WriteHandler>(h),
582 sp,
583 fin,
584 b);
585 }
586};
587
588//------------------------------------------------------------------------------
589
590template<class NextLayer, bool deflateSupported>
591template<class ConstBufferSequence>
592std::size_t
593stream<NextLayer, deflateSupported>::
594write_some(bool fin, ConstBufferSequence const& buffers)
595{
596 static_assert(is_sync_stream<next_layer_type>::value,
597 "SyncStream type requirements not met");
598 static_assert(net::is_const_buffer_sequence<
599 ConstBufferSequence>::value,
600 "ConstBufferSequence type requirements not met");
601 error_code ec;
602 auto const bytes_transferred =
603 write_some(fin, buffers, ec);
604 if(ec)
605 BOOST_THROW_EXCEPTION(system_error{ec});
606 return bytes_transferred;
607}
608
609template<class NextLayer, bool deflateSupported>
610template<class ConstBufferSequence>
611std::size_t
612stream<NextLayer, deflateSupported>::
613write_some(bool fin,
614 ConstBufferSequence const& buffers, error_code& ec)
615{
616 static_assert(is_sync_stream<next_layer_type>::value,
617 "SyncStream type requirements not met");
618 static_assert(net::is_const_buffer_sequence<
619 ConstBufferSequence>::value,
620 "ConstBufferSequence type requirements not met");
621 using beast::detail::clamp;
622 auto& impl = *impl_;
623 std::size_t bytes_transferred = 0;
624 ec = {};
625 if(impl.check_stop_now(ec))
626 return bytes_transferred;
627 detail::frame_header fh;
628 if(! impl.wr_cont)
629 {
630 impl.begin_msg(beast::buffer_bytes(buffers));
631 fh.rsv1 = impl.wr_compress;
632 }
633 else
634 {
635 fh.rsv1 = false;
636 }
637 fh.rsv2 = false;
638 fh.rsv3 = false;
639 fh.op = impl.wr_cont ?
640 detail::opcode::cont : impl.wr_opcode;
641 fh.mask = impl.role == role_type::client;
642 auto remain = beast::buffer_bytes(buffers);
643 if(impl.wr_compress)
644 {
645
646 buffers_suffix<
647 ConstBufferSequence> cb(buffers);
648 for(;;)
649 {
650 auto b = net::buffer(
651 impl.wr_buf.get(), impl.wr_buf_size);
652 auto const more = impl.deflate(
653 b, cb, fin, bytes_transferred, ec);
654 if(impl.check_stop_now(ec))
655 return bytes_transferred;
656 auto const n = beast::buffer_bytes(b);
657 if(n == 0)
658 {
659 // The input was consumed, but there
660 // is no output due to compression
661 // latency.
662 BOOST_ASSERT(! fin);
663 BOOST_ASSERT(beast::buffer_bytes(cb) == 0);
664 fh.fin = false;
665 break;
666 }
667 if(fh.mask)
668 {
669 fh.key = this->impl_->create_mask();
670 detail::prepared_key key;
671 detail::prepare_key(prepared&: key, key: fh.key);
672 detail::mask_inplace(b, key);
673 }
674 fh.fin = ! more;
675 fh.len = n;
676 detail::fh_buffer fh_buf;
677 detail::write<
678 flat_static_buffer_base>(db&: fh_buf, fh);
679 impl.wr_cont = ! fin;
680 net::write(impl.stream(),
681 buffers_cat(fh_buf.data(), b), ec);
682 if(impl.check_stop_now(ec))
683 return bytes_transferred;
684 if(! more)
685 break;
686 fh.op = detail::opcode::cont;
687 fh.rsv1 = false;
688 }
689 if(fh.fin)
690 impl.do_context_takeover_write(impl.role);
691 }
692 else if(! fh.mask)
693 {
694 if(! impl.wr_frag)
695 {
696 // no mask, no autofrag
697 fh.fin = fin;
698 fh.len = remain;
699 detail::fh_buffer fh_buf;
700 detail::write<
701 flat_static_buffer_base>(db&: fh_buf, fh);
702 impl.wr_cont = ! fin;
703 net::write(impl.stream(),
704 buffers_cat(fh_buf.data(), buffers), ec);
705 if(impl.check_stop_now(ec))
706 return bytes_transferred;
707 bytes_transferred += remain;
708 }
709 else
710 {
711 // no mask, autofrag
712 BOOST_ASSERT(impl.wr_buf_size != 0);
713 buffers_suffix<
714 ConstBufferSequence> cb{buffers};
715 for(;;)
716 {
717 auto const n = clamp(remain, impl.wr_buf_size);
718 remain -= n;
719 fh.len = n;
720 fh.fin = fin ? remain == 0 : false;
721 detail::fh_buffer fh_buf;
722 detail::write<
723 flat_static_buffer_base>(db&: fh_buf, fh);
724 impl.wr_cont = ! fin;
725 net::write(impl.stream(),
726 beast::buffers_cat(fh_buf.data(),
727 beast::buffers_prefix(n, cb)), ec);
728 bytes_transferred += n;
729 if(impl.check_stop_now(ec))
730 return bytes_transferred;
731 if(remain == 0)
732 break;
733 fh.op = detail::opcode::cont;
734 cb.consume(n);
735 }
736 }
737 }
738 else if(! impl.wr_frag)
739 {
740 // mask, no autofrag
741 fh.fin = fin;
742 fh.len = remain;
743 fh.key = this->impl_->create_mask();
744 detail::prepared_key key;
745 detail::prepare_key(prepared&: key, key: fh.key);
746 detail::fh_buffer fh_buf;
747 detail::write<
748 flat_static_buffer_base>(db&: fh_buf, fh);
749 buffers_suffix<
750 ConstBufferSequence> cb{buffers};
751 {
752 auto const n =
753 clamp(remain, impl.wr_buf_size);
754 auto const b =
755 net::buffer(impl.wr_buf.get(), n);
756 net::buffer_copy(b, cb);
757 cb.consume(n);
758 remain -= n;
759 detail::mask_inplace(b, key);
760 impl.wr_cont = ! fin;
761 net::write(impl.stream(),
762 buffers_cat(fh_buf.data(), b), ec);
763 bytes_transferred += n;
764 if(impl.check_stop_now(ec))
765 return bytes_transferred;
766 }
767 while(remain > 0)
768 {
769 auto const n =
770 clamp(remain, impl.wr_buf_size);
771 auto const b =
772 net::buffer(impl.wr_buf.get(), n);
773 net::buffer_copy(b, cb);
774 cb.consume(n);
775 remain -= n;
776 detail::mask_inplace(b, key);
777 net::write(impl.stream(), b, ec);
778 bytes_transferred += n;
779 if(impl.check_stop_now(ec))
780 return bytes_transferred;
781 }
782 }
783 else
784 {
785 // mask, autofrag
786 BOOST_ASSERT(impl.wr_buf_size != 0);
787 buffers_suffix<
788 ConstBufferSequence> cb(buffers);
789 for(;;)
790 {
791 fh.key = this->impl_->create_mask();
792 detail::prepared_key key;
793 detail::prepare_key(prepared&: key, key: fh.key);
794 auto const n =
795 clamp(remain, impl.wr_buf_size);
796 auto const b =
797 net::buffer(impl.wr_buf.get(), n);
798 net::buffer_copy(b, cb);
799 detail::mask_inplace(b, key);
800 fh.len = n;
801 remain -= n;
802 fh.fin = fin ? remain == 0 : false;
803 impl.wr_cont = ! fh.fin;
804 detail::fh_buffer fh_buf;
805 detail::write<
806 flat_static_buffer_base>(db&: fh_buf, fh);
807 net::write(impl.stream(),
808 buffers_cat(fh_buf.data(), b), ec);
809 bytes_transferred += n;
810 if(impl.check_stop_now(ec))
811 return bytes_transferred;
812 if(remain == 0)
813 break;
814 fh.op = detail::opcode::cont;
815 cb.consume(n);
816 }
817 }
818 return bytes_transferred;
819}
820
821template<class NextLayer, bool deflateSupported>
822template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
823BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
824stream<NextLayer, deflateSupported>::
825async_write_some(bool fin,
826 ConstBufferSequence const& bs, WriteHandler&& handler)
827{
828 static_assert(is_async_stream<next_layer_type>::value,
829 "AsyncStream type requirements not met");
830 static_assert(net::is_const_buffer_sequence<
831 ConstBufferSequence>::value,
832 "ConstBufferSequence type requirements not met");
833 return net::async_initiate<
834 WriteHandler,
835 void(error_code, std::size_t)>(
836 run_write_some_op{},
837 handler,
838 impl_,
839 fin,
840 bs);
841}
842
843//------------------------------------------------------------------------------
844
845template<class NextLayer, bool deflateSupported>
846template<class ConstBufferSequence>
847std::size_t
848stream<NextLayer, deflateSupported>::
849write(ConstBufferSequence const& buffers)
850{
851 static_assert(is_sync_stream<next_layer_type>::value,
852 "SyncStream type requirements not met");
853 static_assert(net::is_const_buffer_sequence<
854 ConstBufferSequence>::value,
855 "ConstBufferSequence type requirements not met");
856 error_code ec;
857 auto const bytes_transferred = write(buffers, ec);
858 if(ec)
859 BOOST_THROW_EXCEPTION(system_error{ec});
860 return bytes_transferred;
861}
862
863template<class NextLayer, bool deflateSupported>
864template<class ConstBufferSequence>
865std::size_t
866stream<NextLayer, deflateSupported>::
867write(ConstBufferSequence const& buffers, error_code& ec)
868{
869 static_assert(is_sync_stream<next_layer_type>::value,
870 "SyncStream type requirements not met");
871 static_assert(net::is_const_buffer_sequence<
872 ConstBufferSequence>::value,
873 "ConstBufferSequence type requirements not met");
874 return write_some(true, buffers, ec);
875}
876
877template<class NextLayer, bool deflateSupported>
878template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
879BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
880stream<NextLayer, deflateSupported>::
881async_write(
882 ConstBufferSequence const& bs, WriteHandler&& handler)
883{
884 static_assert(is_async_stream<next_layer_type>::value,
885 "AsyncStream type requirements not met");
886 static_assert(net::is_const_buffer_sequence<
887 ConstBufferSequence>::value,
888 "ConstBufferSequence type requirements not met");
889 return net::async_initiate<
890 WriteHandler,
891 void(error_code, std::size_t)>(
892 run_write_some_op{},
893 handler,
894 impl_,
895 true,
896 bs);
897}
898
899} // websocket
900} // beast
901} // boost
902
903#endif
904

// Source: boost/libs/beast/include/boost/beast/websocket/impl/write.hpp