/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "delegate_tasks.h"
17 
18 #include <fcntl.h>
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 
22 #include "error_multimodal.h"
23 #include "util.h"
24 
25 namespace OHOS {
26 namespace MMI {
27 namespace {
28 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, MMI_LOG_DOMAIN, "DelegateTasks" };
29 } // namespace
30 
ProcessTask()31 void DelegateTasks::Task::ProcessTask()
32 {
33     CALL_DEBUG_ENTER;
34     if (hasWaited_) {
35         MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_);
36         return;
37     }
38     int32_t ret = fun_();
39     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
40     MMI_HILOGD("process %{public}s task id:%{public}d,ret:%{public}d", taskType.c_str(), id_, ret);
41     if (!hasWaited_ && promise_ != nullptr) {
42         promise_->set_value(ret);
43     }
44 }
45 
~DelegateTasks()46 DelegateTasks::~DelegateTasks()
47 {
48     if (fds_[0] >= 0) {
49         close(fds_[0]);
50         fds_[0] = -1;
51     }
52     if (fds_[1] >= 0) {
53         close(fds_[1]);
54         fds_[1] = -1;
55     }
56 }
57 
Init()58 bool DelegateTasks::Init()
59 {
60     CALL_DEBUG_ENTER;
61     int32_t res = pipe(fds_);
62     if (res == -1) {
63         MMI_HILOGE("The pipe create failed,errno:%{public}d", errno);
64         return false;
65     }
66     if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
67         MMI_HILOGE("The fcntl read failed,errno:%{public}d", errno);
68         close(fds_[0]);
69         return false;
70     }
71     if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
72         MMI_HILOGE("The fcntl write failed,errno:%{public}d", errno);
73         close(fds_[1]);
74         return false;
75     }
76     return true;
77 }
78 
ProcessTasks()79 void DelegateTasks::ProcessTasks()
80 {
81     CALL_DEBUG_ENTER;
82     std::vector<TaskPtr> tasks;
83     PopPendingTaskList(tasks);
84     for (const auto &it : tasks) {
85         it->ProcessTask();
86     }
87 }
88 
PostSyncTask(DTaskCallback callback)89 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
90 {
91     CALL_DEBUG_ENTER;
92     CHKPR(callback, ERROR_NULL_POINTER);
93     if (IsCallFromWorkerThread()) {
94         return callback();
95     }
96     Promise promise;
97     Future future = promise.get_future();
98     auto task = PostTask(callback, &promise);
99     if (task == nullptr) {
100         MMI_HILOGE("Post sync task failed");
101         return ETASKS_POST_SYNCTASK_FAIL;
102     }
103 
104     static constexpr int32_t timeout = 3000;
105     std::chrono::milliseconds span(timeout);
106     auto res = future.wait_for(span);
107     task->SetWaited();
108     if (res == std::future_status::timeout) {
109         MMI_HILOGE("Task timeout");
110         return ETASKS_WAIT_TIMEOUT;
111     } else if (res == std::future_status::deferred) {
112         MMI_HILOGE("Task deferred");
113         return ETASKS_WAIT_DEFERRED;
114     }
115     return future.get();
116 }
117 
PostAsyncTask(DTaskCallback callback)118 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
119 {
120     CHKPR(callback, ERROR_NULL_POINTER);
121     if (IsCallFromWorkerThread()) {
122         return callback();
123     }
124     auto task = PostTask(callback);
125     if (task == nullptr) {
126         MMI_HILOGE("Post async task failed");
127         return ETASKS_POST_ASYNCTASK_FAIL;
128     }
129     return RET_OK;
130 }
131 
PopPendingTaskList(std::vector<TaskPtr> & tasks)132 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
133 {
134     std::lock_guard<std::mutex> guard(mux_);
135     static constexpr int32_t onceProcessTaskLimit = 10;
136     for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
137         if (tasks_.empty()) {
138             break;
139         }
140         auto task = tasks_.front();
141         CHKPB(task);
142         RecoveryId(task->GetId());
143         tasks.push_back(task->GetSharedPtr());
144         tasks_.pop();
145     }
146 }
147 
PostTask(DTaskCallback callback,Promise * promise)148 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
149 {
150     if (IsCallFromWorkerThread()) {
151         MMI_HILOGE("This interface cannot be called from a worker thread.");
152         return nullptr;
153     }
154     std::lock_guard<std::mutex> guard(mux_);
155     MMI_HILOGD("tasks_ size %{public}d", static_cast<int32_t>(tasks_.size()));
156     static constexpr int32_t maxTasksLimit = 1000;
157     auto tsize = tasks_.size();
158     if (tsize > maxTasksLimit) {
159         MMI_HILOGE("The task queue is full. size:%{public}zu/%{public}d", tsize, maxTasksLimit);
160         return nullptr;
161     }
162     int32_t id = GenerateId();
163     TaskData data = { GetThisThreadId(), id };
164     auto res = write(fds_[1], &data, sizeof(data));
165     if (res == -1) {
166         RecoveryId(id);
167         MMI_HILOGE("Pipe write failed,errno:%{public}d", errno);
168         return nullptr;
169     }
170     TaskPtr task = std::make_shared<Task>(id, callback, promise);
171     tasks_.push(task);
172     std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
173     MMI_HILOGD("Post %{public}s", taskType.c_str());
174     return task->GetSharedPtr();
175 }
176 } // namespace MMI
177 } // namespace OHOS