• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022-2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "delegate_tasks.h"
16 
17 #include <sys/syscall.h>
18 #include <fcntl.h>
19 #include <unistd.h>
20 
21 #include "devicestatus_define.h"
22 
23 namespace OHOS {
24 namespace Msdp {
25 namespace DeviceStatus {
26 namespace {
27 constexpr OHOS::HiviewDFX::HiLogLabel LABEL { LOG_CORE, MSDP_DOMAIN_ID, "DelegateTasks" };
28 } // namespace
29 
ProcessTask()30 void DelegateTasks::Task::ProcessTask()
31 {
32     CALL_DEBUG_ENTER;
33     if (hasWaited_) {
34         FI_HILOGE("Expired tasks will be discarded, id:%{public}d", id_);
35         return;
36     }
37     int32_t ret = fun_();
38     std::string taskType = ((promise_ == nullptr) ? "Async" : "Sync");
39     FI_HILOGD("process:%{public}s, task id:%{public}d, ret:%{public}d", taskType.c_str(), id_, ret);
40     if (!hasWaited_ && promise_ != nullptr) {
41         promise_->set_value(ret);
42     }
43 }
44 
~DelegateTasks()45 DelegateTasks::~DelegateTasks()
46 {
47     if (fds_[0] >= 0) {
48         if (close(fds_[0]) < 0) {
49             FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
50         }
51         fds_[0] = -1;
52     }
53     if (fds_[1] >= 0) {
54         if (close(fds_[1]) < 0) {
55             FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
56         }
57         fds_[1] = -1;
58     }
59 }
60 
Init()61 bool DelegateTasks::Init()
62 {
63     CALL_DEBUG_ENTER;
64     int32_t res = pipe(fds_);
65     if (res == -1) {
66         FI_HILOGE("The pipe create failed, errno:%{public}d", errno);
67         return false;
68     }
69     if (fcntl(fds_[0], F_SETFL, O_NONBLOCK) == -1) {
70         FI_HILOGE("The fcntl read failed, errno:%{public}d", errno);
71         if (close(fds_[0]) < 0) {
72             FI_HILOGE("Close fds_[0] failed, error:%{public}s, fds_[0]:%{public}d", strerror(errno), fds_[0]);
73             return false;
74         }
75     }
76     if (fcntl(fds_[1], F_SETFL, O_NONBLOCK) == -1) {
77         FI_HILOGE("The fcntl write failed, errno:%{public}d", errno);
78         if (close(fds_[1]) < 0) {
79             FI_HILOGE("Close fds_[1] failed, error:%{public}s, fds_[1]:%{public}d", strerror(errno), fds_[1]);
80             return false;
81         }
82     }
83     return true;
84 }
85 
ProcessTasks()86 void DelegateTasks::ProcessTasks()
87 {
88     CALL_DEBUG_ENTER;
89     std::vector<TaskPtr> tasks;
90     PopPendingTaskList(tasks);
91     for (const auto &it : tasks) {
92         it->ProcessTask();
93     }
94 }
95 
PostSyncTask(DTaskCallback callback)96 int32_t DelegateTasks::PostSyncTask(DTaskCallback callback)
97 {
98     CALL_DEBUG_ENTER;
99     CHKPR(callback, ERROR_NULL_POINTER);
100     if (IsCallFromWorkerThread()) {
101         return callback();
102     }
103     Promise promise;
104     Future future = promise.get_future();
105     auto task = PostTask(callback, &promise);
106     CHKPR(task, ETASKS_POST_SYNCTASK_FAIL);
107 
108     static constexpr int32_t timeout = 3000;
109     std::chrono::milliseconds span(timeout);
110     auto res = future.wait_for(span);
111     task->SetWaited();
112     if (res == std::future_status::timeout) {
113         FI_HILOGE("Task timeout");
114         return ETASKS_WAIT_TIMEOUT;
115     } else if (res == std::future_status::deferred) {
116         FI_HILOGE("Task deferred");
117         return ETASKS_WAIT_DEFERRED;
118     }
119     return future.get();
120 }
121 
PostAsyncTask(DTaskCallback callback)122 int32_t DelegateTasks::PostAsyncTask(DTaskCallback callback)
123 {
124     CHKPR(callback, ERROR_NULL_POINTER);
125     auto task = PostTask(callback);
126     CHKPR(task, ETASKS_POST_ASYNCTASK_FAIL);
127     return RET_OK;
128 }
129 
PopPendingTaskList(std::vector<TaskPtr> & tasks)130 void DelegateTasks::PopPendingTaskList(std::vector<TaskPtr> &tasks)
131 {
132     std::lock_guard<std::mutex> guard(mux_);
133     static constexpr int32_t onceProcessTaskLimit = 10;
134     for (int32_t count = 0; count < onceProcessTaskLimit; count++) {
135         if (tasks_.empty()) {
136             break;
137         }
138         auto task = tasks_.front();
139         CHKPB(task);
140         RecoveryId(task->GetId());
141         tasks.push_back(task->GetSharedPtr());
142         tasks_.pop();
143     }
144 }
145 
PostTask(DTaskCallback callback,Promise * promise)146 DelegateTasks::TaskPtr DelegateTasks::PostTask(DTaskCallback callback, Promise *promise)
147 {
148     std::lock_guard<std::mutex> guard(mux_);
149     FI_HILOGD("tasks_ size:%{public}zu", tasks_.size());
150     static constexpr int32_t maxTasksLimit = 1000;
151     size_t tsize = tasks_.size();
152     if (tsize > maxTasksLimit) {
153         FI_HILOGE("The task queue is full, size:%{public}zu/%{public}d", tsize, maxTasksLimit);
154         return nullptr;
155     }
156     int32_t id = GenerateId();
157     TaskData data = {GetThisThreadId(), id};
158     ssize_t res = write(fds_[1], &data, sizeof(data));
159     if (res == -1) {
160         RecoveryId(id);
161         FI_HILOGE("Pipe write failed, errno:%{public}d", errno);
162         return nullptr;
163     }
164     TaskPtr task = std::make_shared<Task>(id, callback, promise);
165     tasks_.push(task);
166     std::string taskType = ((promise == nullptr) ? "Async" : "Sync");
167     FI_HILOGD("Post %{public}s", taskType.c_str());
168     return task->GetSharedPtr();
169 }
170 } // namespace DeviceStatus
171 } // namespace Msdp
172 } // namespace OHOS
173