1 /** 2 * Copyright 2021-2023 Huawei Technologies Co., Ltd 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_ 18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H_ 19 20 #include <memory> 21 #include <utility> 22 #include <future> 23 #include <iostream> 24 #include <list> 25 #include "actor/actor.h" 26 #include "actor/log.h" 27 #include "async/spinlock.h" 28 #include "async/status.h" 29 #include "async/uuid_generator.h" 30 #include "async/future_base.h" 31 #include "mindrt/include/mindrt.hpp" 32 33 namespace mindspore { 34 template <typename T> 35 class Promise; 36 37 template <typename T> 38 class Option; 39 40 template <typename T> 41 class Future : public FutureBase { 42 public: 43 typedef MindrtStatus WaitForStatus; 44 typedef typename FutureData<T>::CompleteCallback CompleteCallback; 45 typedef typename FutureData<T>::AbandonedCallback AbandonedCallback; 46 typedef FutureData<T> Data; Future()47 Future() : data(new (std::nothrow) Data()) { 48 MINDRT_OOM_EXIT(data); 49 data->abandoned = true; 50 } 51 Future(const Future<T> & f)52 Future(const Future<T> &f) : FutureBase(f), data(f.data) {} 53 Future(Future<T> && f)54 Future(Future<T> &&f) : data(std::move(f.data)) {} 55 Future(const T & t)56 explicit Future(const T &t) : data(new (std::nothrow) Data()) { 57 MINDRT_OOM_EXIT(data); 58 SetValue(std::move(t)); 59 } 60 61 template <typename V> Future(const V & value)62 explicit Future(const V &value) : data(new (std::nothrow) Data()) { 63 MINDRT_OOM_EXIT(data); 64 SetValue(value); 65 } 66 Future(const MindrtStatus & s)67 explicit Future(const MindrtStatus &s) : data(new (std::nothrow) Data()) { 68 MINDRT_OOM_EXIT(data); 69 SetFailed(s.GetCode()); 70 } 71 Future(const std::shared_ptr<Data> & t)72 explicit Future(const std::shared_ptr<Data> &t) : data(t) {} 73 ~Future()74 ~Future() override {} 75 76 Future<T> &operator=(const Future<T> &f) { 77 if (&f != this) { 78 data = f.data; 79 } 80 return *this; 81 } 82 83 bool operator==(const Future<T> &f) const { return data == f.data; } 84 85 bool operator!=(const Future<T> &f) const { return !(*this == f); } 86 Get()87 const T &Get() const { 88 if (data->status.IsError()) { 89 MS_LOG(WARNING) << "Future::Get() but status == Error: " << GetErrorCode(); 90 return data->t; 91 } 92 93 if (data->gotten) { 94 return data->t; 95 } 96 97 data->t = data->future.get(); 98 data->gotten = true; 99 100 return data->t; 101 } 102 Valid()103 bool Valid() const noexcept { return data->future.valid(); } 104 IsInit()105 bool IsInit() const { return data->status.IsInit(); } 106 IsOK()107 bool IsOK() const { return data->status.IsOK(); } 108 IsError()109 bool IsError() const { return data->status.IsError(); } 110 GetStatus()111 MindrtStatus GetStatus() const { return data->status; } 112 GetErrorCode()113 int32_t GetErrorCode() const { 114 const MindrtStatus &status_ = data->status; 115 if (status_.IsError()) { 116 return status_.GetCode(); 117 } 118 119 return 0; 120 } 121 Wait()122 void Wait() const { 123 if (!data->status.IsInit()) { 124 return; 125 } 126 127 data->future.wait(); 128 } 129 130 template <typename F> OnComplete(internal::DeferredHelper<F> && deferred)131 const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const { 132 return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>()); 133 } 134 135 template <typename F> OnAbandoned(internal::DeferredHelper<F> && deferred)136 const Future<T> &OnAbandoned(internal::DeferredHelper<F> &&deferred) const { 137 return OnAbandoned(std::move(deferred).operator std::function<void(const Future<T> &)>()); 138 } 139 OnComplete(CompleteCallback && callback)140 const Future<T> &OnComplete(CompleteCallback &&callback) const { 141 bool call = false; 142 143 data->lock.lock(); 144 if (data->status.IsInit()) { 145 // using move to make callback execute once 146 data->onCompleteCallbacks.push_back(std::move(callback)); 147 } else { 148 call = true; 149 } 150 data->lock.unlock(); 151 152 if (call) { 153 std::move(callback)(*this); 154 } 155 156 return *this; 157 } 158 OnAbandoned(AbandonedCallback && callback)159 const Future<T> &OnAbandoned(AbandonedCallback &&callback) const { 160 bool call = false; 161 162 data->lock.lock(); 163 if (data->abandoned) { 164 call = true; 165 } else if (data->status.IsInit()) { 166 // using move to make callback execute once 167 data->onAbandonedCallbacks.push_back(std::move(callback)); 168 } 169 data->lock.unlock(); 170 171 if (call) { 172 std::move(callback)(*this); 173 } 174 175 return *this; 176 } 177 SetValue(T && t)178 void SetValue(T &&t) const { return Set(std::move(t)); } 179 SetValue(const T & t)180 void SetValue(const T &t) const { return Set(t); } 181 SetOK()182 void SetOK() const { 183 bool call = false; 184 185 data->lock.lock(); 186 if (data->status.IsInit()) { 187 data->status.SetOK(); 188 data->promise.set_value(T()); 189 call = true; 190 } 191 data->lock.unlock(); 192 193 if (call) { 194 RunCallbacks(); 195 } 196 } 197 SetFailed(int32_t errCode)198 void SetFailed(int32_t errCode) const { 199 MINDRT_ASSERT(errCode != MindrtStatus::KINIT && errCode != MindrtStatus::KOK); 200 201 bool call = false; 202 203 data->lock.lock(); 204 if (data->status.IsInit()) { 205 data->status.SetCode(errCode); 206 data->promise.set_value(T()); 207 call = true; 208 } 209 data->lock.unlock(); 210 211 if (call) { 212 RunCallbacks(); 213 } 214 } 215 216 // remove all callbacks Clear()217 void Clear() const { data->Clear(); } 218 219 void Abandon(bool abandon = false) const { 220 bool call = false; 221 222 std::list<AbandonedCallback> callbacks; 223 data->lock.lock(); 224 if (!data->abandoned && data->status.IsInit() && (!data->associated || abandon)) { 225 call = data->abandoned = true; 226 callbacks.swap(data->onAbandonedCallbacks); 227 } 228 data->lock.unlock(); 229 230 if (call) { 231 internal::Run(std::move(callbacks), *this); 232 } 233 } 234 235 template <typename R> Then(const std::function<Future<R> (const T &)> & f)236 Future<R> Then(const std::function<Future<R>(const T &)> &f) const { 237 std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>()); 238 MINDRT_OOM_EXIT(promise); 239 Future<R> future = promise->GetFuture(); 240 241 std::function<void(const Future<T> &)> handler = 242 std::bind(&internal::Thenf<T, R>, f, promise, std::placeholders::_1); 243 244 OnComplete(std::move(handler)); 245 246 return future; 247 } 248 249 template <typename R> Then(const std::function<R (const T &)> & f)250 Future<R> Then(const std::function<R(const T &)> &f) const { 251 std::shared_ptr<Promise<R>> promise(new (std::nothrow) Promise<R>()); 252 MINDRT_OOM_EXIT(promise); 253 Future<R> future = promise->GetFuture(); 254 255 std::function<void(const Future<T> &)> handler = 256 std::bind(&internal::Then<T, R>, f, promise, std::placeholders::_1); 257 258 OnComplete(std::move(handler)); 259 260 return future; 261 } 262 263 template <typename R> Then(const std::function<Future<R> ()> & f)264 Future<R> Then(const std::function<Future<R>()> &f) const { 265 return Then(std::function<Future<R>(const T &)>(std::bind(f))); 266 } 267 268 template <typename R> Then(const std::function<R ()> & f)269 Future<R> Then(const std::function<R()> &f) const { 270 return Then(std::function<R(const T &)>(std::bind(f))); 271 } 272 273 template <typename F> 274 auto Then(F &&f) const -> decltype(this->Then(std::forward<F>(f), FutureBase())) { 275 return Then(std::forward<F>(f), FutureBase()); 276 } 277 278 template <typename F> OnComplete(F && f)279 const Future<T> &OnComplete(F &&f) const { 280 return OnComplete(std::forward<F>(f), FutureBase()); 281 } 282 283 template <typename F> OnAbandoned(F && f)284 const Future<T> &OnAbandoned(F &&f) const { 285 return OnAbandoned(std::forward<F>(f), FutureBase()); 286 } 287 288 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type> Then(internal::DeferredHelper<F> && f,FutureBase)289 Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const { 290 return Then<R>(std::move(f).operator std::function<Future<R>(const T &)>()); 291 } 292 293 private: 294 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if< 295 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type> Then(internal::DeferredHelper<F> && f,LessFuture)296 Future<R> Then(internal::DeferredHelper<F> &&f, LessFuture) const { 297 return Then<R>(std::move(f).operator std::function<Future<R>()>()); 298 } 299 300 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type> Then(F && f,FutureBase)301 Future<R> Then(F &&f, FutureBase) const { 302 return Then<R>(std::function<Future<R>(const T &)>(f)); 303 } 304 305 template <typename F, typename R = typename internal::Unwrap<typename std::result_of<typename std::enable_if< 306 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type>::type> Then(F && f,LessFuture)307 Future<R> Then(F &&f, LessFuture) const { 308 return Then<R>(std::function<Future<R>()>(std::forward<F>(f))); 309 } 310 311 template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type> OnComplete(F && f,FutureBase)312 const Future<T> &OnComplete(F &&f, FutureBase) const { 313 return OnComplete( 314 std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); })); 315 } 316 317 template <typename F, typename = typename std::result_of<typename std::enable_if< 318 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type> OnComplete(F && f,LessFuture)319 const Future<T> &OnComplete(F &&f, LessFuture) const { 320 return OnComplete(std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); })); 321 } 322 323 template <typename F, typename = typename std::result_of<F(const Future<T> &)>::type> OnAbandoned(F && f,FutureBase)324 const Future<T> &OnAbandoned(F &&f, FutureBase) const { 325 return OnAbandoned( 326 std::function<void(const Future<T> &)>([=](const Future<T> &future) mutable { std::forward<F>(f)(future); })); 327 } 328 329 template <typename F, typename = typename std::result_of<typename std::enable_if< 330 !std::is_bind_expression<typename std::decay<F>::type>::value, F>::type()>::type> OnAbandoned(F && f,LessFuture)331 const Future<T> &OnAbandoned(F &&f, LessFuture) const { 332 return OnAbandoned( 333 std::function<void(const Future<T> &)>([=](const Future<T> &) mutable { std::forward<F>(f)(); })); 334 } 335 RunCallbacks()336 void RunCallbacks() const { 337 std::shared_ptr<typename Future<T>::Data> copy = data; 338 internal::Run(std::move(copy->onCompleteCallbacks), Future<T>(copy)); 339 copy->Clear(); 340 } 341 Run()342 void Run() const { 343 auto iter = data->onCompleteCallbacks.begin(); 344 for (; iter != data->onCompleteCallbacks.end(); ++iter) { 345 (*iter)(*this); 346 } 347 } 348 349 template <typename V> Set(V && value)350 void Set(V &&value) const { 351 bool call = false; 352 353 data->lock.lock(); 354 if (data->status.IsInit()) { 355 data->status.SetOK(); 356 data->promise.set_value(std::forward<V>(value)); 357 call = true; 358 } 359 data->lock.unlock(); 360 361 if (call) { 362 RunCallbacks(); 363 } 364 } 365 366 template <typename V> 367 friend class Future; 368 friend class Promise<T>; 369 370 std::shared_ptr<Data> data; 371 }; 372 373 template <typename T> 374 class Promise { 375 public: Promise()376 Promise() : future() { future.data->abandoned = false; } 377 Promise(const T & t)378 explicit Promise(const T &t) : future(t) {} 379 ~Promise()380 virtual ~Promise() { 381 if (future.data) { 382 future.Abandon(); 383 } 384 } 385 SetValue(const T & value)386 void SetValue(const T &value) const { Set(value); } 387 SetValue(T && value)388 void SetValue(T &&value) const { Set(std::move(value)); } 389 SetValue(const Future<T> & tFuture)390 void SetValue(const Future<T> &tFuture) const { Associate(tFuture); } 391 SetFailed(int32_t code)392 void SetFailed(int32_t code) const { 393 if (!future.data->associated) { 394 future.SetFailed(code); 395 } 396 } 397 GetFuture()398 Future<T> GetFuture() const { return future; } 399 Associate(const Future<T> & f)400 void Associate(const Future<T> &f) const { 401 bool associated = false; 402 403 future.data->lock.lock(); 404 if (future.IsInit() && !future.data->associated) { 405 associated = (future.data->associated = true); 406 } 407 future.data->lock.unlock(); 408 409 if (associated) { 410 f.OnComplete(std::bind(&internal::Complete<T>, future, std::placeholders::_1)) 411 .OnAbandoned(std::bind(&internal::Abandon<T>, future, true)); 412 } 413 } 414 415 private: 416 template <typename V> Set(V && value)417 void Set(V &&value) const { 418 if (future.IsInit() && !future.data->associated) { 419 future.SetValue(std::forward<V>(value)); 420 } 421 } 422 423 template <typename V> 424 friend class Future; 425 426 Future<T> future; 427 }; 428 429 template <> 430 class Promise<void>; 431 432 template <typename T> 433 class Promise<T &>; 434 }; // namespace mindspore 435 436 #endif 437