• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2024 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "future.h"
17 
18 #include <meta/interface/object_type_info.h>
19 
META_BEGIN_NAMESPACE()20 META_BEGIN_NAMESPACE()
21 
22 Future::StateType Future::GetState() const
23 {
24     std::unique_lock lock { mutex_ };
25     return state_;
26 }
27 
Wait() const28 Future::StateType Future::Wait() const
29 {
30     std::unique_lock lock { mutex_ };
31     while (state_ == IFuture::WAITING) {
32         cond_.wait(lock);
33     }
34     return state_;
35 }
36 
WaitFor(const TimeSpan & time) const37 Future::StateType Future::WaitFor(const TimeSpan& time) const
38 {
39     std::unique_lock lock { mutex_ };
40     while (state_ == IFuture::WAITING) {
41         if (cond_.wait_for(lock, std::chrono::microseconds(time.ToMicroseconds())) == std::cv_status::timeout) {
42             return IFuture::WAITING;
43         }
44     }
45     return state_;
46 }
47 
GetResult() const48 IAny::Ptr Future::GetResult() const
49 {
50     std::unique_lock lock { mutex_ };
51     while (state_ == IFuture::WAITING) {
52         cond_.wait(lock);
53     }
54     return result_;
55 }
56 
Then(const IFutureContinuation::Ptr & func,const ITaskQueue::Ptr & queue)57 IFuture::Ptr Future::Then(const IFutureContinuation::Ptr& func, const ITaskQueue::Ptr& queue)
58 {
59     std::unique_lock lock { mutex_ };
60     IFuture::Ptr result;
61     if (state_ == IFuture::ABANDONED) {
62         BASE_NS::shared_ptr<Future> f(new Future);
63         f->SetAbandoned();
64         result = BASE_NS::move(f);
65     } else {
66         ContinuationData d { queue == nullptr, queue };
67         d.continuation.reset(new ContinuationQueueTask(func));
68         result = d.continuation->GetFuture();
69         continuations_.push_back(BASE_NS::move(d));
70         if (state_ == IFuture::COMPLETED) {
71             ActivateContinuation(lock);
72         }
73     }
74     return result;
75 }
76 
Cancel()77 void Future::Cancel()
78 {
79     bool notify = false;
80     ITaskQueue::Token token {};
81     {
82         std::unique_lock lock { mutex_ };
83         if (state_ != IFuture::COMPLETED) {
84             notify = true;
85             token = token_;
86             token_ = {};
87             for (auto&& v : continuations_) {
88                 v.continuation->SetAbandoned();
89             }
90             continuations_.clear();
91             state_ = IFuture::ABANDONED;
92         }
93     }
94     if (token) {
95         if (auto q = queue_.lock()) {
96             q->CancelTask(token);
97         }
98     }
99     if (notify) {
100         cond_.notify_all();
101     }
102 }
103 
ActivateContinuation(ContinuationData d,const IAny::Ptr & result)104 void Future::ActivateContinuation(ContinuationData d, const IAny::Ptr& result)
105 {
106     if (auto q = d.queue.lock()) {
107         d.continuation->SetParam(result);
108         auto fut = interface_pointer_cast<IFutureSetInfo>(d.continuation->GetFuture());
109         auto token = q->AddTask(BASE_NS::move(d.continuation));
110         if (fut) {
111             fut->SetQueueInfo(q, token);
112         }
113     } else if (d.runInline) {
114         d.continuation->SetParam(result);
115         d.continuation->Invoke();
116     } else {
117         d.continuation->SetAbandoned();
118     }
119 }
120 
ActivateContinuation(std::unique_lock<std::mutex> & lock)121 void Future::ActivateContinuation(std::unique_lock<std::mutex>& lock)
122 {
123     BASE_NS::vector<ContinuationData> cdata = BASE_NS::move(continuations_);
124     auto result = result_;
125     lock.unlock();
126     for (auto&& v : cdata) {
127         ActivateContinuation(BASE_NS::move(v), result);
128     }
129 }
130 
SetResult(IAny::Ptr p)131 void Future::SetResult(IAny::Ptr p)
132 {
133     std::unique_lock lock { mutex_ };
134     token_ = {};
135     if (state_ == IFuture::WAITING) {
136         result_ = BASE_NS::move(p);
137         state_ = IFuture::COMPLETED;
138         ActivateContinuation(lock);
139         cond_.notify_all();
140     }
141 }
142 
SetAbandoned()143 void Future::SetAbandoned()
144 {
145     std::unique_lock lock { mutex_ };
146     if (state_ == IFuture::WAITING) {
147         state_ = IFuture::ABANDONED;
148         token_ = {};
149 
150         for (auto&& v : continuations_) {
151             v.continuation->SetAbandoned();
152         }
153         continuations_.clear();
154         cond_.notify_all();
155     }
156 }
157 
SetQueueInfo(const ITaskQueue::Ptr & queue,ITaskQueue::Token token)158 void Future::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
159 {
160     std::unique_lock lock { mutex_ };
161     if (state_ == IFuture::WAITING) {
162         queue_ = queue;
163         token_ = token;
164     }
165 }
166 
~Promise()167 Promise::~Promise()
168 {
169     SetAbandoned();
170 }
171 
Set(const IAny::Ptr & res)172 void Promise::Set(const IAny::Ptr& res)
173 {
174     if (future_) {
175         future_->SetResult(res);
176         future_ = nullptr;
177     }
178 }
179 
SetAbandoned()180 void Promise::SetAbandoned()
181 {
182     if (future_) {
183         future_->SetAbandoned();
184         future_ = nullptr;
185     }
186 }
187 
GetFuture()188 IFuture::Ptr Promise::GetFuture()
189 {
190     if (!future_) {
191         future_.reset(new Future);
192     }
193     return future_;
194 }
195 
SetQueueInfo(const ITaskQueue::Ptr & queue,ITaskQueue::Token token)196 void Promise::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
197 {
198     if (future_) {
199         future_->SetQueueInfo(queue, token);
200     }
201 }
202 
203 namespace Internal {
204 
GetPromiseFactory()205 IObjectFactory::Ptr GetPromiseFactory()
206 {
207     return Promise::GetFactory();
208 }
209 
210 } // namespace Internal
211 
212 META_END_NAMESPACE()
213