| 1 | #pragma once |
| 2 | |
| 3 | #include <mbgl/actor/actor.hpp> |
| 4 | #include <mbgl/actor/mailbox.hpp> |
| 5 | #include <mbgl/actor/scheduler.hpp> |
| 6 | #include <mbgl/util/platform.hpp> |
| 7 | #include <mbgl/util/run_loop.hpp> |
| 8 | #include <mbgl/util/util.hpp> |
| 9 | |
| 10 | #include <cassert> |
| 11 | #include <future> |
| 12 | #include <memory> |
| 13 | #include <mutex> |
| 14 | #include <queue> |
| 15 | #include <string> |
| 16 | #include <thread> |
| 17 | #include <utility> |
| 18 | |
| 19 | namespace mbgl { |
| 20 | namespace util { |
| 21 | |
| 22 | // Manages a thread with `Object`. |
| 23 | |
| 24 | // Upon creation of this object, it launches a thread and creates an object of type `Object` |
| 25 | // in that thread. When the `Thread<>` object is destructed, the destructor waits |
| 26 | // for thread termination. The `Thread<>` constructor does not block until the `Object` |
| 27 | // is created; `pause()` and the destructor wait for that before using the thread's |
| 28 | // `RunLoop`. The thread created will always have low priority on |
| 29 | // the platforms that support setting thread priority. |
| 30 | // |
| 31 | // The following properties make this class different from `ThreadPool`: |
| 32 | // |
| 33 | // - Only one thread is created. |
| 34 | // - `Object` will live in a single thread, providing thread affinity. |
| 35 | // - It is safe to use `ThreadLocal` in an `Object` managed by `Thread<>` |
| 36 | // - A `RunLoop` is created for the `Object` thread. |
| 37 | // - `Object` can use `Timer` and do asynchronous I/O, like waiting for socket events. |
| 38 | // |
| 39 | template<class Object> |
| 40 | class Thread { |
| 41 | public: |
| 42 | template <class... Args> |
| 43 | Thread(const std::string& name, Args&&... args) { |
| 44 | |
| 45 | std::promise<void> running_; |
| 46 | running = running_.get_future(); |
| 47 | |
| 48 | auto capturedArgs = std::make_tuple(std::forward<Args>(args)...); |
| 49 | |
| 50 | thread = std::thread([ |
| 51 | this, |
| 52 | name, |
| 53 | capturedArgs = std::move(capturedArgs), |
| 54 | runningPromise = std::move(running_) |
| 55 | ] () mutable { |
| 56 | platform::setCurrentThreadName(name); |
| 57 | platform::makeThreadLowPriority(); |
| 58 | |
| 59 | util::RunLoop loop_(util::RunLoop::Type::New); |
| 60 | loop = &loop_; |
| 61 | EstablishedActor<Object> establishedActor(loop_, object, std::move(capturedArgs)); |
| 62 | |
| 63 | runningPromise.set_value(); |
| 64 | |
| 65 | loop->run(); |
| 66 | |
| 67 | (void) establishedActor; |
| 68 | |
| 69 | loop = nullptr; |
| 70 | }); |
| 71 | } |
| 72 | |
| 73 | ~Thread() { |
| 74 | if (paused) { |
| 75 | resume(); |
| 76 | } |
| 77 | |
| 78 | std::promise<void> stoppable; |
| 79 | |
| 80 | running.wait(); |
| 81 | |
| 82 | // Invoke a noop task on the run loop to ensure that we're executing |
| 83 | // run() before we call stop() |
| 84 | loop->invoke([&] { |
| 85 | stoppable.set_value(); |
| 86 | }); |
| 87 | |
| 88 | stoppable.get_future().get(); |
| 89 | |
| 90 | loop->stop(); |
| 91 | thread.join(); |
| 92 | } |
| 93 | |
| 94 | // Returns a non-owning reference to `Object` that |
| 95 | // can be used to send messages to `Object`. It is safe |
| 96 | // to the non-owning reference to outlive this object |
| 97 | // and be used after the `Thread<>` gets destroyed. |
| 98 | ActorRef<std::decay_t<Object>> actor() { |
| 99 | return object.self(); |
| 100 | } |
| 101 | |
| 102 | // Pauses the `Object` thread. It will prevent the object to wake |
| 103 | // up from events such as timers and file descriptor I/O. Messages |
| 104 | // sent to a paused `Object` will be queued and only processed after |
| 105 | // `resume()` is called. |
| 106 | void pause() { |
| 107 | MBGL_VERIFY_THREAD(tid); |
| 108 | |
| 109 | assert(!paused); |
| 110 | |
| 111 | paused = std::make_unique<std::promise<void>>(); |
| 112 | resumed = std::make_unique<std::promise<void>>(); |
| 113 | |
| 114 | auto pausing = paused->get_future(); |
| 115 | |
| 116 | running.wait(); |
| 117 | |
| 118 | loop->invoke(RunLoop::Priority::High, [this] { |
| 119 | auto resuming = resumed->get_future(); |
| 120 | paused->set_value(); |
| 121 | resuming.get(); |
| 122 | }); |
| 123 | |
| 124 | pausing.get(); |
| 125 | } |
| 126 | |
| 127 | // Resumes the `Object` thread previously paused by `pause()`. |
| 128 | void resume() { |
| 129 | MBGL_VERIFY_THREAD(tid); |
| 130 | |
| 131 | assert(paused); |
| 132 | |
| 133 | resumed->set_value(); |
| 134 | |
| 135 | resumed.reset(); |
| 136 | paused.reset(); |
| 137 | } |
| 138 | |
| 139 | private: |
| 140 | MBGL_STORE_THREAD(tid); |
| 141 | |
| 142 | AspiringActor<Object> object; |
| 143 | |
| 144 | std::thread thread; |
| 145 | |
| 146 | std::future<void> running; |
| 147 | |
| 148 | std::unique_ptr<std::promise<void>> paused; |
| 149 | std::unique_ptr<std::promise<void>> resumed; |
| 150 | |
| 151 | util::RunLoop* loop = nullptr; |
| 152 | }; |
| 153 | |
| 154 | } // namespace util |
| 155 | } // namespace mbgl |
| 156 | |