• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
19 
20 #include <memory>
21 #include <utility>
22 #include <future>
23 #include <iostream>
24 #include <list>
25 #include "actor/actor.h"
26 #include "actor/log.h"
27 #include "async/spinlock.h"
28 #include "async/status.h"
29 #include "async/uuid_generator.h"
30 #include "async/future_base.h"
31 #include "mindrt/include/mindrt.hpp"
32 
33 namespace mindspore {
34 
35 template <typename T>
36 class Promise;
37 
38 template <typename T>
39 class Option;
40 
41 template <typename T>
42 class Future : public FutureBase {
43  public:
44   typedef MindrtStatus WaitForStatus;
45   typedef typename FutureData<T>::CompleteCallback CompleteCallback;
46   typedef typename FutureData<T>::AbandonedCallback AbandonedCallback;
47   typedef FutureData<T> Data;
Future()48   Future() : data(new (std::nothrow) Data()) {
49     MINDRT_OOM_EXIT(data);
50     data->abandoned = true;
51   }
52 
Future(const Future<T> & f)53   Future(const Future<T> &f) : FutureBase(f), data(f.data) {}
54 
Future(Future<T> && f)55   Future(Future<T> &&f) : data(std::move(f.data)) {}
56 
Future(const T & t)57   explicit Future(const T &t) : data(new (std::nothrow) Data()) {
58     MINDRT_OOM_EXIT(data);
59     SetValue(std::move(t));
60   }
61 
62   template <typename V>
Future(const V & value)63   explicit Future(const V &value) : data(new (std::nothrow) Data()) {
64     MINDRT_OOM_EXIT(data);
65     SetValue(value);
66   }
67 
Future(const MindrtStatus & s)68   explicit Future(const MindrtStatus &s) : data(new (std::nothrow) Data()) {
69     MINDRT_OOM_EXIT(data);
70     SetFailed(s.GetCode());
71   }
72 
Future(const std::shared_ptr<Data> & t)73   explicit Future(const std::shared_ptr<Data> &t) : data(t) {}
74 
~Future()75   ~Future() override {}
76 
77   Future<T> &operator=(const Future<T> &f) {
78     if (&f != this) {
79       data = f.data;
80     }
81     return *this;
82   }
83 
84   bool operator==(const Future<T> &f) const { return data == f.data; }
85 
86   bool operator!=(const Future<T> &f) const { return !(*this == f); }
87 
Get()88   const T &Get() const {
89     if (data->status.IsError()) {
90       MS_LOG(WARNING) << "Future::Get() but status == Error: " << GetErrorCode();
91       return data->t;
92     }
93 
94     if (data->gotten) {
95       return data->t;
96     }
97 
98     //        try {
99     data->t = data->future.get();
100     data->gotten = true;
101     //        } catch (std::future_error const &e) {
102     //            ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Future error: %s",
103     //            "%s",
104     //                                e.what());
105     //        } catch (std::exception const &e) {
106     //            ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Standard exception:
107     //            %s",
108     //                                "%s", e.what());
109     //        } catch (...) {
110     //            ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Unknown exception.");
111     //        }
112 
113     return data->t;
114   }
115 
Valid()116   bool Valid() const noexcept { return data->future.valid(); }
117 
IsInit()118   bool IsInit() const { return data->status.IsInit(); }
119 
IsOK()120   bool IsOK() const { return data->status.IsOK(); }
121 
IsError()122   bool IsError() const { return data->status.IsError(); }
123 
GetStatus()124   MindrtStatus GetStatus() const { return data->status; }
125 
GetErrorCode()126   int32_t GetErrorCode() const {
127     const MindrtStatus &status_ = data->status;
128     if (status_.IsError()) {
129       return status_.GetCode();
130     }
131 
132     return 0;
133   }
134 
Wait()135   void Wait() const {
136     if (!data->status.IsInit()) {
137       return;
138     }
139 
140     data->future.wait();
141   }
142 
143   template <typename F>
OnComplete(internal::DeferredHelper<F> && deferred)144   const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const {
145     return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>());
146   }
147 
148   template <typename F>
OnAbandoned(internal::DeferredHelper<F> && deferred)149   const Future<T> &OnAbandoned(internal::DeferredHelper<F> &&deferred) const {
150     return OnAbandoned(std::move(deferred).operator std::function<void(const Future<T> &)>());
151   }
152 
OnComplete(CompleteCallback && callback)153   const Future<T> &OnComplete(CompleteCallback &&callback) const {
154     bool call = false;
155 
156     data->lock.Lock();
157     if (data->status.IsInit()) {
158       // using move to make callback execute once
159       data->onCompleteCallbacks.push_back(std::move(callback));
160     } else {
161       call = true;
162     }
163     data->lock.Unlock();
164 
165     if (call) {
166       std::move(callback)(*this);
167     }
168 
169     return *this;
170   }
171 
OnAbandoned(AbandonedCallback && callback)172   const Future<T> &OnAbandoned(AbandonedCallback &&callback) const {
173     bool call = false;
174 
175     data->lock.Lock();
176     if (data->abandoned) {
177       call = true;
178     } else if (data->status.IsInit()) {
179       // using move to make callback execute once
180       data->onAbandonedCallbacks.push_back(std::move(callback));
181     }
182     data->lock.Unlock();
183 
184     if (call) {
185       std::move(callback)(*this);
186     }
187 
188     return *this;
189   }
190 
SetValue(T && t)191   void SetValue(T &&t) const { return Set(std::move(t)); }
192 
SetValue(const T & t)193   void SetValue(const T &t) const { return Set(t); }
194 
SetOK()195   void SetOK() const {
196     bool call = false;
197 
198     data->lock.Lock();
199     if (data->status.IsInit()) {
200       data->status.SetOK();
201       data->promise.set_value(T());
202       call = true;
203     }
204     data->lock.Unlock();
205 
206     if (call) {
207       RunCallbacks();
208     }
209   }
210 
SetFailed(int32_t errCode)211   void SetFailed(int32_t errCode) const {
212     MINDRT_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK);
213 
214     bool call = false;
215 
216     data->lock.Lock();
217     if (data->status.IsInit()) {
218       data->status.SetCode(errCode);
219       data->promise.set_value(T());
220       call = true;
221     }
222     data->lock.Unlock();
223 
224     if (call) {
225       RunCallbacks();
226     }
227   }
228 
229   // remove all callbacks
Clear()230   void Clear() const { data->Clear(); }
231 
232   void Abandon(bool abandon = false) const {
233     bool call = false;
234 
235     std::list<AbandonedCallback> callbacks;
236     data->lock.Lock();
237     if (!data->abandoned && data->status.IsInit() && (!data->associated || abandon)) {
238       call = data->abandoned = true;
239       callbacks.swap(data->onAbandonedCallbacks);
240     }
241     data->lock.Unlock();
242 
243     if (call) {
244       internal::Run(std::move(callbacks), *this);
245     }
246   }
247 
248   template <typename R>
Then(const std::function<Future<R> (const T &)> & f)249   Future<R> Then(const std::function<Future<R>(const T &)> &f) const {
250     std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
251     MINDRT_OOM_EXIT(promise);
252     Future<R> future = promise->GetFuture();
253 
254     std::function<void(const Future<T> &)> handler =
255       std::bind(&internal::Thenf<T, R>, f, promise, std::placeholders::_1);
256 
257     OnComplete(std::move(handler));
258 
259     return future;
260   }
261 
262   template <typename R>
Then(const std::function<R (const T &)> & f)263   Future<R> Then(const std::function<R(const T &)> &f) const {
264     std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
265     MINDRT_OOM_EXIT(promise);
266     Future<R> future = promise->GetFuture();
267 
268     std::function<void(const Future<T> &)> handler =
269       std::bind(&internal::Then<T, R>, f, promise, std::placeholders::_1);
270 
271     OnComplete(std::move(handler));
272 
273     return future;
274   }
275 
276   template <typename R>
Then(const std::function<Future<R> ()> & f)277   Future<R> Then(const std::function<Future<R>()> &f) const {
278     return Then(std::function<Future<R>(const T &)>(std::bind(f)));
279   }
280 
281   template <typename R>
Then(const std::function<R ()> & f)282   Future<R> Then(const std::function<R()> &f) const {
283     return Then(std::function<R(const T &)>(std::bind(f)));
284   }
285 
286   template <typename F>
287   auto Then(F &&f) const -> decltype(this->Then(std::forward<F>(f), FutureBase())) {
288     return Then(std::forward<F>(f), FutureBase());
289   }
290 
291   template <typename F>
OnComplete(F && f)292   const Future<T> &OnComplete(F &&f) const {
293     return OnComplete(std::forward<F>(f), FutureBase());
294   }
295 
296   template <typename F>
OnAbandoned(F && f)297   const Future<T> &OnAbandoned(F &&f) const {
298     return OnAbandoned(std::forward<F>(f), FutureBase());
299   }
300 
301   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Then(internal::DeferredHelper<F> && f,FutureBase)302   Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const {
303     return Then<R>(std::move(f).operator std::function<Future<R>(const T &)>());
304   }
305 
306  private:
307   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if<
308                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type>
Then(internal::DeferredHelper<F> && f,LessFuture)309   Future<R> Then(internal::DeferredHelper<F> &&f, LessFuture) const {
310     return Then<R>(std::move(f).operator std::function<Future<R>()>());
311   }
312 
313   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Then(F && f,FutureBase)314   Future<R> Then(F &&f, FutureBase) const {
315     return Then<R>(std::function<Future<R>(const T &)>(f));
316   }
317 
318   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if<
319                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type>
Then(F && f,LessFuture)320   Future<R> Then(F &&f, LessFuture) const {
321     return Then<R>(std::function<Future<R>()>(std::forward<F>(f)));
322   }
323 
324   template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type>
OnComplete(F && f,FutureBase)325   const Future<T> &OnComplete(F &&f, FutureBase) const {
326     return OnComplete(
327       std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); }));
328   }
329 
330   template <typename F, typename = typename std::result_of<typename std::enable_if<
331                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>
OnComplete(F && f,LessFuture)332   const Future<T> &OnComplete(F &&f, LessFuture) const {
333     return OnComplete(std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); }));
334   }
335 
336   template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type>
OnAbandoned(F && f,FutureBase)337   const Future<T> &OnAbandoned(F &&f, FutureBase) const {
338     return OnAbandoned(
339       std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); }));
340   }
341 
342   template <typename F, typename = typename std::result_of<typename std::enable_if<
343                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>
OnAbandoned(F && f,LessFuture)344   const Future<T> &OnAbandoned(F &&f, LessFuture) const {
345     return OnAbandoned(
346       std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); }));
347   }
348 
RunCallbacks()349   void RunCallbacks() const {
350     std::shared_ptr<typename Future<T>::Data> copy = data;
351     internal::Run(std::move(copy->onCompleteCallbacks), Future<T>(copy));
352     copy->Clear();
353   }
354 
Run()355   void Run() const {
356     auto iter = data->onCompleteCallbacks.begin();
357     for (; iter != data->onCompleteCallbacks.end(); ++iter) {
358       (*iter)(*this);
359     }
360   }
361 
362   template <typename V>
Set(V && value)363   void Set(V &&value) const {
364     bool call = false;
365 
366     data->lock.Lock();
367     if (data->status.IsInit()) {
368       data->status.SetOK();
369       data->promise.set_value(std::forward<V>(value));
370       call = true;
371     }
372     data->lock.Unlock();
373 
374     if (call) {
375       RunCallbacks();
376     }
377   }
378 
379   template <typename V>
380   friend class Future;
381   friend class Promise<T>;
382 
383   std::shared_ptr<Data> data;
384 };
385 
386 template <typename T>
387 class Promise {
388  public:
Promise()389   Promise() : future() { future.data->abandoned = false; }
390 
Promise(const T & t)391   explicit Promise(const T &t) : future(t) {}
392 
~Promise()393   virtual ~Promise() {
394     //        try {
395     if (future.data) {
396       future.Abandon();
397     }
398     //        } catch (...) {
399     //        }
400   }
401 
SetValue(const T & value)402   void SetValue(const T &value) const { Set(value); }
403 
SetValue(T && value)404   void SetValue(T &&value) const { Set(std::move(value)); }
405 
SetValue(const Future<T> & tFuture)406   void SetValue(const Future<T> &tFuture) const { Associate(tFuture); }
407 
SetFailed(int32_t code)408   void SetFailed(int32_t code) const {
409     if (!future.data->associated) {
410       future.SetFailed(code);
411     }
412   }
413 
GetFuture()414   Future<T> GetFuture() const { return future; }
415 
Associate(const Future<T> & f)416   void Associate(const Future<T> &f) const {
417     bool associated = false;
418 
419     future.data->lock.Lock();
420     if (future.IsInit() && !future.data->associated) {
421       associated = (future.data->associated = true);
422     }
423     future.data->lock.Unlock();
424 
425     if (associated) {
426       f.OnComplete(std::bind(&internal::Complete<T>, future, std::placeholders::_1))
427         .OnAbandoned(std::bind(&internal::Abandon<T>, future, true));
428     }
429   }
430 
431  private:
432   template <typename V>
Set(V && value)433   void Set(V &&value) const {
434     if (future.IsInit() && !future.data->associated) {
435       future.SetValue(std::forward<V>(value));
436     }
437   }
438 
439   template <typename V>
440   friend class Future;
441 
442   Future<T> future;
443 };
444 
445 template <>
446 class Promise<void>;
447 
448 template <typename T>
449 class Promise<T &>;
450 
451 };  // namespace mindspore
452 
453 #endif
454