• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2025 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "hyperaio.h"
17 
18 #include <chrono>
19 #include <thread>
20 #include "accesstoken_kit.h"
21 #include "hyperaio_trace.h"
22 #include "libhilog.h"
23 #include "ipc_skeleton.h"
24 #ifdef HYPERAIO_USE_LIBURING
25 #include "liburing.h"
26 #endif
27 namespace OHOS {
28 namespace HyperAio {
29 #ifdef HYPERAIO_USE_LIBURING
// Entry count passed to io_uring_queue_init(); also the largest reqNum accepted by ValidateReqNum().
constexpr uint32_t URING_QUEUE_SIZE{512};
// Pause, in milliseconds, between attempts to obtain a free SQE.
constexpr uint32_t DELAY{20};
// Number of prepared SQEs after which the Start*Reqs loops issue a submit.
constexpr uint32_t BATCH_SIZE{128};
// Bound on the SQE acquisition retry loop.
constexpr uint32_t RETRIES{3};
// Diagnostic counters; their values are logged by DestroyCtx().
std::atomic<uint32_t> openReqCount_{0};
std::atomic<uint32_t> readReqCount_{0};
std::atomic<uint32_t> cancelReqCount_{0};
std::atomic<uint32_t> cqeCount_{0};
38 class HyperAio::Impl {
39 public:
40     io_uring uring_;
41 };
42 
HasAccessIouringPermission()43 static bool HasAccessIouringPermission()
44 {
45     Security::AccessToken::AccessTokenID tokenCaller = IPCSkeleton::GetCallingTokenID();
46     const std::string permissionName = "ohos.permission.ALLOW_IOURING";
47     int32_t res = Security::AccessToken::AccessTokenKit::VerifyAccessToken(tokenCaller, permissionName);
48     if (res != Security::AccessToken::PermissionState::PERMISSION_GRANTED) {
49         HILOGE("have no ALLOW_IOURING permission");
50         return false;
51     }
52 
53     return true;
54 }
55 
ValidateReqNum(uint32_t reqNum)56 static bool ValidateReqNum(uint32_t reqNum)
57 {
58     return reqNum > 0 && reqNum <= URING_QUEUE_SIZE;
59 }
60 
SupportIouring()61 uint32_t HyperAio::SupportIouring()
62 {
63     HyperaioTrace trace("SupportIouring");
64     uint32_t flags = 0;
65     if (HasAccessIouringPermission()) {
66         flags |= IOURING_APP_PERMISSION;
67     }
68 
69     return flags;
70 }
71 
GetSqeWithRetry(struct io_uring * ring)72 struct io_uring_sqe* GetSqeWithRetry(struct io_uring *ring)
73 {
74     struct io_uring_sqe *sqe;
75     for (uint32_t i = 0; i < RETRIES; i++) {
76         sqe = io_uring_get_sqe(ring);
77         if (sqe != nullptr) {
78             return sqe;
79         }
80         io_uring_submit(ring);
81         std::this_thread::sleep_for(std::chrono::milliseconds(DELAY));
82     }
83     return nullptr;
84 }
85 
CtxInit(ProcessIoResultCallBack * callBack)86 int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack)
87 {
88     HyperaioTrace trace("CtxInit");
89     if (initialized_.load()) {
90         HILOGE("HyperAio has been initialized");
91         return EOK;
92     }
93 
94     if (callBack == nullptr) {
95         HILOGE("callBack is null");
96         return -EINVAL;
97     }
98 
99     if (pImpl_ == nullptr) {
100         pImpl_ = std::make_shared<Impl>();
101     }
102 
103     int32_t ret = io_uring_queue_init(URING_QUEUE_SIZE, &pImpl_->uring_, 0);
104     if (ret < 0) {
105         HILOGE("init io_uring failed, ret = %{public}d", ret);
106         return ret;
107     }
108 
109     ioResultCallBack_ = *callBack;
110     stopThread_.store(false);
111     harvestThread_ = std::thread(&HyperAio::HarvestRes, this);
112     initialized_.store(true);
113     HILOGI("init hyperaio success");
114     return EOK;
115 }
116 
HandleRequestError(std::vector<uint64_t> & errorVec,int32_t errorcode)117 void HyperAio::HandleRequestError(std::vector<uint64_t> &errorVec, int32_t errorcode)
118 {
119     if (errorVec.empty()) {
120         HILOGE("errorVec is empty");
121         return;
122     }
123     for (auto &userdata : errorVec) {
124         HILOGE("HandleRequestError: userData = %{private}lu", userdata);
125         auto response = std::make_unique<IoResponse>(userdata, errorcode, 0);
126         ioResultCallBack_(std::move(response));
127     }
128     errorVec.clear();
129 }
130 
HandleSqeError(uint32_t count,std::vector<uint64_t> & infoVec)131 void HyperAio::HandleSqeError(uint32_t count, std::vector<uint64_t> &infoVec)
132 {
133     if (count > 0) {
134         int32_t ret = io_uring_submit(&pImpl_->uring_);
135         if (ret < 0) {
136             HILOGE("submit read reqs failed, ret = %{public}d", ret);
137             HandleRequestError(infoVec, ret);
138         }
139         readReqCount_ += count;
140     }
141 }
142 
CheckParameter(uint32_t reqNum)143 int32_t HyperAio::CheckParameter(uint32_t reqNum)
144 {
145     if (pImpl_ == nullptr) {
146         HILOGE("pImpl is not initialized");
147         return -EINVAL;
148     }
149     if (!initialized_.load()) {
150         HILOGE("HyperAio is not initialized");
151         return -EPERM;
152     }
153     if (!ValidateReqNum(reqNum)) {
154         HILOGE("reqNum is out of range: %{public}u", reqNum);
155         return -EINVAL;
156     }
157     return EOK;
158 }
159 
StartOpenReqs(OpenReqs * req)160 int32_t HyperAio::StartOpenReqs(OpenReqs *req)
161 {
162     if (req == nullptr || req->reqs == nullptr) {
163         HILOGE("the request is empty");
164         return -EINVAL;
165     }
166 
167     int32_t ret = CheckParameter(req->reqNum);
168     if (ret < 0) {
169         return ret;
170     }
171 
172     HyperaioTrace trace("StartOpenReqs" + std::to_string(req->reqNum));
173     uint32_t totalReqs = req->reqNum;
174     uint32_t count = 0;
175     std::vector<uint64_t> errorVec;
176     std::vector<uint64_t> openInfoVec;
177     for (uint32_t i = 0; i < totalReqs; i++) {
178         struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
179         if (sqe == nullptr) {
180             HILOGE("get sqe failed");
181             for (; i < totalReqs; ++i) {
182                 errorVec.push_back(req->reqs[i].userData);
183             }
184             HandleSqeError(count, openInfoVec);
185             HandleRequestError(errorVec, -EBUSY);
186             break;
187         }
188         struct OpenInfo *openInfo = &req->reqs[i];
189         io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(openInfo->userData));
190         io_uring_prep_openat(sqe, openInfo->dfd, static_cast<const char *>(openInfo->path),
191             openInfo->flags, openInfo->mode);
192         HILOGD("open flags = %{public}d, mode = %{public}u, userData = %{private}lu",
193             openInfo->flags, openInfo->mode, openInfo->userData);
194         HyperaioTrace trace("open flags:" + std::to_string(openInfo->flags) + "mode:" + std::to_string(openInfo->mode)
195             + "userData:" + std::to_string(openInfo->userData));
196         count++;
197         openInfoVec.push_back(openInfo->userData);
198         if (count >= BATCH_SIZE || i == totalReqs - 1) {
199             int32_t ret = io_uring_submit(&pImpl_->uring_);
200             if (ret < 0) {
201                 HILOGE("submit open reqs failed, ret = %{public}d", ret);
202                 HandleRequestError(openInfoVec, -EBUSY);
203             }
204             openReqCount_ += count;
205             count = 0;
206         }
207     }
208     return EOK;
209 }
210 
StartReadReqs(ReadReqs * req)211 int32_t HyperAio::StartReadReqs(ReadReqs *req)
212 {
213     if (req == nullptr || req->reqs == nullptr) {
214         HILOGE("the request is empty");
215         return -EINVAL;
216     }
217 
218     int32_t ret = CheckParameter(req->reqNum);
219     if (ret < 0) {
220         return ret;
221     }
222     HyperaioTrace trace("StartReadReqs" + std::to_string(req->reqNum));
223     uint32_t totalReqs = req->reqNum;
224     uint32_t count = 0;
225     std::vector<uint64_t> errorVec;
226     std::vector<uint64_t> readInfoVec;
227     for (uint32_t i = 0; i < totalReqs; i++) {
228         struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
229         if (sqe == nullptr) {
230             HILOGE("get sqe failed");
231             for (; i < totalReqs; ++i) {
232                 errorVec.push_back(req->reqs[i].userData);
233             }
234             HandleSqeError(count, readInfoVec);
235             HandleRequestError(errorVec, -EBUSY);
236             break;
237         }
238         struct ReadInfo *readInfo = &req->reqs[i];
239         io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(readInfo->userData));
240         io_uring_prep_read(sqe, readInfo->fd, readInfo->buf, readInfo->len, readInfo->offset);
241         HILOGD("read len = %{public}u, offset = %{public}lu, userData = %{private}lu",
242             readInfo->len, readInfo->offset, readInfo->userData);
243         HyperaioTrace trace("read len:" + std::to_string(readInfo->len) + "offset:" + std::to_string(readInfo->offset)
244             + "userData:" + std::to_string(readInfo->userData));
245         count++;
246         readInfoVec.push_back(readInfo->userData);
247         if (count >= BATCH_SIZE || i == totalReqs - 1) {
248             int32_t ret = io_uring_submit(&pImpl_->uring_);
249             if (ret < 0) {
250                 HILOGE("submit read reqs failed, ret = %{public}d", ret);
251                 HandleRequestError(readInfoVec, -EBUSY);
252             }
253             readReqCount_ += count;
254             count = 0;
255         }
256     }
257     return EOK;
258 }
259 
StartCancelReqs(CancelReqs * req)260 int32_t HyperAio::StartCancelReqs(CancelReqs *req)
261 {
262     if (req == nullptr || req->reqs == nullptr) {
263         HILOGE("the request is empty");
264         return -EINVAL;
265     }
266 
267     int32_t ret = CheckParameter(req->reqNum);
268     if (ret < 0) {
269         return ret;
270     }
271     HyperaioTrace trace("StartCancelReqs" + std::to_string(req->reqNum));
272     uint32_t totalReqs = req->reqNum;
273     uint32_t count = 0;
274     std::vector<uint64_t> errorVec;
275     std::vector<uint64_t> cancelInfoVec;
276     for (uint32_t i = 0; i < totalReqs; i++) {
277         struct io_uring_sqe *sqe = GetSqeWithRetry(&pImpl_->uring_);
278         if (sqe == nullptr) {
279             HILOGE("get sqe failed");
280             for (; i < totalReqs; ++i) {
281                 errorVec.push_back(req->reqs[i].userData);
282             }
283             HandleSqeError(count, cancelInfoVec);
284             HandleRequestError(errorVec, -EBUSY);
285             break;
286         }
287         struct CancelInfo *cancelInfo = &req->reqs[i];
288         io_uring_sqe_set_data(sqe, reinterpret_cast<void *>(cancelInfo->userData));
289         io_uring_prep_cancel(sqe, reinterpret_cast<void *>(cancelInfo->targetUserData), 0);
290         HILOGD("cancel userData = %{private}lu,  targetUserData = %{private}lu",
291             cancelInfo->userData, cancelInfo->targetUserData);
292         HyperaioTrace trace("cancel userData:" + std::to_string(cancelInfo->userData)
293             + "targetUserData:" + std::to_string(cancelInfo->targetUserData));
294         count++;
295         cancelInfoVec.push_back(cancelInfo->userData);
296         if (count >= BATCH_SIZE || i == totalReqs - 1) {
297             int32_t ret = io_uring_submit(&pImpl_->uring_);
298             if (ret < 0) {
299                 HILOGE("submit cancel reqs failed, ret = %{public}d", ret);
300                 HandleRequestError(cancelInfoVec, -EBUSY);
301             }
302             cancelReqCount_ += count;
303             count = 0;
304         }
305     }
306     return EOK;
307 }
308 
HarvestRes()309 void HyperAio::HarvestRes()
310 {
311     if (pImpl_ == nullptr) {
312         HILOGI("pImpl is null");
313         return;
314     }
315 
316     while (!stopThread_.load()) {
317         struct io_uring_cqe *cqe;
318         int32_t ret = io_uring_wait_cqe(&pImpl_->uring_, &cqe);
319         if (ret < 0 || cqe == nullptr) {
320             HILOGI("wait cqe failed, ret = %{public}d", ret);
321             continue;
322         }
323         cqeCount_++;
324         if (cqe->res < 0) {
325             HILOGI("cqe failed, cqe->res = %{public}d", cqe->res);
326         }
327         auto response = std::make_unique<IoResponse>(cqe->user_data, cqe->res, cqe->flags);
328         HyperaioTrace trace("harvest: userdata " + std::to_string(cqe->user_data)
329             + " res " + std::to_string(cqe->res) + "flags " + std::to_string(cqe->flags));
330         io_uring_cqe_seen(&pImpl_->uring_, cqe);
331         if (ioResultCallBack_) {
332             ioResultCallBack_(std::move(response));
333         }
334     }
335     HILOGI("exit harvest thread");
336 }
337 
DestroyCtx()338 int32_t HyperAio::DestroyCtx()
339 {
340     HILOGI("openReqCount = %{public}u, readReqCount = %{public}u, cancelReqCount = %{public}u, cqeCount = %{public}u",
341         openReqCount_.load(), readReqCount_.load(), cancelReqCount_.load(), cqeCount_.load());
342     if (!initialized_.load()) {
343         HILOGI("not initialized");
344         return EOK;
345     }
346 
347     stopThread_.store(true);
348     if (harvestThread_.joinable()) {
349         HILOGI("start harvest thread join");
350         harvestThread_.join();
351         // This log is only printed after join() completes successfully
352         HILOGI("join success");
353     }
354 
355     if (pImpl_ != nullptr) {
356         io_uring_queue_exit(&pImpl_->uring_);
357     }
358 
359     initialized_.store(false);
360     HILOGI("destroy hyperaio success");
361     return EOK;
362 }
363 #else
364 
365 uint32_t HyperAio::SupportIouring()
366 {
367     return 0;
368 }
369 int32_t HyperAio::CtxInit(ProcessIoResultCallBack *callBack)
370 {
371     return -ENOTSUP;
372 }
373 int32_t HyperAio::StartReadReqs(ReadReqs *req)
374 {
375     return -ENOTSUP;
376 }
377 int32_t HyperAio::StartOpenReqs(OpenReqs *req)
378 {
379     return -ENOTSUP;
380 }
381 int32_t HyperAio::StartCancelReqs(CancelReqs *req)
382 {
383     return -ENOTSUP;
384 }
385 int32_t HyperAio::DestroyCtx()
386 {
387     return -ENOTSUP;
388 }
389 #endif
390 }
391 }