• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "delegate_tasks.h"
17 
18 #include <fcntl.h>
19 #include <unistd.h>
20 
21 #include "error_multimodal.h"
22 
23 #undef MMI_LOG_DOMAIN
24 #define MMI_LOG_DOMAIN MMI_LOG_SERVER
25 #undef MMI_LOG_TAG
26 #define MMI_LOG_TAG "DelegateTasks"
27 
28 namespace OHOS {
29 namespace MMI {
30 namespace {
31     constexpr int32_t TIMED_WAIT_MS = 2;
32 } // namespace
ProcessTask()33 void DelegateTasks::Task::ProcessTask()
34 {
35     CALL_DEBUG_ENTER;
36     if (hasWaited_) {
37         MMI_HILOGE("Expired tasks will be discarded. id:%{public}" PRId64, id_);
38         return;
39     }
40     int32_t ret = fun_();
41     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
42     MMI_HILOGD("Process taskType:%{public}s, taskId:%{public}" PRId64 ", ret:%{public}d", taskType.c_str(), id_, ret);
43     if (!hasWaited_ && promise_ != nullptr) {
44         promise_->set_value(ret);
45     }
46 }
47 
~DelegateTasks()48 DelegateTasks::~DelegateTasks()
49 {
50     if (fds_[0] >= 0) {
51         close(fds_[0]);
52         fds_[0] = -1;
53     }
54     if (fds_[1] >= 0) {
55         close(fds_[1]);
56         fds_[1] = -1;
57     }
58 }
59 
Init()60 bool DelegateTasks::Init()
61 {
62     CALL_DEBUG_ENTER;
63     if (pipe(fds_) == -1) {
64         MMI_HILOGE("The pipe create failed, errno:%{public}d", errno);
65         return false;
66     }
67     if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
68         MMI_HILOGE("The fcntl read failed, errno:%{public}d", errno);
69         close(fds_[0]);
70         return false;
71     }
72     if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
73         MMI_HILOGE("The fcntl write failed, errno:%{public}d", errno);
74         close(fds_[1]);
75         return false;
76     }
77     return true;
78 }
79 
ProcessTasks()80 void DelegateTasks::ProcessTasks()
81 {
82     CALL_DEBUG_ENTER;
83     std::vector<TaskPtr> tasks;
84     PopPendingTaskList(tasks);
85     size_t count = tasks.size();
86     if (count == 0) {
87         return;
88     }
89     for (const auto &it : tasks) {
90         it->ProcessTask();
91     }
92     std::vector<DelegateTasks::TaskData> datas = {};
93     datas.resize(count);
94     auto res = read(fds_[0], datas.data(), sizeof(DelegateTasks::TaskData) * count);
95     if (res == -1) {
96         MMI_HILOGW("Read failed erron:%{public}d", errno);
97     }
98     MMI_HILOGD("count:%{public}zu", count);
99 }
100 
PostSyncTask(DTaskCallback callback)101 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
102 {
103     CALL_DEBUG_ENTER;
104     CHKPR(callback, ERROR_NULL_POINTER);
105     if (IsCallFromWorkerThread()) {
106         return callback();
107     }
108     std::shared_ptr<Promise> promise = std::make_shared<Promise>();
109     Future future = promise->get_future();
110     auto task = PostTask(callback, promise);
111     CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
112 
113     static constexpr int32_t timeout = 3000;
114     std::chrono::milliseconds span(timeout);
115     auto res = future.wait_for(span);
116     task->SetWaited();
117     if (res == std::future_status::timeout) {
118         MMI_HILOGE("Task timeout");
119         return ETASKS_WAIT_TIMEOUT;
120     } else if (res == std::future_status::deferred) {
121         MMI_HILOGE("Task deferred");
122         return ETASKS_WAIT_DEFERRED;
123     }
124     return future.get();
125 }
126 
PostAsyncTask(DTaskCallback callback)127 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
128 {
129     CHKPR(callback, ERROR_NULL_POINTER);
130     if (IsCallFromWorkerThread()) {
131         return callback();
132     }
133     CHKPR(PostTask(callback), ETASKS_POST_ASYNCTASK_FAIL);
134     return RET_OK;
135 }
136 
PopPendingTaskList(std::vector<TaskPtr> & tasks)137 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
138 {
139     static constexpr int32_t onceProcessTaskLimit = 10;
140     if (mux_.try_lock_for(std::chrono::milliseconds(TIMED_WAIT_MS))) {
141         for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
142             if (tasks_.empty()) {
143                 break;
144             }
145             auto task = tasks_.front();
146             CHKPB(task);
147             tasks.push_back(task->GetSharedPtr());
148             tasks_.pop();
149         }
150         mux_.unlock();
151     }
152 }
153 
PostTask(DTaskCallback callback,std::shared_ptr<Promise> promise)154 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, std::shared_ptr<Promise> promise)
155 {
156     if (IsCallFromWorkerThread()) {
157         MMI_HILOGE("This interface cannot be called from a worker thread");
158         return nullptr;
159     }
160     TaskPtr taskCopy = nullptr;
161     {
162         std::lock_guard<std::timed_mutex> guard(mux_);
163         MMI_HILOGD("tasks_ size:%{public}d", static_cast<int32_t>(tasks_.size()));
164         static constexpr int32_t maxTasksLimit = 1000;
165         auto tsize = tasks_.size();
166         if (tsize > maxTasksLimit) {
167             MMI_HILOGE("The task queue is full. size:%{public}zu, maxTasksLimit:%{public}d", tsize, maxTasksLimit);
168             return nullptr;
169         }
170         id_++;
171         TaskData data = { GetThisThreadId(), id_};
172         auto res = write(fds_[1], &data, sizeof(data));
173         if (res == -1) {
174             MMI_HILOGE("Pipe write failed, errno:%{public}d", errno);
175             return nullptr;
176         }
177         TaskPtr task = std::make_shared<Task>(id_, callback, promise);
178         tasks_.push(task);
179         taskCopy = task->GetSharedPtr();
180     }
181     return taskCopy;
182 }
183 } // namespace MMI
184 } // namespace OHOS