/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "hyperaio.h"

#include <chrono>
#include <thread>

#include "accesstoken_kit.h"
#include "hyperaio_trace.h"
#include "ipc_skeleton.h"
#include "libhilog.h"
#ifdef HYPERAIO_USE_LIBURING
#include "liburing.h"
#endif
namespace OHOS {
namespace HyperAio {
#ifdef HYPERAIO_USE_LIBURING
const uint32_t URING_QUEUE_SIZE = 512; // io_uring queue depth passed to io_uring_queue_init
const uint32_t DELAY = 20;             // delay in milliseconds before retrying to get an sqe
const uint32_t BATCH_SIZE = 128;       // number of prepared sqes per io_uring_submit call
const uint32_t RETRIES = 3;            // maximum attempts to obtain an sqe
std::atomic<uint32_t> openReqCount_{0};
std::atomic<uint32_t> readReqCount_{0};
std::atomic<uint32_t> cancelReqCount_{0};
std::atomic<uint32_t> cqeCount_{0};
// Pimpl wrapper holding the io_uring instance
class HyperAio::Impl {
public:
    io_uring uring_;
};

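// Checks whether the calling token has been granted ohos.permission.ALLOW_IOURING.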
static bool HasAccessIouringPermission()
{
    Security::AccessToken::AccessTokenID tokenCaller = IPCSkeleton::GetCallingTokenID();
    const std::string permissionName = "ohos.permission.ALLOW_IOURING";
    int32_t res = Security::AccessToken::AccessTokenKit::VerifyAccessToken(tokenCaller, permissionName);
    if (res != Security::AccessToken::PermissionState::PERMISSION_GRANTED) {
        HILOGE("caller does not have ALLOW_IOURING permission");
        return false;
    }

    return true;
}

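// A request batch must contain at least one and at most URING_QUEUE_SIZE requests.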
static bool ValidateReqNum(uint32_t reqNum)
{
    return reqNum > 0 && reqNum <= URING_QUEUE_SIZE;
}

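// Reports io_uring availability; IOURING_APP_PERMISSION is set only when the caller holds the permission.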
uint32_t HyperAio::SupportIouring()
{
    HyperaioTrace trace("SupportIouring");
    uint32_t flags = 0;
    if (HasAccessIouringPermission()) {
        flags |= IOURING_APP_PERMISSION;
    }

    return flags;
}

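// Tries to obtain a free sqe. If the submission queue is full, pending sqes are
// flushed with io_uring_submit() and the attempt is retried after a short delay,
// up to RETRIES times. Returns nullptr if no sqe becomes available.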
struct io_uring_sqe* GetSqeWithRetry(struct io_uring *ring)
{
    struct io_uring_sqe *sqe;
    for (uint32_t i = 0; i < RETRIES; i++) {
        sqe = io_uring_get_sqe(ring);
        if (sqe != nullptr) {
            return sqe;
        }
        io_uring_submit(ring);
        std::this_thread::sleep_for(std::chrono::milliseconds(DELAY));
    }
    return nullptr;
}

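// Initializes the io_uring queue, registers the result callback and starts the
// harvest thread. Repeated calls return EOK without reinitializing.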
int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack)
{
    HyperaioTrace trace("CtxInit");
    if (initialized_.load()) {
        HILOGE("HyperAio has already been initialized");
        return EOK;
    }

    if (callBack == nullptr) {
        HILOGE("callBack is null");
        return -EINVAL;
    }

    if (pImpl_ == nullptr) {
        pImpl_ = std::make_shared<Impl>();
    }

    int32_t ret = io_uring_queue_init(URING_QUEUE_SIZE, &pImpl_->uring_, 0);
    if (ret < 0) {
        HILOGE("init io_uring failed, ret = %{public}d", ret);
        return ret;
    }

    ioResultCallBack_ = *callBack;
    stopThread_.store(false);
    harvestThread_ = std::thread(&HyperAio::HarvestRes, this);
    initialized_.store(true);
    HILOGI("init hyperaio success");
    return EOK;
}

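// Delivers an error response with the given error code to the callback for every
// userData recorded in errorVec, then clears the vector.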
void HyperAio::HandleRequestError(std::vector<uint64_t> &errorVec, int32_t errorcode)
{
    if (errorVec.empty()) {
        HILOGE("errorVec is empty");
        return;
    }
    for (auto &userdata : errorVec) {
        HILOGE("HandleRequestError: userData = %{private}lu", userdata);
        auto response = std::make_unique<IoResponse>(userdata, errorcode, 0);
        ioResultCallBack_(std::move(response));
    }
    errorVec.clear();
}

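// Flushes the sqes that were prepared before an sqe allocation failure. If the flush
// itself fails, an error response is delivered for every request recorded in infoVec.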
void HyperAio::HandleSqeError(uint32_t count, std::vector<uint64_t> &infoVec)
{
    if (count > 0) {
        int32_t ret = io_uring_submit(&pImpl_->uring_);
        if (ret < 0) {
            HILOGE("submit reqs failed, ret = %{public}d", ret);
            HandleRequestError(infoVec, ret);
        }
        readReqCount_ += count;
    }
}

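// Validates that the instance is usable and that reqNum is within (0, URING_QUEUE_SIZE].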
int32_t HyperAio::CheckParameter(uint32_t reqNum)
{
    if (pImpl_ == nullptr) {
        HILOGE("pImpl is not initialized");
        return -EINVAL;
    }
    if (!initialized_.load()) {
        HILOGE("HyperAio is not initialized");
        return -EPERM;
    }
    if (!ValidateReqNum(reqNum)) {
        HILOGE("reqNum is out of range: %{public}u", reqNum);
        return -EINVAL;
    }
    return EOK;
}

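// Prepares one openat sqe per request and submits them in batches of BATCH_SIZE.
// If no sqe can be obtained, the already prepared sqes are flushed and the remaining
// requests are completed with -EBUSY through the callback.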
int32_t HyperAio::StartOpenReqs(OpenReqs *req)
{
    if (req == nullptr || req->reqs == nullptr) {
        HILOGE("the request is empty");
        return -EINVAL;
    }

    int32_t ret = CheckParameter(req->reqNum);
    if (ret < 0) {
        return ret;
    }

    HyperaioTrace trace("StartOpenReqs" + std::to_string(req->reqNum));
    uint32_t totalReqs = req->reqNum;
    uint32_t count = 0;
    std::vector<uint64_t> errorVec;
    std::vector<uint64_t> openInfoVec;
    for (uint32_t i = 0; i < totalReqs; i++) {
        struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
        if (sqe == nullptr) {
            HILOGE("get sqe failed");
            for (; i < totalReqs; ++i) {
                errorVec.push_back(req->reqs[i].userData);
            }
            HandleSqeError(count, openInfoVec);
            HandleRequestError(errorVec, -EBUSY);
            break;
        }
        struct OpenInfo *openInfo = &req->reqs[i];
        io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(openInfo->userData));
        io_uring_prep_openat(sqe, openInfo->dfd, static_cast<const char *>(openInfo->path),
            openInfo->flags, openInfo->mode);
        HILOGD("open flags = %{public}d, mode = %{public}u, userData = %{private}lu",
            openInfo->flags, openInfo->mode, openInfo->userData);
        HyperaioTrace trace("open flags:" + std::to_string(openInfo->flags) + " mode:" + std::to_string(openInfo->mode)
            + " userData:" + std::to_string(openInfo->userData));
        count++;
        openInfoVec.push_back(openInfo->userData);
        if (count >= BATCH_SIZE || i == totalReqs - 1) {
            int32_t ret = io_uring_submit(&pImpl_->uring_);
            if (ret < 0) {
                HILOGE("submit open reqs failed, ret = %{public}d", ret);
                HandleRequestError(openInfoVec, -EBUSY);
            }
            openReqCount_ += count;
            count = 0;
        }
    }
    return EOK;
}

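// Prepares one read sqe per request and submits them in batches of BATCH_SIZE,
// using the same error handling as StartOpenReqs.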
int32_t HyperAio::StartReadReqs(ReadReqs *req)
{
    if (req == nullptr || req->reqs == nullptr) {
        HILOGE("the request is empty");
        return -EINVAL;
    }

    int32_t ret = CheckParameter(req->reqNum);
    if (ret < 0) {
        return ret;
    }
    HyperaioTrace trace("StartReadReqs" + std::to_string(req->reqNum));
    uint32_t totalReqs = req->reqNum;
    uint32_t count = 0;
    std::vector<uint64_t> errorVec;
    std::vector<uint64_t> readInfoVec;
    for (uint32_t i = 0; i < totalReqs; i++) {
        struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
        if (sqe == nullptr) {
            HILOGE("get sqe failed");
            for (; i < totalReqs; ++i) {
                errorVec.push_back(req->reqs[i].userData);
            }
            HandleSqeError(count, readInfoVec);
            HandleRequestError(errorVec, -EBUSY);
            break;
        }
        struct ReadInfo *readInfo = &req->reqs[i];
        io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(readInfo->userData));
        io_uring_prep_read(sqe, readInfo->fd, readInfo->buf, readInfo->len, readInfo->offset);
        HILOGD("read len = %{public}u, offset = %{public}lu, userData = %{private}lu",
            readInfo->len, readInfo->offset, readInfo->userData);
        HyperaioTrace trace("read len:" + std::to_string(readInfo->len) + " offset:" + std::to_string(readInfo->offset)
            + " userData:" + std::to_string(readInfo->userData));
        count++;
        readInfoVec.push_back(readInfo->userData);
        if (count >= BATCH_SIZE || i == totalReqs - 1) {
            int32_t ret = io_uring_submit(&pImpl_->uring_);
            if (ret < 0) {
                HILOGE("submit read reqs failed, ret = %{public}d", ret);
                HandleRequestError(readInfoVec, -EBUSY);
            }
            readReqCount_ += count;
            count = 0;
        }
    }
    return EOK;
}

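// Prepares one async-cancel sqe per request, targeting the userData of a previously
// submitted request, and submits them in batches of BATCH_SIZE.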
int32_t HyperAio::StartCancelReqs(CancelReqs *req)
{
    if (req == nullptr || req->reqs == nullptr) {
        HILOGE("the request is empty");
        return -EINVAL;
    }

    int32_t ret = CheckParameter(req->reqNum);
    if (ret < 0) {
        return ret;
    }
    HyperaioTrace trace("StartCancelReqs" + std::to_string(req->reqNum));
    uint32_t totalReqs = req->reqNum;
    uint32_t count = 0;
    std::vector<uint64_t> errorVec;
    std::vector<uint64_t> cancelInfoVec;
    for (uint32_t i = 0; i < totalReqs; i++) {
        struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
        if (sqe == nullptr) {
            HILOGE("get sqe failed");
            for (; i < totalReqs; ++i) {
                errorVec.push_back(req->reqs[i].userData);
            }
            HandleSqeError(count, cancelInfoVec);
            HandleRequestError(errorVec, -EBUSY);
            break;
        }
        struct CancelInfo *cancelInfo = &req->reqs[i];
        io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(cancelInfo->userData));
        io_uring_prep_cancel(sqe, reinterpret_cast<void *>(cancelInfo->targetUserData), 0);
        HILOGD("cancel userData = %{private}lu, targetUserData = %{private}lu",
            cancelInfo->userData, cancelInfo->targetUserData);
        HyperaioTrace trace("cancel userData:" + std::to_string(cancelInfo->userData)
            + " targetUserData:" + std::to_string(cancelInfo->targetUserData));
        count++;
        cancelInfoVec.push_back(cancelInfo->userData);
        if (count >= BATCH_SIZE || i == totalReqs - 1) {
            int32_t ret = io_uring_submit(&pImpl_->uring_);
            if (ret < 0) {
                HILOGE("submit cancel reqs failed, ret = %{public}d", ret);
                HandleRequestError(cancelInfoVec, -EBUSY);
            }
            cancelReqCount_ += count;
            count = 0;
        }
    }
    return EOK;
}

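// Harvest thread loop: waits for completions, forwards each cqe to the registered
// callback as an IoResponse, and exits when stopThread_ is set.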
void HyperAio::HarvestRes()
{
    if (pImpl_ == nullptr) {
        HILOGI("pImpl is null");
        return;
    }

    while (!stopThread_.load()) {
        struct io_uring_cqe *cqe;
        int32_t ret = io_uring_wait_cqe(&pImpl_->uring_, &cqe);
        if (ret < 0 || cqe == nullptr) {
            HILOGI("wait cqe failed, ret = %{public}d", ret);
            continue;
        }
        cqeCount_++;
        if (cqe->res < 0) {
            HILOGI("cqe failed, cqe->res = %{public}d", cqe->res);
        }
        auto response = std::make_unique<IoResponse>(cqe->user_data, cqe->res, cqe->flags);
        HyperaioTrace trace("harvest: userdata " + std::to_string(cqe->user_data)
            + " res " + std::to_string(cqe->res) + " flags " + std::to_string(cqe->flags));
        io_uring_cqe_seen(&pImpl_->uring_, cqe);
        if (ioResultCallBack_) {
            ioResultCallBack_(std::move(response));
        }
    }
    HILOGI("exit harvest thread");
}

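// Stops the harvest thread, releases the io_uring resources and resets the
// initialized flag. Returns EOK if the instance was never initialized.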
int32_t HyperAio::DestroyCtx()
{
    HILOGI("openReqCount = %{public}u, readReqCount = %{public}u, cancelReqCount = %{public}u, cqeCount = %{public}u",
        openReqCount_.load(), readReqCount_.load(), cancelReqCount_.load(), cqeCount_.load());
    if (!initialized_.load()) {
        HILOGI("not initialized");
        return EOK;
    }

    stopThread_.store(true);
    if (harvestThread_.joinable()) {
        HILOGI("start harvest thread join");
        harvestThread_.join();
        // This log is only printed after join() completes successfully
        HILOGI("join success");
    }

    if (pImpl_ != nullptr) {
        io_uring_queue_exit(&pImpl_->uring_);
    }

    initialized_.store(false);
    HILOGI("destroy hyperaio success");
    return EOK;
}
#else

uint32_t HyperAio::SupportIouring()
{
    return 0;
}

int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack)
{
    return -ENOTSUP;
}

int32_t HyperAio::StartReadReqs(ReadReqs *req)
{
    return -ENOTSUP;
}

int32_t HyperAio::StartOpenReqs(OpenReqs *req)
{
    return -ENOTSUP;
}

int32_t HyperAio::StartCancelReqs(CancelReqs *req)
{
    return -ENOTSUP;
}

int32_t HyperAio::DestroyCtx()
{
    return -ENOTSUP;
}
#endif
} // namespace HyperAio
} // namespace OHOS