1 /* 2 * Copyright (c) 2024 Huawei Device Co., Ltd. 3 * Licensed under the Apache License, Version 2.0 (the "License"); 4 * you may not use this file except in compliance with the License. 5 * You may obtain a copy of the License at 6 * 7 * http://www.apache.org/licenses/LICENSE-2.0 8 * 9 * Unless required by applicable law or agreed to in writing, software 10 * distributed under the License is distributed on an "AS IS" BASIS, 11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 * See the License for the specific language governing permissions and 13 * limitations under the License. 14 */ 15 16 #ifndef META_SRC_FUTURE_H 17 #define META_SRC_FUTURE_H 18 19 #include <mutex> 20 #include <thread> 21 22 #include <meta/base/interface_macros.h> 23 #include <meta/base/namespace.h> 24 #include <meta/ext/minimal_object.h> 25 #include <meta/ext/object_factory.h> 26 #include <meta/ext/task_queue.h> 27 #include <meta/interface/intf_future.h> 28 #include <meta/interface/intf_object_flags.h> 29 #include <meta/interface/intf_promise.h> 30 #include <meta/interface/intf_task_queue.h> 31 #include <meta/interface/object_type_info.h> 32 33 META_BEGIN_NAMESPACE() 34 35 class ContinuationQueueTask; 36 37 class Future final : public IntroduceInterfaces<IFuture> { 38 public: 39 StateType GetState() const override; 40 StateType Wait() const override; 41 StateType WaitFor(const TimeSpan& time) const override; 42 IAny::Ptr GetResult() const override; 43 IFuture::Ptr Then(const IFutureContinuation::Ptr& func, const ITaskQueue::Ptr& queue) override; 44 void Cancel() override; 45 46 void SetResult(IAny::Ptr p); 47 void SetAbandoned(); 48 49 void SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token); 50 51 private: 52 // Then continuation support 53 struct ContinuationData { 54 bool runInline {}; 55 ITaskQueue::WeakPtr queue; 56 BASE_NS::shared_ptr<ContinuationQueueTask> continuation; 57 }; 58 59 void ActivateContinuation(std::unique_lock<std::mutex>& lock); 60 void ActivateContinuation(const ContinuationData& d, const IAny::Ptr& result); 61 62 private: 63 mutable std::mutex mutex_; 64 mutable std::condition_variable cond_; 65 IAny::Ptr result_; 66 StateType state_ { IFuture::WAITING }; 67 ITaskQueue::WeakPtr queue_; 68 ITaskQueue::Token token_ {}; 69 BASE_NS::vector<ContinuationData> continuations_; 70 }; 71 72 class Promise final : public IntroduceInterfaces<MinimalObject, IPromise> { 73 META_OBJECT(Promise, META_NS::ClassId::Promise, IntroduceInterfaces) 74 public: 75 Promise() = default; 76 META_NO_COPY_MOVE(Promise) 77 public: 78 ~Promise() override; 79 80 void Set(const IAny::Ptr& res) override; 81 void SetAbandoned() override; 82 [[nodiscard]] IFuture::Ptr GetFuture() override; 83 void SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token) override; 84 85 private: 86 BASE_NS::shared_ptr<Future> future_; 87 }; 88 89 class ContinuationQueueTask : public IntroduceInterfaces<ITaskQueueTask> { 90 public: ContinuationQueueTask(IFutureContinuation::Ptr task)91 explicit ContinuationQueueTask(IFutureContinuation::Ptr task) : task_(BASE_NS::move(task)) {} 92 SetParam(IAny::Ptr arg)93 void SetParam(IAny::Ptr arg) 94 { 95 arg_ = BASE_NS::move(arg); 96 } 97 SetQueueInfo(const ITaskQueue::Ptr & queue,ITaskQueue::Token token)98 void SetQueueInfo(const ITaskQueue::Ptr& queue, ITaskQueue::Token token) 99 { 100 promise_.SetQueueInfo(queue, token); 101 } 102 Invoke()103 bool Invoke() override 104 { 105 promise_.Set(task_->Invoke(arg_)); 106 return false; 107 } 108 GetFuture()109 [[nodiscard]] IFuture::Ptr GetFuture() 110 { 111 return promise_.GetFuture(); 112 } 113 SetAbandoned()114 void SetAbandoned() 115 { 116 promise_.SetAbandoned(); 117 } 118 119 private: 120 IAny::Ptr arg_; 121 IFutureContinuation::Ptr task_; 122 Promise promise_; 123 }; 124 125 META_END_NAMESPACE() 126 127 #endif 128