• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "delegate_tasks.h"
17 
18 #include <fcntl.h>
19 #include <unistd.h>
20 
21 #include "error_multimodal.h"
22 
23 #undef MMI_LOG_DOMAIN
24 #define MMI_LOG_DOMAIN MMI_LOG_SERVER
25 #undef MMI_LOG_TAG
26 #define MMI_LOG_TAG "DelegateTasks"
27 
28 namespace OHOS {
29 namespace MMI {
ProcessTask()30 void DelegateTasks::Task::ProcessTask()
31 {
32     CALL_DEBUG_ENTER;
33     if (hasWaited_) {
34         MMI_HILOGE("Expired tasks will be discarded. id:%{public}d", id_);
35         return;
36     }
37     int32_t ret = fun_();
38     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
39     MMI_HILOGD("Process taskType:%{public}s, taskId:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
40     if (!hasWaited_ && promise_ != nullptr) {
41         promise_->set_value(ret);
42     }
43 }
44 
~DelegateTasks()45 DelegateTasks::~DelegateTasks()
46 {
47     if (fds_[0] >= 0) {
48         close(fds_[0]);
49         fds_[0] = -1;
50     }
51     if (fds_[1] >= 0) {
52         close(fds_[1]);
53         fds_[1] = -1;
54     }
55 }
56 
Init()57 bool DelegateTasks::Init()
58 {
59     CALL_DEBUG_ENTER;
60     if (pipe(fds_) == -1) {
61         MMI_HILOGE("The pipe create failed, errno:%{public}d", errno);
62         return false;
63     }
64     if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
65         MMI_HILOGE("The fcntl read failed, errno:%{public}d", errno);
66         close(fds_[0]);
67         return false;
68     }
69     if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
70         MMI_HILOGE("The fcntl write failed, errno:%{public}d", errno);
71         close(fds_[1]);
72         return false;
73     }
74     return true;
75 }
76 
ProcessTasks()77 void DelegateTasks::ProcessTasks()
78 {
79     CALL_DEBUG_ENTER;
80     std::vector<TaskPtr> tasks;
81     PopPendingTaskList(tasks);
82     for (const auto &it : tasks) {
83         it->ProcessTask();
84     }
85 }
86 
PostSyncTask(DTaskCallback callback)87 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
88 {
89     CALL_DEBUG_ENTER;
90     CHKPR(callback, ERROR_NULL_POINTER);
91     if (IsCallFromWorkerThread()) {
92         return callback();
93     }
94     std::shared_ptr<Promise> promise = std::make_shared<Promise>();
95     Future future = promise->get_future();
96     auto task = PostTask(callback, promise);
97     CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
98 
99     static constexpr int32_t timeout = 3000;
100     std::chrono::milliseconds span(timeout);
101     auto res = future.wait_for(span);
102     task->SetWaited();
103     if (res == std::future_status::timeout) {
104         MMI_HILOGE("Task timeout");
105         return ETASKS_WAIT_TIMEOUT;
106     } else if (res == std::future_status::deferred) {
107         MMI_HILOGE("Task deferred");
108         return ETASKS_WAIT_DEFERRED;
109     }
110     return future.get();
111 }
112 
PostAsyncTask(DTaskCallback callback)113 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
114 {
115     CHKPR(callback, ERROR_NULL_POINTER);
116     if (IsCallFromWorkerThread()) {
117         return callback();
118     }
119     CHKPR(PostTask(callback), ETASKS_POST_ASYNCTASK_FAIL);
120     return RET_OK;
121 }
122 
PopPendingTaskList(std::vector<TaskPtr> & tasks)123 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
124 {
125     std::lock_guard<std::mutex> guard(mux_);
126     static constexpr int32_t onceProcessTaskLimit = 10;
127     for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
128         if (tasks_.empty()) {
129             break;
130         }
131         auto task = tasks_.front();
132         CHKPB(task);
133         RecoveryId(task->GetId());
134         tasks.push_back(task->GetSharedPtr());
135         tasks_.pop();
136     }
137 }
138 
PostTask(DTaskCallback callback,std::shared_ptr<Promise> promise)139 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, std::shared_ptr<Promise> promise)
140 {
141     if (IsCallFromWorkerThread()) {
142         MMI_HILOGE("This interface cannot be called from a worker thread");
143         return nullptr;
144     }
145     std::lock_guard<std::mutex> guard(mux_);
146     MMI_HILOGD("tasks_ size:%{public}d", static_cast<int32_t>(tasks_.size()));
147     static constexpr int32_t maxTasksLimit = 1000;
148     auto tsize = tasks_.size();
149     if (tsize > maxTasksLimit) {
150         MMI_HILOGE("The task queue is full. size:%{public}zu, maxTasksLimit:%{public}d", tsize, maxTasksLimit);
151         return nullptr;
152     }
153     int32_t id = GenerateId();
154     TaskData data = { GetThisThreadId(), id };
155     auto res = write(fds_[1], &data, sizeof(data));
156     if (res == -1) {
157         RecoveryId(id);
158         MMI_HILOGE("Pipe write failed, errno:%{public}d", errno);
159         return nullptr;
160     }
161     TaskPtr task = std::make_shared<Task>(id, callback, promise);
162     tasks_.push(task);
163     std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
164     MMI_HILOGD("Post taskType:%{public}s", taskType.c_str());
165     return task->GetSharedPtr();
166 }
167 } // namespace MMI
168 } // namespace OHOS