1 /** 2 * Copyright 2021 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_ 18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_ 19 20 #include <memory> 21 #include <utility> 22 #include <future> 23 #include <iostream> 24 #include <list> 25 #include "actor/actor.h" 26 #include "actor/log.h" 27 #include "async/spinlock.h" 28 #include "async/status.h" 29 #include "async/uuid_generator.h" 30 #include "async/future_base.h" 31 #include "mindrt/include/mindrt.hpp" 32 33 namespace mindspore { 34 35 template <typename T> 36 class Promise; 37 38 template <typename T> 39 class Option; 40 41 template <typename T> 42 class Future : public FutureBase { 43 public: 44 typedef MindrtStatus WaitForStatus; 45 typedef typename FutureData<T>::CompleteCallback CompleteCallback; 46 typedef typename FutureData<T>::AbandonedCallback AbandonedCallback; 47 typedef FutureData<T> Data; Future()48 Future() : data(new (std::nothrow) Data()) { 49 MINDRT_OOM_EXIT(data); 50 data->abandoned = true; 51 } 52 Future(const Future<T> & f)53 Future(const Future<T> &f) : FutureBase(f), data(f.data) {} 54 Future(Future<T> && f)55 Future(Future<T> &&f) : data(std::move(f.data)) {} 56 Future(const T & t)57 explicit Future(const T &t) : data(new (std::nothrow) Data()) { 58 MINDRT_OOM_EXIT(data); 59 SetValue(std::move(t)); 60 } 61 62 template <typename V> Future(const V & value)63 explicit Future(const V &value) : data(new (std::nothrow) Data()) { 64 MINDRT_OOM_EXIT(data); 65 SetValue(value); 66 } 67 Future(const MindrtStatus & s)68 explicit Future(const MindrtStatus &s) : data(new (std::nothrow) Data()) { 69 MINDRT_OOM_EXIT(data); 70 SetFailed(s.GetCode()); 71 } 72 Future(const std::shared_ptr<Data> & t)73 explicit Future(const std::shared_ptr<Data> &t) : data(t) {} 74 ~Future()75 ~Future() override {} 76 77 Future<T> &operator=(const Future<T> &f) { 78 if (&f != this) { 79 data = f.data; 80 } 81 return *this; 82 } 83 84 bool operator==(const Future<T> &f) const { return data == f.data; } 85 86 bool operator!=(const Future<T> &f) const { return !(*this == f); } 87 Get()88 const T &Get() const { 89 if (data->status.IsError()) { 90 MS_LOG(WARNING) << "Future::Get() but status == Error: " << GetErrorCode(); 91 return data->t; 92 } 93 94 if (data->gotten) { 95 return data->t; 96 } 97 98 // try { 99 data->t = data->future.get(); 100 data->gotten = true; 101 // } catch (std::future_error const &e) { 102 // ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Future error: %s", 103 // "%s", 104 // e.what()); 105 // } catch (std::exception const &e) { 106 // ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Standard exception: 107 // %s", 108 // "%s", e.what()); 109 // } catch (...) { 110 // ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_MINDRT_LOG, "Unknown exception."); 111 // } 112 113 return data->t; 114 } 115 Valid()116 bool Valid() const noexcept { return data->future.valid(); } 117 IsInit()118 bool IsInit() const { return data->status.IsInit(); } 119 IsOK()120 bool IsOK() const { return data->status.IsOK(); } 121 IsError()122 bool IsError() const { return data->status.IsError(); } 123 GetStatus()124 MindrtStatus GetStatus() const { return data->status; } 125 GetErrorCode()126 int32_t GetErrorCode() const { 127 const MindrtStatus &status_ = data->status; 128 if (status_.IsError()) { 129 return status_.GetCode(); 130 } 131 132 return 0; 133 } 134 Wait()135 void Wait() const { 136 if (!data->status.IsInit()) { 137 return; 138 } 139 140 data->future.wait(); 141 } 142 143 template <typename F> OnComplete(internal::DeferredHelper<F> && deferred)144 const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const { 145 return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>()); 146 } 147 148 template <typename F> OnAbandoned(internal::DeferredHelper<F> && deferred)149 const Future<T> &OnAbandoned(internal::DeferredHelper<F> &&deferred) const { 150 return OnAbandoned(std::move(deferred).operator std::function<void(const Future<T> &)>()); 151 } 152 OnComplete(CompleteCallback && callback)153 const Future<T> &OnComplete(CompleteCallback &&callback) const { 154 bool call = false; 155 156 data->lock.Lock(); 157 if (data->status.IsInit()) { 158 // using move to make callback execute once 159 data->onCompleteCallbacks.push_back(std::move(callback)); 160 } else { 161 call = true; 162 } 163 data->lock.Unlock(); 164 165 if (call) { 166 std::move(callback)(*this); 167 } 168 169 return *this; 170 } 171 OnAbandoned(AbandonedCallback && callback)172 const Future<T> &OnAbandoned(AbandonedCallback &&callback) const { 173 bool call = false; 174 175 data->lock.Lock(); 176 if (data->abandoned) { 177 call = true; 178 } else if (data->status.IsInit()) { 179 // using move to make callback execute once 180 data->onAbandonedCallbacks.push_back(std::move(callback)); 181 } 182 data->lock.Unlock(); 183 184 if (call) { 185 std::move(callback)(*this); 186 } 187 188 return *this; 189 } 190 SetValue(T && t)191 void SetValue(T &&t) const { return Set(std::move(t)); } 192 SetValue(const T & t)193 void SetValue(const T &t) const { return Set(t); } 194 SetOK()195 void SetOK() const { 196 bool call = false; 197 198 data->lock.Lock(); 199 if (data->status.IsInit()) { 200 data->status.SetOK(); 201 data->promise.set_value(T()); 202 call = true; 203 } 204 data->lock.Unlock(); 205 206 if (call) { 207 RunCallbacks(); 208 } 209 } 210 SetFailed(int32_t errCode)211 void SetFailed(int32_t errCode) const { 212 MINDRT_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK); 213 214 bool call = false; 215 216 data->lock.Lock(); 217 if (data->status.IsInit()) { 218 data->status.SetCode(errCode); 219 data->promise.set_value(T()); 220 call = true; 221 } 222 data->lock.Unlock(); 223 224 if (call) { 225 RunCallbacks(); 226 } 227 } 228 229 // remove all callbacks Clear()230 void Clear() const { data->Clear(); } 231 232 void Abandon(bool abandon = false) const { 233 bool call = false; 234 235 std::list<AbandonedCallback> callbacks; 236 data->lock.Lock(); 237 if (!data->abandoned && data->status.IsInit() && (!data->associated || abandon)) { 238 call = data->abandoned = true; 239 callbacks.swap(data->onAbandonedCallbacks); 240 } 241 data->lock.Unlock(); 242 243 if (call) { 244 internal::Run(std::move(callbacks), *this); 245 } 246 } 247 248 template <typename R> Then(const std::function<Future<R> (const T &)> & f)249 Future<R> Then(const std::function<Future<R>(const T &)> &f) const { 250 std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>()); 251 MINDRT_OOM_EXIT(promise); 252 Future<R> future = promise->GetFuture(); 253 254 std::function<void(const Future<T> &)> handler = 255 std::bind(&internal::Thenf<T, R>, f, promise, std::placeholders::_1); 256 257 OnComplete(std::move(handler)); 258 259 return future; 260 } 261 262 template <typename R> Then(const std::function<R (const T &)> & f)263 Future<R> Then(const std::function<R(const T &)> &f) const { 264 std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>()); 265 MINDRT_OOM_EXIT(promise); 266 Future<R> future = promise->GetFuture(); 267 268 std::function<void(const Future<T> &)> handler = 269 std::bind(&internal::Then<T, R>, f, promise, std::placeholders::_1); 270 271 OnComplete(std::move(handler)); 272 273 return future; 274 } 275 276 template <typename R> Then(const std::function<Future<R> ()> & f)277 Future<R> Then(const std::function<Future<R>()> &f) const { 278 return Then(std::function<Future<R>(const T &)>(std::bind(f))); 279 } 280 281 template <typename R> Then(const std::function<R ()> & f)282 Future<R> Then(const std::function<R()> &f) const { 283 return Then(std::function<R(const T &)>(std::bind(f))); 284 } 285 286 template <typename F> 287 auto Then(F &&f) const -> decltype(this->Then(std::forward<F>(f), FutureBase())) { 288 return Then(std::forward<F>(f), FutureBase()); 289 } 290 291 template <typename F> OnComplete(F && f)292 const Future<T> &OnComplete(F &&f) const { 293 return OnComplete(std::forward<F>(f), FutureBase()); 294 } 295 296 template <typename F> OnAbandoned(F && f)297 const Future<T> &OnAbandoned(F &&f) const { 298 return OnAbandoned(std::forward<F>(f), FutureBase()); 299 } 300 301 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type> Then(internal::DeferredHelper<F> && f,FutureBase)302 Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const { 303 return Then<R>(std::move(f).operator std::function<Future<R>(const T &)>()); 304 } 305 306 private: 307 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if< 308 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type> Then(internal::DeferredHelper<F> && f,LessFuture)309 Future<R> Then(internal::DeferredHelper<F> &&f, LessFuture) const { 310 return Then<R>(std::move(f).operator std::function<Future<R>()>()); 311 } 312 313 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type> Then(F && f,FutureBase)314 Future<R> Then(F &&f, FutureBase) const { 315 return Then<R>(std::function<Future<R>(const T &)>(f)); 316 } 317 318 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if< 319 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type> Then(F && f,LessFuture)320 Future<R> Then(F &&f, LessFuture) const { 321 return Then<R>(std::function<Future<R>()>(std::forward<F>(f))); 322 } 323 324 template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type> OnComplete(F && f,FutureBase)325 const Future<T> &OnComplete(F &&f, FutureBase) const { 326 return OnComplete( 327 std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); })); 328 } 329 330 template <typename F, typename = typename std::result_of<typename std::enable_if< 331 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type> OnComplete(F && f,LessFuture)332 const Future<T> &OnComplete(F &&f, LessFuture) const { 333 return OnComplete(std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); })); 334 } 335 336 template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type> OnAbandoned(F && f,FutureBase)337 const Future<T> &OnAbandoned(F &&f, FutureBase) const { 338 return OnAbandoned( 339 std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); })); 340 } 341 342 template <typename F, typename = typename std::result_of<typename std::enable_if< 343 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type> OnAbandoned(F && f,LessFuture)344 const Future<T> &OnAbandoned(F &&f, LessFuture) const { 345 return OnAbandoned( 346 std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); })); 347 } 348 RunCallbacks()349 void RunCallbacks() const { 350 std::shared_ptr<typename Future<T>::Data> copy = data; 351 internal::Run(std::move(copy->onCompleteCallbacks), Future<T>(copy)); 352 copy->Clear(); 353 } 354 Run()355 void Run() const { 356 auto iter = data->onCompleteCallbacks.begin(); 357 for (; iter != data->onCompleteCallbacks.end(); ++iter) { 358 (*iter)(*this); 359 } 360 } 361 362 template <typename V> Set(V && value)363 void Set(V &&value) const { 364 bool call = false; 365 366 data->lock.Lock(); 367 if (data->status.IsInit()) { 368 data->status.SetOK(); 369 data->promise.set_value(std::forward<V>(value)); 370 call = true; 371 } 372 data->lock.Unlock(); 373 374 if (call) { 375 RunCallbacks(); 376 } 377 } 378 379 template <typename V> 380 friend class Future; 381 friend class Promise<T>; 382 383 std::shared_ptr<Data> data; 384 }; 385 386 template <typename T> 387 class Promise { 388 public: Promise()389 Promise() : future() { future.data->abandoned = false; } 390 Promise(const T & t)391 explicit Promise(const T &t) : future(t) {} 392 ~Promise()393 virtual ~Promise() { 394 // try { 395 if (future.data) { 396 future.Abandon(); 397 } 398 // } catch (...) { 399 // } 400 } 401 SetValue(const T & value)402 void SetValue(const T &value) const { Set(value); } 403 SetValue(T && value)404 void SetValue(T &&value) const { Set(std::move(value)); } 405 SetValue(const Future<T> & tFuture)406 void SetValue(const Future<T> &tFuture) const { Associate(tFuture); } 407 SetFailed(int32_t code)408 void SetFailed(int32_t code) const { 409 if (!future.data->associated) { 410 future.SetFailed(code); 411 } 412 } 413 GetFuture()414 Future<T> GetFuture() const { return future; } 415 Associate(const Future<T> & f)416 void Associate(const Future<T> &f) const { 417 bool associated = false; 418 419 future.data->lock.Lock(); 420 if (future.IsInit() && !future.data->associated) { 421 associated = (future.data->associated = true); 422 } 423 future.data->lock.Unlock(); 424 425 if (associated) { 426 f.OnComplete(std::bind(&internal::Complete<T>, future, std::placeholders::_1)) 427 .OnAbandoned(std::bind(&internal::Abandon<T>, future, true)); 428 } 429 } 430 431 private: 432 template <typename V> Set(V && value)433 void Set(V &&value) const { 434 if (future.IsInit() && !future.data->associated) { 435 future.SetValue(std::forward<V>(value)); 436 } 437 } 438 439 template <typename V> 440 friend class Future; 441 442 Future<T> future; 443 }; 444 445 template <> 446 class Promise<void>; 447 448 template <typename T> 449 class Promise<T &>; 450 451 }; // namespace mindspore 452 453 #endif 454