• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021-2023 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_
19 
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <utility>
#include "actor/actor.h"
#include "actor/log.h"
#include "async/spinlock.h"
#include "async/status.h"
#include "async/uuid_generator.h"
#include "async/future_base.h"
#include "mindrt/include/mindrt.hpp"
32 
33 namespace mindspore {
34 template <typename T>
35 class Promise;
36 
37 template <typename T>
38 class Option;
39 
40 template <typename T>
41 class Future : public FutureBase {
42  public:
43   typedef MindrtStatus WaitForStatus;
44   typedef typename FutureData<T>::CompleteCallback CompleteCallback;
45   typedef typename FutureData<T>::AbandonedCallback AbandonedCallback;
46   typedef FutureData<T> Data;
Future()47   Future() : data(new (std::nothrow) Data()) {
48     MINDRT_OOM_EXIT(data);
49     data->abandoned = true;
50   }
51 
Future(const Future<T> & f)52   Future(const Future<T> &f) : FutureBase(f), data(f.data) {}
53 
Future(Future<T> && f)54   Future(Future<T> &&f) : data(std::move(f.data)) {}
55 
Future(const T & t)56   explicit Future(const T &t) : data(new (std::nothrow) Data()) {
57     MINDRT_OOM_EXIT(data);
58     SetValue(std::move(t));
59   }
60 
61   template <typename V>
Future(const V & value)62   explicit Future(const V &value) : data(new (std::nothrow) Data()) {
63     MINDRT_OOM_EXIT(data);
64     SetValue(value);
65   }
66 
Future(const MindrtStatus & s)67   explicit Future(const MindrtStatus &s) : data(new (std::nothrow) Data()) {
68     MINDRT_OOM_EXIT(data);
69     SetFailed(s.GetCode());
70   }
71 
Future(const std::shared_ptr<Data> & t)72   explicit Future(const std::shared_ptr<Data> &t) : data(t) {}
73 
~Future()74   ~Future() override {}
75 
76   Future<T> &operator=(const Future<T> &f) {
77     if (&f != this) {
78       data = f.data;
79     }
80     return *this;
81   }
82 
83   bool operator==(const Future<T> &f) const { return data == f.data; }
84 
85   bool operator!=(const Future<T> &f) const { return !(*this == f); }
86 
Get()87   const T &Get() const {
88     if (data->status.IsError()) {
89       MS_LOG(WARNING) << "Future::Get() but status == Error: " << GetErrorCode();
90       return data->t;
91     }
92 
93     if (data->gotten) {
94       return data->t;
95     }
96 
97     data->t = data->future.get();
98     data->gotten = true;
99 
100     return data->t;
101   }
102 
Valid()103   bool Valid() const noexcept { return data->future.valid(); }
104 
IsInit()105   bool IsInit() const { return data->status.IsInit(); }
106 
IsOK()107   bool IsOK() const { return data->status.IsOK(); }
108 
IsError()109   bool IsError() const { return data->status.IsError(); }
110 
GetStatus()111   MindrtStatus GetStatus() const { return data->status; }
112 
GetErrorCode()113   int32_t GetErrorCode() const {
114     const MindrtStatus &status_ = data->status;
115     if (status_.IsError()) {
116       return status_.GetCode();
117     }
118 
119     return 0;
120   }
121 
Wait()122   void Wait() const {
123     if (!data->status.IsInit()) {
124       return;
125     }
126 
127     data->future.wait();
128   }
129 
130   template <typename F>
OnComplete(internal::DeferredHelper<F> && deferred)131   const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const {
132     return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>());
133   }
134 
135   template <typename F>
OnAbandoned(internal::DeferredHelper<F> && deferred)136   const Future<T> &OnAbandoned(internal::DeferredHelper<F> &&deferred) const {
137     return OnAbandoned(std::move(deferred).operator std::function<void(const Future<T> &)>());
138   }
139 
OnComplete(CompleteCallback && callback)140   const Future<T> &OnComplete(CompleteCallback &&callback) const {
141     bool call = false;
142 
143     data->lock.lock();
144     if (data->status.IsInit()) {
145       // using move to make callback execute once
146       data->onCompleteCallbacks.push_back(std::move(callback));
147     } else {
148       call = true;
149     }
150     data->lock.unlock();
151 
152     if (call) {
153       std::move(callback)(*this);
154     }
155 
156     return *this;
157   }
158 
OnAbandoned(AbandonedCallback && callback)159   const Future<T> &OnAbandoned(AbandonedCallback &&callback) const {
160     bool call = false;
161 
162     data->lock.lock();
163     if (data->abandoned) {
164       call = true;
165     } else if (data->status.IsInit()) {
166       // using move to make callback execute once
167       data->onAbandonedCallbacks.push_back(std::move(callback));
168     }
169     data->lock.unlock();
170 
171     if (call) {
172       std::move(callback)(*this);
173     }
174 
175     return *this;
176   }
177 
SetValue(T && t)178   void SetValue(T &&t) const { return Set(std::move(t)); }
179 
SetValue(const T & t)180   void SetValue(const T &t) const { return Set(t); }
181 
SetOK()182   void SetOK() const {
183     bool call = false;
184 
185     data->lock.lock();
186     if (data->status.IsInit()) {
187       data->status.SetOK();
188       data->promise.set_value(T());
189       call = true;
190     }
191     data->lock.unlock();
192 
193     if (call) {
194       RunCallbacks();
195     }
196   }
197 
SetFailed(int32_t errCode)198   void SetFailed(int32_t errCode) const {
199     MINDRT_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK);
200 
201     bool call = false;
202 
203     data->lock.lock();
204     if (data->status.IsInit()) {
205       data->status.SetCode(errCode);
206       data->promise.set_value(T());
207       call = true;
208     }
209     data->lock.unlock();
210 
211     if (call) {
212       RunCallbacks();
213     }
214   }
215 
216   // remove all callbacks
Clear()217   void Clear() const { data->Clear(); }
218 
219   void Abandon(bool abandon = false) const {
220     bool call = false;
221 
222     std::list<AbandonedCallback> callbacks;
223     data->lock.lock();
224     if (!data->abandoned && data->status.IsInit() && (!data->associated || abandon)) {
225       call = data->abandoned = true;
226       callbacks.swap(data->onAbandonedCallbacks);
227     }
228     data->lock.unlock();
229 
230     if (call) {
231       internal::Run(std::move(callbacks), *this);
232     }
233   }
234 
235   template <typename R>
Then(const std::function<Future<R> (const T &)> & f)236   Future<R> Then(const std::function<Future<R>(const T &)> &f) const {
237     std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
238     MINDRT_OOM_EXIT(promise);
239     Future<R> future = promise->GetFuture();
240 
241     std::function<void(const Future<T> &)> handler =
242       std::bind(&internal::Thenf<T, R>, f, promise, std::placeholders::_1);
243 
244     OnComplete(std::move(handler));
245 
246     return future;
247   }
248 
249   template <typename R>
Then(const std::function<R (const T &)> & f)250   Future<R> Then(const std::function<R(const T &)> &f) const {
251     std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>());
252     MINDRT_OOM_EXIT(promise);
253     Future<R> future = promise->GetFuture();
254 
255     std::function<void(const Future<T> &)> handler =
256       std::bind(&internal::Then<T, R>, f, promise, std::placeholders::_1);
257 
258     OnComplete(std::move(handler));
259 
260     return future;
261   }
262 
263   template <typename R>
Then(const std::function<Future<R> ()> & f)264   Future<R> Then(const std::function<Future<R>()> &f) const {
265     return Then(std::function<Future<R>(const T &)>(std::bind(f)));
266   }
267 
268   template <typename R>
Then(const std::function<R ()> & f)269   Future<R> Then(const std::function<R()> &f) const {
270     return Then(std::function<R(const T &)>(std::bind(f)));
271   }
272 
273   template <typename F>
274   auto Then(F &&f) const -> decltype(this->Then(std::forward<F>(f), FutureBase())) {
275     return Then(std::forward<F>(f), FutureBase());
276   }
277 
278   template <typename F>
OnComplete(F && f)279   const Future<T> &OnComplete(F &&f) const {
280     return OnComplete(std::forward<F>(f), FutureBase());
281   }
282 
283   template <typename F>
OnAbandoned(F && f)284   const Future<T> &OnAbandoned(F &&f) const {
285     return OnAbandoned(std::forward<F>(f), FutureBase());
286   }
287 
288   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Then(internal::DeferredHelper<F> && f,FutureBase)289   Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const {
290     return Then<R>(std::move(f).operator std::function<Future<R>(const T &)>());
291   }
292 
293  private:
294   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if<
295                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type>
Then(internal::DeferredHelper<F> && f,LessFuture)296   Future<R> Then(internal::DeferredHelper<F> &&f, LessFuture) const {
297     return Then<R>(std::move(f).operator std::function<Future<R>()>());
298   }
299 
300   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Then(F && f,FutureBase)301   Future<R> Then(F &&f, FutureBase) const {
302     return Then<R>(std::function<Future<R>(const T &)>(f));
303   }
304 
305   template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if<
306                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type>
Then(F && f,LessFuture)307   Future<R> Then(F &&f, LessFuture) const {
308     return Then<R>(std::function<Future<R>()>(std::forward<F>(f)));
309   }
310 
311   template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type>
OnComplete(F && f,FutureBase)312   const Future<T> &OnComplete(F &&f, FutureBase) const {
313     return OnComplete(
314       std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); }));
315   }
316 
317   template <typename F, typename = typename std::result_of<typename std::enable_if<
318                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>
OnComplete(F && f,LessFuture)319   const Future<T> &OnComplete(F &&f, LessFuture) const {
320     return OnComplete(std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); }));
321   }
322 
323   template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type>
OnAbandoned(F && f,FutureBase)324   const Future<T> &OnAbandoned(F &&f, FutureBase) const {
325     return OnAbandoned(
326       std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); }));
327   }
328 
329   template <typename F, typename = typename std::result_of<typename std::enable_if<
330                           !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>
OnAbandoned(F && f,LessFuture)331   const Future<T> &OnAbandoned(F &&f, LessFuture) const {
332     return OnAbandoned(
333       std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); }));
334   }
335 
RunCallbacks()336   void RunCallbacks() const {
337     std::shared_ptr<typename Future<T>::Data> copy = data;
338     internal::Run(std::move(copy->onCompleteCallbacks), Future<T>(copy));
339     copy->Clear();
340   }
341 
Run()342   void Run() const {
343     auto iter = data->onCompleteCallbacks.begin();
344     for (; iter != data->onCompleteCallbacks.end(); ++iter) {
345       (*iter)(*this);
346     }
347   }
348 
349   template <typename V>
Set(V && value)350   void Set(V &&value) const {
351     bool call = false;
352 
353     data->lock.lock();
354     if (data->status.IsInit()) {
355       data->status.SetOK();
356       data->promise.set_value(std::forward<V>(value));
357       call = true;
358     }
359     data->lock.unlock();
360 
361     if (call) {
362       RunCallbacks();
363     }
364   }
365 
366   template <typename V>
367   friend class Future;
368   friend class Promise<T>;
369 
370   std::shared_ptr<Data> data;
371 };
372 
373 template <typename T>
374 class Promise {
375  public:
Promise()376   Promise() : future() { future.data->abandoned = false; }
377 
Promise(const T & t)378   explicit Promise(const T &t) : future(t) {}
379 
~Promise()380   virtual ~Promise() {
381     if (future.data) {
382       future.Abandon();
383     }
384   }
385 
SetValue(const T & value)386   void SetValue(const T &value) const { Set(value); }
387 
SetValue(T && value)388   void SetValue(T &&value) const { Set(std::move(value)); }
389 
SetValue(const Future<T> & tFuture)390   void SetValue(const Future<T> &tFuture) const { Associate(tFuture); }
391 
SetFailed(int32_t code)392   void SetFailed(int32_t code) const {
393     if (!future.data->associated) {
394       future.SetFailed(code);
395     }
396   }
397 
GetFuture()398   Future<T> GetFuture() const { return future; }
399 
Associate(const Future<T> & f)400   void Associate(const Future<T> &f) const {
401     bool associated = false;
402 
403     future.data->lock.lock();
404     if (future.IsInit() && !future.data->associated) {
405       associated = (future.data->associated = true);
406     }
407     future.data->lock.unlock();
408 
409     if (associated) {
410       f.OnComplete(std::bind(&internal::Complete<T>, future, std::placeholders::_1))
411         .OnAbandoned(std::bind(&internal::Abandon<T>, future, true));
412     }
413   }
414 
415  private:
416   template <typename V>
Set(V && value)417   void Set(V &&value) const {
418     if (future.IsInit() && !future.data->associated) {
419       future.SetValue(std::forward<V>(value));
420     }
421   }
422 
423   template <typename V>
424   friend class Future;
425 
426   Future<T> future;
427 };
428 
429 template <>
430 class Promise<void>;
431 
432 template <typename T>
433 class Promise<T &>;
434 };  // namespace mindspore
435 
436 #endif
437