• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef HDI_SHARED_MEM_QUEUE_INF_H
17 #define HDI_SHARED_MEM_QUEUE_INF_H
18 
19 #include <ashmem.h>
20 #include <atomic>
21 #include <cerrno>
22 #include <datetime_ex.h>
23 #include <hdf_base.h>
24 #include <hdf_log.h>
25 #include <hdi_smq_meta.h>
26 #include <hdi_smq_syncer.h>
27 #include <memory>
28 #include <securec.h>
29 #include <stdint.h>
30 #include <string.h>
31 #include <sys/mman.h>
32 
33 #ifndef PAGE_SIZE
34 #define PAGE_SIZE 4096
35 #endif
36 
37 #ifndef HDF_LOG_TAG
38 #define HDF_LOG_TAG smq
39 #endif
40 
41 namespace OHOS {
42 namespace HDI {
43 namespace Base {
44 template <typename T>
45 class SharedMemQueue {
46 public:
47     SharedMemQueue(uint32_t elementCount, SmqType type);
48     explicit SharedMemQueue(const SharedMemQueueMeta<T> &meta);
49     ~SharedMemQueue();
50 
51     int Write(const T *data, size_t count);
52     int Read(T *data, size_t count);
53 
54     int Write(const T *data);
55     int Read(T *data);
56 
57     int Write(const T *data, size_t count, int64_t waitTimeNanoSec);
58     int Read(T *data, size_t count, int64_t waitTimeNanoSec);
59 
60     int WriteNonBlocking(const T *data);
61     int ReadNonBlocking(T *data);
62 
63     int WriteNonBlocking(const T *data, size_t count);
64     int ReadNonBlocking(T *data, size_t count);
65 
66     size_t GetAvalidWriteSize();
67     size_t GetAvalidReadSize();
68     size_t GetSize();
69     std::shared_ptr<SharedMemQueueMeta<T>> GetMeta();
70     bool IsGood();
GetNanoTime()71     static inline int64_t GetNanoTime()
72     {
73         struct timespec ts;
74         clock_gettime(CLOCK_MONOTONIC, &ts);
75         return (ts.tv_sec * SEC_TO_NANOSEC + ts.tv_nsec);
76     }
77 
78 private:
79     void Init(bool resetWriteOffset);
80     uintptr_t MapMemZone(uint32_t zoneType);
81     void UnMapMemZone(void *addr, uint32_t zoneType);
82     size_t Align(size_t num, size_t alignSize);
83 
84     int32_t status = HDF_FAILURE;
85     size_t alignedElmtSize_;
86     uint8_t *queueBuffer_ = nullptr;
87     std::atomic<uint64_t> *readOffset_ = nullptr;
88     std::atomic<uint64_t> *writeOffset_ = nullptr;
89     std::atomic<uint32_t> *syncerPtr_ = nullptr;
90     std::unique_ptr<SharedMemQueueSyncer> syncer_ = nullptr;
91     std::shared_ptr<SharedMemQueueMeta<T>> meta_ = nullptr;
92 };
93 
94 template <typename T>
SharedMemQueue(uint32_t elementCount,SmqType type)95 SharedMemQueue<T>::SharedMemQueue(uint32_t elementCount, SmqType type) : alignedElmtSize_(0)
96 {
97     if (elementCount > UINT16_MAX) {
98         return;
99     }
100 
101     meta_ = std::make_shared<SharedMemQueueMeta<T>>(elementCount, type);
102     HDF_LOGI("create SharedMemQueue, count=%{public}u, size=%{public}u", elementCount, meta_->GetSize());
103     int ashmemFd = AshmemCreate("hdi_smq", Align(meta_->GetSize(), PAGE_SIZE));
104     if (ashmemFd < 0) {
105         HDF_LOGE("failed to create ashmem");
106         return;
107     }
108     meta_->SetFd(ashmemFd);
109     alignedElmtSize_ = meta_->GetElemenetSize();
110     Init(true);
111 }
112 
113 template <typename T>
SharedMemQueue(const SharedMemQueueMeta<T> & meta)114 SharedMemQueue<T>::SharedMemQueue(const SharedMemQueueMeta<T> &meta) : alignedElmtSize_(meta.GetElemenetSize())
115 {
116     meta_ = std::make_shared<SharedMemQueueMeta<T>>(meta);
117     Init(false);
118 }
119 
120 template <typename T>
~SharedMemQueue()121 SharedMemQueue<T>::~SharedMemQueue()
122 {
123     if (meta_ != nullptr && meta_->GetType() == SYNCED_SMQ && readOffset_ != nullptr) {
124         UnMapMemZone(readOffset_, SharedMemQueueMeta<T>::MemZoneType::MEMZONE_RPTR);
125     } else {
126         delete readOffset_;
127         readOffset_ = nullptr;
128     }
129 
130     if (writeOffset_ != nullptr) {
131         UnMapMemZone(writeOffset_, SharedMemQueueMeta<T>::MEMZONE_WPTR);
132     }
133 
134     if (syncerPtr_ != nullptr) {
135         UnMapMemZone(syncerPtr_, SharedMemQueueMeta<T>::MEMZONE_SYNCER);
136     }
137 
138     if (queueBuffer_ != nullptr) {
139         UnMapMemZone(queueBuffer_, SharedMemQueueMeta<T>::MEMZONE_DATA);
140     }
141 }
142 
143 template <typename T>
Init(bool resetWriteOffset)144 void SharedMemQueue<T>::Init(bool resetWriteOffset)
145 {
146     if (meta_ == nullptr || meta_->GetElemenetSize() != sizeof(T)) {
147         HDF_LOGE("invalid smq meta for init");
148         return;
149     }
150 
151     if (meta_->GetType() == SYNCED_SMQ) {
152         readOffset_ = reinterpret_cast<std::atomic<uint64_t> *>(MapMemZone(SharedMemQueueMeta<T>::MEMZONE_RPTR));
153     } else {
154         readOffset_ = new std::atomic<uint64_t>;
155     }
156 
157     if (readOffset_ == nullptr) {
158         HDF_LOGE("failed to map read offset");
159         return;
160     }
161 
162     writeOffset_ = reinterpret_cast<std::atomic<uint64_t> *>(MapMemZone(SharedMemQueueMeta<T>::MEMZONE_WPTR));
163     if (writeOffset_ == nullptr) {
164         HDF_LOGE("failed to map write offset");
165         return;
166     }
167 
168     syncerPtr_ = reinterpret_cast<std::atomic<uint32_t> *>(MapMemZone(SharedMemQueueMeta<T>::MEMZONE_SYNCER));
169     if (syncerPtr_ == nullptr) {
170         HDF_LOGE("failed to map sync ptr");
171         return;
172     }
173 
174     queueBuffer_ = reinterpret_cast<uint8_t *>(MapMemZone(SharedMemQueueMeta<T>::MEMZONE_DATA));
175     if (queueBuffer_ == nullptr) {
176         HDF_LOGE("failed to map queue buffer");
177         return;
178     }
179 
180     syncer_ = std::make_unique<SharedMemQueueSyncer>(syncerPtr_);
181 
182     if (resetWriteOffset) {
183         writeOffset_->store(0, std::memory_order_release);
184     }
185     readOffset_->store(0, std::memory_order_release);
186     HDF_LOGI("smq init succ");
187     status = HDF_SUCCESS;
188 }
189 
190 template <typename T>
MapMemZone(uint32_t zoneType)191 uintptr_t SharedMemQueue<T>::MapMemZone(uint32_t zoneType)
192 {
193     auto memzone = meta_->GetMemZone(zoneType);
194     if (memzone == nullptr) {
195         HDF_LOGE("invalid smq mem zone type %{public}u", zoneType);
196         return reinterpret_cast<uintptr_t>(nullptr);
197     }
198 
199     int offset = (memzone->offset / PAGE_SIZE) * PAGE_SIZE;
200     int length = memzone->offset - offset + memzone->size;
201 
202     void *ptr = mmap(0, length, PROT_READ | PROT_WRITE, MAP_SHARED, meta_->GetFd(), offset);
203     if (ptr == MAP_FAILED) {
204         HDF_LOGE(
205             "failed to map memzone %{public}u, size %{public}u, offset %{public}u , fd %{public}d, errnor=%{public}d",
206             zoneType, length, offset, meta_->GetFd(), errno);
207         return reinterpret_cast<uintptr_t>(nullptr);
208     }
209     return (reinterpret_cast<uintptr_t>(ptr) + (memzone->offset - offset));
210 }
211 
212 template <typename T>
UnMapMemZone(void * addr,uint32_t zoneType)213 void SharedMemQueue<T>::UnMapMemZone(void *addr, uint32_t zoneType)
214 {
215     auto memzone = meta_->GetMemZone(zoneType);
216     if (memzone == nullptr) {
217         return;
218     }
219     int offset = (memzone->offset / PAGE_SIZE) * PAGE_SIZE;
220     int length = memzone->offset - offset + memzone->size;
221     uint8_t *ptr = reinterpret_cast<uint8_t *>(addr) - (memzone->offset - offset);
222     if (ptr == nullptr) {
223         return;
224     }
225     munmap(ptr, length);
226 }
227 
228 template <typename T>
IsGood()229 bool SharedMemQueue<T>::IsGood()
230 {
231     return status == HDF_SUCCESS;
232 }
233 
234 template <typename T>
Align(size_t num,size_t alignSize)235 size_t SharedMemQueue<T>::Align(size_t num, size_t alignSize)
236 {
237     return (num + alignSize - 1) & ~(alignSize - 1);
238 }
239 
240 template <typename T>
Write(const T * data,size_t count)241 int SharedMemQueue<T>::Write(const T *data, size_t count)
242 {
243     return Write(data, count, 0);
244 }
245 
246 template <typename T>
Read(T * data,size_t count)247 int SharedMemQueue<T>::Read(T *data, size_t count)
248 {
249     return Read(data, count, 0);
250 }
251 
252 template <typename T>
Write(const T * data)253 int SharedMemQueue<T>::Write(const T *data)
254 {
255     return Write(data, 1, 0);
256 }
257 
258 template <typename T>
Read(T * data)259 int SharedMemQueue<T>::Read(T *data)
260 {
261     return Read(data, 1, 0);
262 }
263 
264 template <typename T>
Write(const T * data,size_t count,int64_t waitTimeNanoSec)265 int SharedMemQueue<T>::Write(const T *data, size_t count, int64_t waitTimeNanoSec)
266 {
267     if (meta_->GetType() != SmqType::SYNCED_SMQ) {
268         HDF_LOGE("unsynecd smq not support blocking write");
269         return HDF_ERR_NOT_SUPPORT;
270     }
271 
272     if (WriteNonBlocking(data, count) == 0) {
273         return syncer_->Wake(SharedMemQueueSyncer::SYNC_WORD_READ);
274     }
275 
276     int ret = 0;
277     auto startTime = GetNanoTime();
278     uint64_t currentTime = startTime;
279     while (true) {
280         if (waitTimeNanoSec != 0) {
281             currentTime = GetNanoTime();
282             if (GetNanoTime() - startTime >= waitTimeNanoSec) {
283                 ret = WriteNonBlocking(data, count);
284                 break;
285             }
286             waitTimeNanoSec -= currentTime - startTime;
287         }
288         ret = syncer_->Wait(SharedMemQueueSyncer::SYNC_WORD_WRITE, waitTimeNanoSec);
289         if (ret != 0 && ret != -ETIMEDOUT) {
290             break;
291         }
292 
293         ret = WriteNonBlocking(data, count);
294         if (ret == 0) {
295             break;
296         }
297         HDF_LOGE("failed to write %{public}zu, retry", count);
298     }
299 
300     if (ret == 0) {
301         ret = syncer_->Wake(SharedMemQueueSyncer::SYNC_WORD_READ);
302     } else {
303         HDF_LOGE("failed to write %{public}zu, ret=%{public}d", count, ret);
304     }
305 
306     return ret;
307 }
308 
309 template <typename T>
Read(T * data,size_t count,int64_t waitTimeNanoSec)310 int SharedMemQueue<T>::Read(T *data, size_t count, int64_t waitTimeNanoSec)
311 {
312     if (meta_->GetType() != SmqType::SYNCED_SMQ) {
313         HDF_LOGE("unsynecd smq not support blocking read");
314         return HDF_ERR_NOT_SUPPORT;
315     }
316 
317     if (ReadNonBlocking(data, count) == 0) {
318         return syncer_->Wake(SharedMemQueueSyncer::SYNC_WORD_WRITE);
319     }
320 
321     int ret = -ENODATA;
322     auto startTime = GetNanoTime();
323     int64_t currentTime;
324     while (true) {
325         if (waitTimeNanoSec != 0) {
326             currentTime = GetNanoTime();
327             if (currentTime - startTime >= waitTimeNanoSec) {
328                 ret = ReadNonBlocking(data, count);
329                 break;
330             }
331             waitTimeNanoSec -= currentTime - startTime;
332         }
333         ret = syncer_->Wait(SharedMemQueueSyncer::SYNC_WORD_READ, waitTimeNanoSec);
334         if (ret != 0 && ret != -ETIMEDOUT) {
335             break;
336         }
337 
338         ret = ReadNonBlocking(data, count);
339         if (ret == 0) {
340             break;
341         }
342     }
343     if (ret == 0) {
344         ret = syncer_->Wake(SharedMemQueueSyncer::SYNC_WORD_WRITE);
345     } else {
346         HDF_LOGE("failed to read %{public}zu, ret=%{public}d", count, ret);
347     }
348 
349     return ret;
350 }
351 
352 template <typename T>
WriteNonBlocking(const T * data)353 int SharedMemQueue<T>::WriteNonBlocking(const T *data)
354 {
355     return WriteNonBlocking(data, 1);
356 }
357 
358 template <typename T>
ReadNonBlocking(T * data)359 int SharedMemQueue<T>::ReadNonBlocking(T *data)
360 {
361     return ReadNonBlocking(data, 1);
362 }
363 
364 template <typename T>
WriteNonBlocking(const T * data,size_t count)365 int SharedMemQueue<T>::WriteNonBlocking(const T *data, size_t count)
366 {
367     auto avalidWrite = GetAvalidWriteSize();
368     if (count >= avalidWrite && meta_->GetType() == SmqType::SYNCED_SMQ) {
369         // synced smq can not overflow write
370         return -E2BIG;
371     }
372 
373     auto wOffset = writeOffset_->load(std::memory_order_acquire);
374     auto rOffset = readOffset_->load(std::memory_order_acquire);
375     uint64_t newWriteOffset;
376     auto qCount = meta_->GetElementCount();
377     if (wOffset + count <= qCount) {
378         if (memcpy_s(queueBuffer_ + (wOffset * sizeof(T)), (qCount - wOffset) * sizeof(T),
379             data, count * sizeof(T)) != EOK) {
380             return HDF_FAILURE;
381         };
382         newWriteOffset = (wOffset + count) % qCount;
383     } else {
384         size_t firstPartSize = qCount - wOffset;
385         size_t secParcSize = count - firstPartSize;
386         if (memcpy_s(queueBuffer_ + (wOffset * sizeof(T)), (qCount - wOffset) * sizeof(T),
387             data, firstPartSize * sizeof(T)) != EOK) {
388             return HDF_FAILURE;
389         }
390         if (memcpy_s(queueBuffer_, qCount * sizeof(T), data + firstPartSize, secParcSize * sizeof(T)) != EOK) {
391             return HDF_FAILURE;
392         }
393         newWriteOffset = secParcSize;
394     }
395 
396     writeOffset_->store(newWriteOffset, std::memory_order_release);
397     if (wOffset < rOffset && newWriteOffset >= rOffset) {
398         HDF_LOGW("warning:smp ring buffer overflow");
399     }
400     return 0;
401 }
402 
403 template <typename T>
ReadNonBlocking(T * data,size_t count)404 int SharedMemQueue<T>::ReadNonBlocking(T *data, size_t count)
405 {
406     if (count == 0) {
407         return -EINVAL;
408     }
409 
410     if (count > GetAvalidReadSize()) {
411         return -ENODATA;
412     }
413 
414     auto qCount = meta_->GetElementCount();
415     auto rOffset = readOffset_->load(std::memory_order_acquire);
416     if (rOffset + count <= qCount) {
417         if (memcpy_s(data, count * sizeof(T), queueBuffer_ + (rOffset * sizeof(T)), count * sizeof(T)) != EOK) {
418             return HDF_FAILURE;
419         }
420         readOffset_->store((rOffset + count) % qCount, std::memory_order_release);
421         return 0;
422     }
423 
424     size_t firstPartSize = qCount - rOffset;
425     size_t secPartSize = count - firstPartSize;
426 
427     if (memcpy_s(data, count * sizeof(T), queueBuffer_ + (rOffset * sizeof(T)), firstPartSize * sizeof(T)) != EOK) {
428         return HDF_FAILURE;
429     }
430     if (memcpy_s(data + firstPartSize, (count - firstPartSize) * sizeof(T),
431         queueBuffer_, secPartSize * sizeof(T)) != EOK) {
432         return HDF_FAILURE;
433     };
434     readOffset_->store(secPartSize, std::memory_order_release);
435 
436     return 0;
437 }
438 
439 template <typename T>
GetAvalidWriteSize()440 size_t SharedMemQueue<T>::GetAvalidWriteSize()
441 {
442     return meta_->GetElementCount() - GetAvalidReadSize();
443 }
444 
445 template <typename T>
GetAvalidReadSize()446 size_t SharedMemQueue<T>::GetAvalidReadSize()
447 {
448     auto wOffset = writeOffset_->load(std::memory_order_acquire);
449     auto rOffset = readOffset_->load(std::memory_order_acquire);
450     auto size = wOffset >= rOffset ? (wOffset - rOffset) : (wOffset + meta_->GetElementCount() - rOffset);
451     return size;
452 }
453 
454 template <typename T>
GetSize()455 size_t SharedMemQueue<T>::GetSize()
456 {
457     return meta_->GetSize();
458 }
459 
460 template <typename T>
GetMeta()461 std::shared_ptr<SharedMemQueueMeta<T>> SharedMemQueue<T>::GetMeta()
462 {
463     return meta_;
464 }
465 } // namespace Base
466 } // namespace HDI
467 } // namespace OHOS
468 
469 #ifdef HDF_LOG_TAG
470 #undef HDF_LOG_TAG
471 #endif
472 
#endif /* HDI_SHARED_MEM_QUEUE_INF_H */
474