1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "future.h"
17
18 #include <meta/interface/object_type_info.h>
19
META_BEGIN_NAMESPACE()20 META_BEGIN_NAMESPACE()
21
22 Future::StateType Future::GetState() const
23 {
24 std::unique_lock lock { mutex_ };
25 return state_;
26 }
27
Wait() const28 Future::StateType Future::Wait() const
29 {
30 std::unique_lock lock { mutex_ };
31 while (state_ == IFuture::WAITING) {
32 cond_.wait(lock);
33 }
34 return state_;
35 }
36
WaitFor(const TimeSpan & time) const37 Future::StateType Future::WaitFor(const TimeSpan& time) const
38 {
39 std::unique_lock lock { mutex_ };
40 while (state_ == IFuture::WAITING) {
41 if (cond_.wait_for(lock, std::chrono::microseconds(time.ToMicroseconds())) == std::cv_status::timeout) {
42 return IFuture::WAITING;
43 }
44 }
45 return state_;
46 }
47
GetResult() const48 IAny::Ptr Future::GetResult() const
49 {
50 std::unique_lock lock { mutex_ };
51 while (state_ == IFuture::WAITING) {
52 cond_.wait(lock);
53 }
54 return result_;
55 }
56
Then(const IFutureContinuation::Ptr & func,const ITaskQueue::Ptr & queue)57 IFuture::Ptr Future::Then(const IFutureContinuation::Ptr& func, const ITaskQueue::Ptr& queue)
58 {
59 std::unique_lock lock { mutex_ };
60 IFuture::Ptr result;
61 if (state_ == IFuture::ABANDONED) {
62 BASE_NS::shared_ptr<Future> f(new Future);
63 f->SetAbandoned();
64 result = BASE_NS::move(f);
65 } else {
66 ContinuationData d { queue == nullptr, queue };
67 d.continuation.reset(new ContinuationQueueTask(func));
68 result = d.continuation->GetFuture();
69 continuations_.push_back(BASE_NS::move(d));
70 if (state_ == IFuture::COMPLETED) {
71 ActivateContinuation(lock);
72 }
73 }
74 return result;
75 }
76
Cancel()77 void Future::Cancel()
78 {
79 bool notify = false;
80 ITaskQueue::Token token {};
81 {
82 std::unique_lock lock { mutex_ };
83 if (state_ != IFuture::COMPLETED) {
84 notify = true;
85 token = token_;
86 token_ = {};
87 for (auto&& v : continuations_) {
88 v.continuation->SetAbandoned();
89 }
90 continuations_.clear();
91 state_ = IFuture::ABANDONED;
92 }
93 }
94 if (token) {
95 if (auto q = queue_.lock()) {
96 q->CancelTask(token);
97 }
98 }
99 if (notify) {
100 cond_.notify_all();
101 }
102 }
103
ActivateContinuation(ContinuationData d,const IAny::Ptr & result)104 void Future::ActivateContinuation(ContinuationData d, const IAny::Ptr& result)
105 {
106 if (auto q = d.queue.lock()) {
107 d.continuation->SetParam(result);
108 auto fut = interface_pointer_cast<IFutureSetInfo>(d.continuation->GetFuture());
109 auto token = q->AddTask(BASE_NS::move(d.continuation));
110 if (fut) {
111 fut->SetQueueInfo(q, token);
112 }
113 } else if (d.runInline) {
114 d.continuation->SetParam(result);
115 d.continuation->Invoke();
116 } else {
117 d.continuation->SetAbandoned();
118 }
119 }
120
ActivateContinuation(std::unique_lock<std::mutex> & lock)121 void Future::ActivateContinuation(std::unique_lock<std::mutex>& lock)
122 {
123 BASE_NS::vector<ContinuationData> cdata = BASE_NS::move(continuations_);
124 auto result = result_;
125 lock.unlock();
126 for (auto&& v : cdata) {
127 ActivateContinuation(BASE_NS::move(v), result);
128 }
129 }
130
SetResult(IAny::Ptr p)131 void Future::SetResult(IAny::Ptr p)
132 {
133 std::unique_lock lock { mutex_ };
134 token_ = {};
135 if (state_ == IFuture::WAITING) {
136 result_ = BASE_NS::move(p);
137 state_ = IFuture::COMPLETED;
138 ActivateContinuation(lock);
139 cond_.notify_all();
140 }
141 }
142
SetAbandoned()143 void Future::SetAbandoned()
144 {
145 std::unique_lock lock { mutex_ };
146 if (state_ == IFuture::WAITING) {
147 state_ = IFuture::ABANDONED;
148 token_ = {};
149
150 for (auto&& v : continuations_) {
151 v.continuation->SetAbandoned();
152 }
153 continuations_.clear();
154 cond_.notify_all();
155 }
156 }
157
SetQueueInfo(const ITaskQueue::Ptr & queue,ITaskQueue::Token token)158 void Future::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
159 {
160 std::unique_lock lock { mutex_ };
161 if (state_ == IFuture::WAITING) {
162 queue_ = queue;
163 token_ = token;
164 }
165 }
166
~Promise()167 Promise::~Promise()
168 {
169 SetAbandoned();
170 }
171
Set(const IAny::Ptr & res)172 void Promise::Set(const IAny::Ptr& res)
173 {
174 if (future_) {
175 future_->SetResult(res);
176 future_ = nullptr;
177 }
178 }
179
SetAbandoned()180 void Promise::SetAbandoned()
181 {
182 if (future_) {
183 future_->SetAbandoned();
184 future_ = nullptr;
185 }
186 }
187
GetFuture()188 IFuture::Ptr Promise::GetFuture()
189 {
190 if (!future_) {
191 future_.reset(new Future);
192 }
193 return future_;
194 }
195
SetQueueInfo(const ITaskQueue::Ptr & queue,ITaskQueue::Token token)196 void Promise::SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token)
197 {
198 if (future_) {
199 future_->SetQueueInfo(queue, token);
200 }
201 }
202
203 namespace Internal {
204
GetPromiseFactory()205 IObjectFactory::Ptr GetPromiseFactory()
206 {
207 return Promise::GetFactory();
208 }
209
210 } // namespace Internal
211
212 META_END_NAMESPACE()
213