• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "delegate_tasks.h"
16 
17 #include <sys/syscall.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 
21 #include "error_multimodal.h"
22 #include "util.h"
23 
24 namespace OHOS {
25 namespace MMI {
26 namespace {
27 constexpr OHOS::HiviewDFX::HiLogLabel LABEL = { LOG_CORE, MMI_LOG_DOMAIN, "DelegateTasks" };
28 } // namespace
29 
ProcessTask()30 void DelegateTasks::Task::ProcessTask()
31 {
32     CALL_DEBUG_ENTER;
33     if (hasWaited_) {
34         MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_);
35         return;
36     }
37     int32_t ret = fun_();
38     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
39     MMI_HILOGD("process %{public}s task id:%{public}d,ret:%{public}d", taskType.c_str(), id_, ret);
40     if (!hasWaited_ && promise_ != nullptr) {
41         promise_->set_value(ret);
42     }
43 }
44 
Init()45 bool DelegateTasks::Init()
46 {
47     CALL_DEBUG_ENTER;
48     int32_t res = pipe(fds_);
49     if (res == -1) {
50         MMI_HILOGE("The pipe create failed,errno:%{public}d", errno);
51         return false;
52     }
53     if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
54         MMI_HILOGE("The fcntl read failed,errno:%{public}d", errno);
55         return false;
56     }
57     if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
58         MMI_HILOGE("The fcntl write failed,errno:%{public}d", errno);
59         return false;
60     }
61     return true;
62 }
63 
ProcessTasks()64 void DelegateTasks::ProcessTasks()
65 {
66     CALL_DEBUG_ENTER;
67     std::vector<TaskPtr> tasks;
68     PopPendingTaskList(tasks);
69     for (const auto &it : tasks) {
70         it->ProcessTask();
71     }
72 }
73 
PostSyncTask(DTaskCallback callback)74 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
75 {
76     CALL_DEBUG_ENTER;
77     CHKPR(callback, ERROR_NULL_POINTER);
78     if (IsCallFromWorkerThread()) {
79         return callback();
80     }
81     Promise promise;
82     Future future = promise.get_future();
83     auto task = PostTask(callback, &promise);
84     if (task == nullptr) {
85         MMI_HILOGE("Post sync task failed");
86         return ETASKS_POST_SYNCTASK_FAIL;
87     }
88 
89     static constexpr int32_t timeout = 3000;
90     std::chrono::milliseconds span(timeout);
91     auto res = future.wait_for(span);
92     task->SetWaited();
93     if (res == std::future_status::timeout) {
94         MMI_HILOGE("Task timeout");
95         return ETASKS_WAIT_TIMEOUT;
96     } else if (res == std::future_status::deferred) {
97         MMI_HILOGE("Task deferred");
98         return ETASKS_WAIT_DEFERRED;
99     }
100     return future.get();
101 }
102 
PostAsyncTask(DTaskCallback callback)103 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
104 {
105     CHKPR(callback, ERROR_NULL_POINTER);
106     if (IsCallFromWorkerThread()) {
107         return callback();
108     }
109     auto task = PostTask(callback);
110     if (task == nullptr) {
111         MMI_HILOGE("Post async task failed");
112         return ETASKS_POST_ASYNCTASK_FAIL;
113     }
114     return RET_OK;
115 }
116 
PopPendingTaskList(std::vector<TaskPtr> & tasks)117 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
118 {
119     std::lock_guard<std::mutex> guard(mux_);
120     static constexpr int32_t onceProcessTaskLimit = 10;
121     for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
122         if (tasks_.empty()) {
123             break;
124         }
125         auto task = tasks_.front();
126         CHKPB(task);
127         RecoveryId(task->GetId());
128         tasks.push_back(task->GetSharedPtr());
129         tasks_.pop();
130     }
131 }
132 
PostTask(DTaskCallback callback,Promise * promise)133 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
134 {
135     if (IsCallFromWorkerThread()) {
136         MMI_HILOGE("This interface cannot be called from a worker thread.");
137         return nullptr;
138     }
139     std::lock_guard<std::mutex> guard(mux_);
140     MMI_HILOGD("tasks_ size %{public}d", static_cast<int32_t>(tasks_.size()));
141     static constexpr int32_t maxTasksLimit = 1000;
142     auto tsize = tasks_.size();
143     if (tsize > maxTasksLimit) {
144         MMI_HILOGE("The task queue is full. size:%{public}zu/%{public}d", tsize, maxTasksLimit);
145         return nullptr;
146     }
147     int32_t id = GenerateId();
148     TaskData data = {GetThisThreadId(), id};
149     auto res = write(fds_[1], &data, sizeof(data));
150     if (res == -1) {
151         RecoveryId(id);
152         MMI_HILOGE("Pipe write failed,errno:%{public}d", errno);
153         return nullptr;
154     }
155     TaskPtr task = std::make_shared<Task>(id, callback, promise);
156     tasks_.push(task);
157     std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
158     MMI_HILOGD("Post %{public}s", taskType.c_str());
159     return task->GetSharedPtr();
160 }
161 } // namespace MMI
162 } // namespace OHOS