1 | #pragma once |
2 | |
3 | #include <mbgl/actor/actor.hpp> |
4 | #include <mbgl/actor/mailbox.hpp> |
5 | #include <mbgl/actor/scheduler.hpp> |
6 | #include <mbgl/util/platform.hpp> |
7 | #include <mbgl/util/run_loop.hpp> |
8 | #include <mbgl/util/util.hpp> |
9 | |
10 | #include <cassert> |
11 | #include <future> |
12 | #include <memory> |
13 | #include <mutex> |
14 | #include <queue> |
15 | #include <string> |
16 | #include <thread> |
17 | #include <utility> |
18 | |
19 | namespace mbgl { |
20 | namespace util { |
21 | |
22 | // Manages a thread with `Object`. |
23 | |
// Upon creation of this object, it launches a thread and creates an object of type `Object`
// in that thread. When the `Thread<>` object is destructed, the destructor waits
// for thread termination. The `Thread<>` constructor itself does not wait for the
// `Object` to be created: `actor()` only hands out a reference obtained from a member
// of `Thread<>`, while `pause()` and the destructor block until the thread has created
// the `Object` and is about to enter its run loop. The thread created will always have
// low priority on the platforms that support setting thread priority.
30 | // |
31 | // The following properties make this class different from `ThreadPool`: |
32 | // |
33 | // - Only one thread is created. |
34 | // - `Object` will live in a single thread, providing thread affinity. |
// - It is safe to use `ThreadLocal` in an `Object` managed by `Thread<>`.
// - A `RunLoop` is created for the `Object` thread.
// - `Object` can use `Timer` and do asynchronous I/O, such as waiting for socket events.
38 | // |
39 | template<class Object> |
40 | class Thread { |
41 | public: |
42 | template <class... Args> |
43 | Thread(const std::string& name, Args&&... args) { |
44 | |
45 | std::promise<void> running_; |
46 | running = running_.get_future(); |
47 | |
48 | auto capturedArgs = std::make_tuple(std::forward<Args>(args)...); |
49 | |
50 | thread = std::thread([ |
51 | this, |
52 | name, |
53 | capturedArgs = std::move(capturedArgs), |
54 | runningPromise = std::move(running_) |
55 | ] () mutable { |
56 | platform::setCurrentThreadName(name); |
57 | platform::makeThreadLowPriority(); |
58 | |
59 | util::RunLoop loop_(util::RunLoop::Type::New); |
60 | loop = &loop_; |
61 | EstablishedActor<Object> establishedActor(loop_, object, std::move(capturedArgs)); |
62 | |
63 | runningPromise.set_value(); |
64 | |
65 | loop->run(); |
66 | |
67 | (void) establishedActor; |
68 | |
69 | loop = nullptr; |
70 | }); |
71 | } |
72 | |
73 | ~Thread() { |
74 | if (paused) { |
75 | resume(); |
76 | } |
77 | |
78 | std::promise<void> stoppable; |
79 | |
80 | running.wait(); |
81 | |
82 | // Invoke a noop task on the run loop to ensure that we're executing |
83 | // run() before we call stop() |
84 | loop->invoke([&] { |
85 | stoppable.set_value(); |
86 | }); |
87 | |
88 | stoppable.get_future().get(); |
89 | |
90 | loop->stop(); |
91 | thread.join(); |
92 | } |
93 | |
94 | // Returns a non-owning reference to `Object` that |
95 | // can be used to send messages to `Object`. It is safe |
96 | // to the non-owning reference to outlive this object |
97 | // and be used after the `Thread<>` gets destroyed. |
98 | ActorRef<std::decay_t<Object>> actor() { |
99 | return object.self(); |
100 | } |
101 | |
102 | // Pauses the `Object` thread. It will prevent the object to wake |
103 | // up from events such as timers and file descriptor I/O. Messages |
104 | // sent to a paused `Object` will be queued and only processed after |
105 | // `resume()` is called. |
106 | void pause() { |
107 | MBGL_VERIFY_THREAD(tid); |
108 | |
109 | assert(!paused); |
110 | |
111 | paused = std::make_unique<std::promise<void>>(); |
112 | resumed = std::make_unique<std::promise<void>>(); |
113 | |
114 | auto pausing = paused->get_future(); |
115 | |
116 | running.wait(); |
117 | |
118 | loop->invoke(RunLoop::Priority::High, [this] { |
119 | auto resuming = resumed->get_future(); |
120 | paused->set_value(); |
121 | resuming.get(); |
122 | }); |
123 | |
124 | pausing.get(); |
125 | } |
126 | |
127 | // Resumes the `Object` thread previously paused by `pause()`. |
128 | void resume() { |
129 | MBGL_VERIFY_THREAD(tid); |
130 | |
131 | assert(paused); |
132 | |
133 | resumed->set_value(); |
134 | |
135 | resumed.reset(); |
136 | paused.reset(); |
137 | } |
138 | |
139 | private: |
140 | MBGL_STORE_THREAD(tid); |
141 | |
142 | AspiringActor<Object> object; |
143 | |
144 | std::thread thread; |
145 | |
146 | std::future<void> running; |
147 | |
148 | std::unique_ptr<std::promise<void>> paused; |
149 | std::unique_ptr<std::promise<void>> resumed; |
150 | |
151 | util::RunLoop* loop = nullptr; |
152 | }; |
153 | |
154 | } // namespace util |
155 | } // namespace mbgl |
156 | |