1// This file is part of OpenCV project.
2// It is subject to the license terms in the LICENSE file found in the top-level directory
3// of this distribution and at http://opencv.org/license.html.
4
5#include "precomp.hpp"
6#include "opencv2/core/async.hpp"
7#include "opencv2/core/detail/async_promise.hpp"
8
9#include "opencv2/core/cvstd.hpp"
10
11#include <opencv2/core/utils/logger.defines.hpp>
12#undef CV_LOG_STRIP_LEVEL
13#define CV_LOG_STRIP_LEVEL CV_LOG_LEVEL_DEBUG + 1
14#include <opencv2/core/utils/logger.hpp>
15
16#ifndef OPENCV_DISABLE_THREAD_SUPPORT
17
18#include <mutex>
19#include <condition_variable>
20#include <chrono>
21
22namespace cv {
23
24/**
25Manages shared state of asynchronous result
26*/
27struct AsyncArray::Impl
28{
29 int refcount;
30 void addrefFuture() CV_NOEXCEPT { CV_XADD(&refcount_future, 1); CV_XADD(&refcount, 1); } \
31 void releaseFuture() CV_NOEXCEPT { CV_XADD(&refcount_future, -1); if(1 == CV_XADD(&refcount, -1)) delete this; } \
32 int refcount_future;
33 void addrefPromise() CV_NOEXCEPT { CV_XADD(&refcount_promise, 1); CV_XADD(&refcount, 1); } \
34 void releasePromise() CV_NOEXCEPT { CV_XADD(&refcount_promise, -1); if(1 == CV_XADD(&refcount, -1)) delete this; } \
35 int refcount_promise;
36
37 mutable std::mutex mtx;
38 mutable std::condition_variable cond_var;
39
40 mutable bool has_result; // Mat, UMat or exception
41
42 mutable cv::Ptr<Mat> result_mat;
43 mutable cv::Ptr<UMat> result_umat;
44
45
46 bool has_exception;
47#if CV__EXCEPTION_PTR
48 std::exception_ptr exception;
49#endif
50 cv::Exception cv_exception;
51
52 mutable bool result_is_fetched;
53
54 bool future_is_returned;
55
56 Impl()
57 : refcount(1), refcount_future(0), refcount_promise(1)
58 , has_result(false)
59 , has_exception(false)
60 , result_is_fetched(false)
61 , future_is_returned(false)
62 {
63 // nothing
64 }
65
66 ~Impl()
67 {
68 if (has_result && !result_is_fetched)
69 {
70 CV_LOG_INFO(NULL, "Asynchronous result has not been fetched");
71 }
72 }
73
74 bool get(OutputArray dst, int64 timeoutNs) const
75 {
76 CV_Assert(!result_is_fetched);
77 if (!has_result)
78 {
79 if(refcount_promise == 0)
80 CV_Error(Error::StsInternal, "Asynchronous result producer has been destroyed");
81 if (!wait_for(timeoutNs))
82 return false;
83 }
84 std::unique_lock<std::mutex> lock(mtx);
85 if (has_result)
86 {
87 if (!result_mat.empty())
88 {
89 dst.move(m&: *result_mat.get());
90 result_mat.release();
91 result_is_fetched = true;
92 return true;
93 }
94 if (!result_umat.empty())
95 {
96 dst.move(u&: *result_umat.get());
97 result_umat.release();
98 result_is_fetched = true;
99 return true;
100 }
101#if CV__EXCEPTION_PTR
102 if (has_exception && exception)
103 {
104 result_is_fetched = true;
105 std::rethrow_exception(exception);
106 }
107#endif
108 if (has_exception)
109 {
110 result_is_fetched = true;
111 throw cv_exception;
112 }
113 CV_Error(Error::StsInternal, "AsyncArray: invalid state of 'has_result = true'");
114 }
115 CV_Assert(!has_result);
116 CV_Assert(timeoutNs < 0);
117 return false;
118 }
119
120 bool valid() const CV_NOEXCEPT
121 {
122 if (result_is_fetched)
123 return false;
124 if (refcount_promise == 0 && !has_result)
125 return false;
126 return true;
127 }
128
129 bool wait_for(int64 timeoutNs) const
130 {
131 CV_Assert(valid());
132 if (has_result)
133 return has_result;
134 if (timeoutNs == 0)
135 return has_result;
136 CV_LOG_INFO(NULL, "Waiting for async result ...");
137 std::unique_lock<std::mutex> lock(mtx);
138 const auto cond_pred = [&]{ return has_result == true; };
139 if (timeoutNs > 0)
140 return cond_var.wait_for(lock&: lock, rtime: std::chrono::nanoseconds(timeoutNs), p: cond_pred);
141 else
142 {
143 cond_var.wait(lock&: lock, p: cond_pred);
144 CV_Assert(has_result);
145 return true;
146 }
147 }
148
149 AsyncArray getArrayResult()
150 {
151 CV_Assert(refcount_future == 0);
152 AsyncArray result;
153 addrefFuture();
154 result.p = this;
155 future_is_returned = true;
156 return result;
157 }
158
159 void setValue(InputArray value)
160 {
161 if (future_is_returned && refcount_future == 0)
162 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
163 std::unique_lock<std::mutex> lock(mtx);
164 CV_Assert(!has_result);
165 int k = value.kind();
166 if (k == _InputArray::UMAT)
167 {
168 result_umat = makePtr<UMat>();
169 value.copyTo(arr: *result_umat.get());
170 }
171 else
172 {
173 result_mat = makePtr<Mat>();
174 value.copyTo(arr: *result_mat.get());
175 }
176 has_result = true;
177 cond_var.notify_all();
178 }
179
180#if CV__EXCEPTION_PTR
181 void setException(std::exception_ptr e)
182 {
183 if (future_is_returned && refcount_future == 0)
184 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
185 std::unique_lock<std::mutex> lock(mtx);
186 CV_Assert(!has_result);
187 has_exception = true;
188 exception = e;
189 has_result = true;
190 cond_var.notify_all();
191 }
192#endif
193
194 void setException(const cv::Exception e)
195 {
196 if (future_is_returned && refcount_future == 0)
197 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
198 std::unique_lock<std::mutex> lock(mtx);
199 CV_Assert(!has_result);
200 has_exception = true;
201 cv_exception = e;
202 has_result = true;
203 cond_var.notify_all();
204 }
205};
206
207} // namespace
208
209#else // OPENCV_DISABLE_THREAD_SUPPORT
210
211namespace cv {
212
213// no threading
214struct AsyncArray::Impl
215{
216 int refcount;
217 void addrefFuture() CV_NOEXCEPT { refcount_future++; refcount++; }
218 void releaseFuture() CV_NOEXCEPT { refcount_future--; if (0 == --refcount) delete this; }
219 int refcount_future;
220 void addrefPromise() CV_NOEXCEPT { refcount_promise++; refcount++; } \
221 void releasePromise() CV_NOEXCEPT { refcount_promise--; if (0 == --refcount) delete this; }
222 int refcount_promise;
223
224 mutable bool has_result; // Mat, UMat or exception
225
226 mutable cv::Ptr<Mat> result_mat;
227 mutable cv::Ptr<UMat> result_umat;
228
229
230 bool has_exception;
231#if CV__EXCEPTION_PTR
232 std::exception_ptr exception;
233#endif
234 cv::Exception cv_exception;
235
236 mutable bool result_is_fetched;
237
238 bool future_is_returned;
239
240 Impl()
241 : refcount(1), refcount_future(0), refcount_promise(1)
242 , has_result(false)
243 , has_exception(false)
244 , result_is_fetched(false)
245 , future_is_returned(false)
246 {
247 // nothing
248 }
249
250 ~Impl()
251 {
252 if (has_result && !result_is_fetched)
253 {
254 CV_LOG_INFO(NULL, "Asynchronous result has not been fetched");
255 }
256 }
257
258 bool get(OutputArray dst, int64 timeoutNs) const
259 {
260 CV_Assert(!result_is_fetched);
261 if (!has_result)
262 {
263 CV_UNUSED(timeoutNs);
264 CV_Error(Error::StsError, "Result is not produced (unable to wait for result in OPENCV_DISABLE_THREAD_SUPPORT mode)");
265 }
266 if (!result_mat.empty())
267 {
268 dst.move(*result_mat.get());
269 result_mat.release();
270 result_is_fetched = true;
271 return true;
272 }
273 if (!result_umat.empty())
274 {
275 dst.move(*result_umat.get());
276 result_umat.release();
277 result_is_fetched = true;
278 return true;
279 }
280#if CV__EXCEPTION_PTR
281 if (has_exception && exception)
282 {
283 result_is_fetched = true;
284 std::rethrow_exception(exception);
285 }
286#endif
287 if (has_exception)
288 {
289 result_is_fetched = true;
290 throw cv_exception;
291 }
292 CV_Error(Error::StsInternal, "AsyncArray: invalid state of 'has_result = true'");
293 return false;
294 }
295
296 bool valid() const CV_NOEXCEPT
297 {
298 if (result_is_fetched)
299 return false;
300 if (refcount_promise == 0 && !has_result)
301 return false;
302 return true;
303 }
304
305 bool wait_for(int64 timeoutNs) const
306 {
307 CV_Assert(valid());
308 if (has_result)
309 return has_result;
310 if (timeoutNs == 0)
311 return has_result;
312 CV_Error(Error::StsError, "Unable to wait in OPENCV_DISABLE_THREAD_SUPPORT mode");
313 }
314
315 AsyncArray getArrayResult()
316 {
317 CV_Assert(refcount_future == 0);
318 AsyncArray result;
319 addrefFuture();
320 result.p = this;
321 future_is_returned = true;
322 return result;
323 }
324
325 void setValue(InputArray value)
326 {
327 if (future_is_returned && refcount_future == 0)
328 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
329 CV_Assert(!has_result);
330 int k = value.kind();
331 if (k == _InputArray::UMAT)
332 {
333 result_umat = makePtr<UMat>();
334 value.copyTo(*result_umat.get());
335 }
336 else
337 {
338 result_mat = makePtr<Mat>();
339 value.copyTo(*result_mat.get());
340 }
341 has_result = true;
342 }
343
344#if CV__EXCEPTION_PTR
345 void setException(std::exception_ptr e)
346 {
347 if (future_is_returned && refcount_future == 0)
348 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
349 CV_Assert(!has_result);
350 has_exception = true;
351 exception = e;
352 has_result = true;
353 }
354#endif
355
356 void setException(const cv::Exception e)
357 {
358 if (future_is_returned && refcount_future == 0)
359 CV_Error(Error::StsError, "Associated AsyncArray has been destroyed");
360 CV_Assert(!has_result);
361 has_exception = true;
362 cv_exception = e;
363 has_result = true;
364 }
365};
366
367}
368
369#endif // OPENCV_DISABLE_THREAD_SUPPORT
370
371namespace cv {
372
373AsyncArray::AsyncArray() CV_NOEXCEPT
374 : p(NULL)
375{
376}
377
378AsyncArray::~AsyncArray() CV_NOEXCEPT
379{
380 release();
381}
382
383AsyncArray::AsyncArray(const AsyncArray& o) CV_NOEXCEPT
384 : p(o.p)
385{
386 if (p)
387 p->addrefFuture();
388}
389
390AsyncArray& AsyncArray::operator=(const AsyncArray& o) CV_NOEXCEPT
391{
392 Impl* newp = o.p;
393 if (newp)
394 newp->addrefFuture();
395 release();
396 p = newp;
397 return *this;
398}
399
400void AsyncArray::release() CV_NOEXCEPT
401{
402 Impl* impl = p;
403 p = NULL;
404 if (impl)
405 impl->releaseFuture();
406}
407
408bool AsyncArray::get(OutputArray dst, int64 timeoutNs) const
409{
410 CV_Assert(p);
411 return p->get(dst, timeoutNs);
412}
413
414void AsyncArray::get(OutputArray dst) const
415{
416 CV_Assert(p);
417 bool res = p->get(dst, timeoutNs: -1);
418 CV_Assert(res);
419}
420
421bool AsyncArray::wait_for(int64 timeoutNs) const
422{
423 CV_Assert(p);
424 return p->wait_for(timeoutNs);
425}
426
427bool AsyncArray::valid() const CV_NOEXCEPT
428{
429 if (!p) return false;
430 return p->valid();
431}
432
433
434//
435// AsyncPromise
436//
437
438AsyncPromise::AsyncPromise() CV_NOEXCEPT
439 : p(new AsyncArray::Impl())
440{
441}
442
443AsyncPromise::~AsyncPromise() CV_NOEXCEPT
444{
445 release();
446}
447
448AsyncPromise::AsyncPromise(const AsyncPromise& o) CV_NOEXCEPT
449 : p(o.p)
450{
451 if (p)
452 p->addrefPromise();
453}
454
455AsyncPromise& AsyncPromise::operator=(const AsyncPromise& o) CV_NOEXCEPT
456{
457 Impl* newp = o.p;
458 if (newp)
459 newp->addrefPromise();
460 release();
461 p = newp;
462 return *this;
463}
464
465void AsyncPromise::release() CV_NOEXCEPT
466{
467 Impl* impl = p;
468 p = NULL;
469 if (impl)
470 impl->releasePromise();
471}
472
473AsyncArray AsyncPromise::getArrayResult()
474{
475 CV_Assert(p);
476 return p->getArrayResult();
477}
478
479void AsyncPromise::setValue(InputArray value)
480{
481 CV_Assert(p);
482 return p->setValue(value);
483}
484
485void AsyncPromise::setException(const cv::Exception& exception)
486{
487 CV_Assert(p);
488 return p->setException(exception);
489}
490
#if CV__EXCEPTION_PTR
/**
Publishes an exception_ptr as the asynchronous result.

Asserts that the promise is attached, then forwards to Impl::setException().
*/
void AsyncPromise::setException(std::exception_ptr exception)
{
    CV_Assert(p);
    p->setException(exception);
}
#endif
498
499} // namespace
500

// source code of opencv/modules/core/src/async.cpp