• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /**
2  * Copyright 2021 Huawei Technologies Co., Ltd
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_COLLECT_H
18 #define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_COLLECT_H
19 
20 #include <future>
21 #include <iostream>
22 #include <list>
23 #include <memory>
24 #include <tuple>
25 #include "async/common.h"
26 #include "async/future.h"
27 #include "async/defer.h"
28 #include "async/spinlock.h"
29 #include "actor/actor.h"
30 #include "mindrt/include/mindrt.hpp"
31 
32 namespace mindspore {
33 template <typename T>
34 class Future;
35 
36 template <typename T>
37 class Promise;
38 
39 template <typename T>
40 class Collected;
41 
42 template <typename T>
43 class Collected {
44  public:
Collected(const std::list<Future<T>> & f,Promise<std::list<T>> * p)45   Collected(const std::list<Future<T>> &f, Promise<std::list<T>> *p) : futures(f), promise(p), ready(0) {}
46 
~Collected()47   virtual ~Collected() {
48     delete promise;
49     promise = nullptr;
50   }
51 
52   Collected(const Collected &) = delete;
53   Collected(Collected &&) = default;
54 
55   Collected &operator=(const Collected &) = delete;
56   Collected &operator=(Collected &&) = default;
57 
58  public:
Discarded()59   void Discarded() {
60     auto iter = futures.begin();
61     for (; iter != futures.end(); ++iter) {
62       iter->SetFailed(MindrtStatus::KERROR);
63     }
64   }
65 
Waited(const Future<T> & future)66   void Waited(const Future<T> &future) {
67     if (future.IsError()) {
68       promise->SetFailed(future.GetErrorCode());
69     } else if (future.IsOK()) {
70       auto val = ready.fetch_add(1);
71       if ((val + 1) == futures.size()) {
72         std::list<T> values;
73         auto iter = futures.begin();
74         for (; iter != futures.end(); ++iter) {
75           values.push_back(iter->Get());
76         }
77         promise->SetValue(values);
78       }
79     }
80   }
81 
82  private:
83   const std::list<Future<T>> futures;
84   Promise<std::list<T>> *promise;
85   std::atomic_ulong ready;
86 };
87 
88 template <typename T>
Collect(const std::list<Future<T>> & futures)89 inline Future<std::list<T>> Collect(const std::list<Future<T>> &futures) {
90   if (futures.empty()) return Future<std::list<T>>(std::list<T>());
91 
92   Promise<std::list<T>> *promise = new (std::nothrow) Promise<std::list<T>>();
93   MINDRT_OOM_EXIT(promise);
94   std::shared_ptr<Collected<T>> collect = std::make_shared<Collected<T>>(futures, promise);
95 
96   for (auto iter = futures.begin(); iter != futures.end(); ++iter) {
97     (void)iter->OnComplete(Defer(collect, &Collected<T>::Waited, std::placeholders::_1));
98   }
99 
100   Future<std::list<T>> future = promise->GetFuture();
101   (void)future.OnComplete(Defer(collect, &Collected<T>::Discarded));
102 
103   return future;
104 }
105 
106 template <typename... Ts>
Collect(const Future<Ts> &...futures)107 Future<std::tuple<Ts...>> Collect(const Future<Ts> &... futures) {
108   std::list<Future<Nothing>> wrappers = {futures.Then([]() { return Nothing(); })...};
109 
110   auto f = [](const Future<Ts> &... futures) { return std::make_tuple(futures.Get()...); };
111 
112   return Collect(wrappers).Then(std::bind(f, futures...));
113 }
114 };  // namespace mindspore
115 
116 #endif
117