1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "share_memory_block.h"
17
18 #include <cstring>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 #include "ashmem.h"
24 #include "logging.h"
25 #include "securec.h"
26
27 namespace {
28 const int PIECE_HEAD_LEN = 4;
29 constexpr uint32_t INVALID_LENGTH = (uint32_t)-1;
30 constexpr uint32_t TIMEOUT_SEC = 1;
31 const int WAIT_RELEASE_TIMEOUT_US = 10;
32 static std::atomic<int> g_timeCount = 0;
33 #ifndef PAGE_SIZE
34 constexpr uint32_t PAGE_SIZE = 4096;
35 #endif
36 } // namespace
37
38 struct PthreadLocker {
PthreadLockerPthreadLocker39 explicit PthreadLocker(pthread_mutex_t& mutex) : mutex_(mutex)
40 {
41 pthread_mutex_lock(&mutex_);
42 }
43
~PthreadLockerPthreadLocker44 ~PthreadLocker()
45 {
46 pthread_mutex_unlock(&mutex_);
47 }
48
49 private:
50 pthread_mutex_t& mutex_;
51 };
52
ShareMemoryBlock()53 ShareMemoryBlock::ShareMemoryBlock()
54 : fileDescriptor_(-1),
55 memoryPoint_(nullptr),
56 memorySize_(0),
57 memoryName_(),
58 header_(nullptr),
59 reusePloicy_(ReusePolicy::DROP_NONE)
60 {
61 }
62
CreateBlockWithFd(std::string name,uint32_t size,int fd)63 bool ShareMemoryBlock::CreateBlockWithFd(std::string name, uint32_t size, int fd)
64 {
65 CHECK_TRUE(fd >= 0, false, "CreateBlock FAIL SYS_memfd_create");
66
67 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
68 if (ptr == MAP_FAILED) {
69 const int bufSize = 256;
70 char buf[bufSize] = {0};
71 strerror_r(errno, buf, bufSize);
72 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlockWithFd mmap ERR : %s", buf);
73 return false;
74 }
75
76 fileDescriptor_ = fd;
77 memoryPoint_ = ptr;
78 memorySize_ = size;
79
80 memoryName_ = name;
81 header_ = reinterpret_cast<BlockHeader*>(ptr);
82
83 // Reserve 4 bytes to fill the message length.
84 messageWriteOffset_ = PIECE_HEAD_LEN;
85 // Functions required to bind the BaseMessage class.
86 smbCtx_.block = this;
87 smbCtx_.ctx.getMemory = [](RandomWriteCtx* ctx, uint32_t size, uint8_t** memory, uint32_t* offset) -> bool {
88 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
89 return smbCtx->block->GetMemory(size, memory, offset);
90 };
91 smbCtx_.ctx.seek = [](RandomWriteCtx* ctx, uint32_t offset) -> bool {
92 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
93 return smbCtx->block->Seek(offset);
94 };
95 return true;
96 }
97
CreateBlock(std::string name,uint32_t size)98 bool ShareMemoryBlock::CreateBlock(std::string name, uint32_t size)
99 {
100 PROFILER_LOG_INFO(LOG_CORE, "CreateBlock %s %d", name.c_str(), size);
101 CHECK_TRUE(size > sizeof(BlockHeader), false, "size %u too less!", size);
102 CHECK_TRUE(size % PAGE_SIZE == 0, false, "size %u not times of %d!", size, PAGE_SIZE);
103
104 int fd = OHOS::AshmemCreate(name.c_str(), size);
105 CHECK_TRUE(fd >= 0, false, "OHOS::AshmemCreate fail.");
106
107 int check = OHOS::AshmemSetProt(fd, PROT_READ | PROT_WRITE);
108 if (check < 0) {
109 close(fd);
110 const int bufSize = 256;
111 char buf[bufSize] = {0};
112 strerror_r(errno, buf, bufSize);
113 PROFILER_LOG_ERROR(LOG_CORE, "OHOS::AshmemSetProt ERR : %s", buf);
114 return false;
115 }
116
117 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
118 if (ptr == MAP_FAILED) {
119 close(fd);
120 const int bufSize = 256;
121 char buf[bufSize] = {0};
122 strerror_r(errno, buf, bufSize);
123 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlock mmap ERR : %s", buf);
124 return false;
125 }
126
127 fileDescriptor_ = fd;
128 memoryPoint_ = ptr;
129 memorySize_ = size;
130
131 memoryName_ = name;
132 header_ = reinterpret_cast<BlockHeader*>(ptr);
133
134 // initialize header infos
135 header_->info.readOffset_ = 0;
136 header_->info.writeOffset_ = 0;
137 header_->info.memorySize_ = size - sizeof(BlockHeader);
138 header_->info.bytesCount_ = 0;
139 header_->info.chunkCount_ = 0;
140
141 pthread_mutexattr_t muAttr;
142 pthread_mutexattr_init(&muAttr);
143 pthread_mutexattr_settype(&muAttr, PTHREAD_MUTEX_RECURSIVE);
144 pthread_mutex_init(&header_->info.mutex_, &muAttr);
145 return true;
146 }
147
Valid() const148 bool ShareMemoryBlock::Valid() const
149 {
150 return header_ != nullptr;
151 }
152
ShareMemoryBlock(const std::string & name,uint32_t size)153 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size) : ShareMemoryBlock()
154 {
155 CreateBlock(name, size);
156 }
157
ShareMemoryBlock(const std::string & name,uint32_t size,int fd)158 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size, int fd) : ShareMemoryBlock()
159 {
160 CreateBlockWithFd(name, size, fd);
161 }
162
~ShareMemoryBlock()163 ShareMemoryBlock::~ShareMemoryBlock()
164 {
165 ReleaseBlock();
166 }
167
ReleaseBlock()168 bool ShareMemoryBlock::ReleaseBlock()
169 {
170 if (memorySize_ > 0) {
171 munmap(memoryPoint_, memorySize_);
172 memoryPoint_ = nullptr;
173 memorySize_ = 0;
174 }
175 g_timeCount = 0;
176 if (fileDescriptor_ >= 0) {
177 close(fileDescriptor_);
178 fileDescriptor_ = -1;
179 }
180 return true;
181 }
182
GetCurrentFreeMemory(uint32_t size)183 int8_t* ShareMemoryBlock::GetCurrentFreeMemory(uint32_t size)
184 {
185 CHECK_NOTNULL(header_, nullptr, "header not ready!");
186 uint32_t realSize = size + PIECE_HEAD_LEN + PIECE_HEAD_LEN;
187
188 uint32_t wp = header_->info.writeOffset_.load();
189 if (wp + realSize > header_->info.memorySize_) { // 后面部分放不下,从头开始放
190 if (header_->info.readOffset_.load() == 0) {
191 return nullptr;
192 }
193 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
194 wp = 0;
195 }
196 if (wp < header_->info.readOffset_.load() && header_->info.readOffset_.load() < wp + realSize) { //
197 return nullptr;
198 }
199
200 return &header_->data[wp + PIECE_HEAD_LEN];
201 }
202
GetFreeMemory(uint32_t size)203 int8_t* ShareMemoryBlock::GetFreeMemory(uint32_t size)
204 {
205 if (reusePloicy_ == ReusePolicy::DROP_NONE) {
206 return GetCurrentFreeMemory(size);
207 }
208 int8_t* ret = nullptr;
209 while (true) {
210 ret = GetCurrentFreeMemory(size);
211 if (ret != nullptr) {
212 break;
213 }
214 if (!Next()) {
215 return nullptr;
216 }
217 }
218 return ret;
219 }
220
UseFreeMemory(int8_t * pmem,uint32_t size)221 bool ShareMemoryBlock::UseFreeMemory(int8_t* pmem, uint32_t size)
222 {
223 uint32_t wp = pmem - PIECE_HEAD_LEN - header_->data;
224 *((int*)(&header_->data[wp])) = size;
225
226 header_->info.writeOffset_ = wp + PIECE_HEAD_LEN + size;
227 return true;
228 }
229
PutRaw(const int8_t * data,uint32_t size)230 bool ShareMemoryBlock::PutRaw(const int8_t* data, uint32_t size)
231 {
232 CHECK_NOTNULL(header_, false, "header not ready!");
233 PthreadLocker locker(header_->info.mutex_);
234 int8_t* rawMemory = GetFreeMemory(size);
235 if (rawMemory == nullptr) {
236 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
237 return false;
238 }
239 if (memcpy_s(rawMemory, size, data, size) != EOK) {
240 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
241 return false;
242 }
243
244 UseFreeMemory(rawMemory, size);
245 ++header_->info.bytesCount_;
246 ++header_->info.chunkCount_;
247 return true;
248 }
249
PutRawTimeout(const int8_t * data,uint32_t size)250 bool ShareMemoryBlock::PutRawTimeout(const int8_t* data, uint32_t size)
251 {
252 CHECK_NOTNULL(header_, false, "header not ready!");
253
254 struct timespec time_out;
255 clock_gettime(CLOCK_REALTIME, &time_out);
256 time_out.tv_sec += TIMEOUT_SEC;
257 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
258 PROFILER_LOG_ERROR(LOG_CORE, "PutRawTimeout failed %d", errno);
259 return false;
260 }
261
262 int8_t* rawMemory = GetFreeMemory(size);
263 if (rawMemory == nullptr) {
264 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
265 pthread_mutex_unlock(&header_->info.mutex_);
266 return false;
267 }
268 if (memcpy_s(rawMemory, size, data, size) != EOK) {
269 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
270 pthread_mutex_unlock(&header_->info.mutex_);
271 return false;
272 }
273
274 UseFreeMemory(rawMemory, size);
275 ++header_->info.bytesCount_;
276 ++header_->info.chunkCount_;
277
278 pthread_mutex_unlock(&header_->info.mutex_);
279 return true;
280 }
281
PutWithPayloadTimeout(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize)282 bool ShareMemoryBlock::PutWithPayloadTimeout(const int8_t* header, uint32_t headerSize,
283 const int8_t* payload, uint32_t payloadSize)
284 {
285 CHECK_NOTNULL(header_, false, "header not ready!");
286 struct timespec time_out;
287 clock_gettime(CLOCK_REALTIME, &time_out);
288 time_out.tv_sec += TIMEOUT_SEC;
289 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
290 return false;
291 }
292
293 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
294 if (rawMemory == nullptr) {
295 PROFILER_LOG_INFO(LOG_CORE, "%s: shared memory exhausted, discarding data", __FUNCTION__);
296 pthread_mutex_unlock(&header_->info.mutex_);
297 return false;
298 }
299 if (memcpy_s(rawMemory, headerSize, header, headerSize) != EOK) {
300 pthread_mutex_unlock(&header_->info.mutex_);
301 return false;
302 }
303 if (payloadSize > 0) {
304 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
305 pthread_mutex_unlock(&header_->info.mutex_);
306 return false;
307 }
308 }
309 UseFreeMemory(rawMemory, headerSize + payloadSize);
310 ++header_->info.bytesCount_;
311 ++header_->info.chunkCount_;
312
313 pthread_mutex_unlock(&header_->info.mutex_);
314 return true;
315 }
316
317 #ifndef NO_PROTOBUF
PutMessage(const google::protobuf::Message & pmsg,const std::string & pluginName)318 bool ShareMemoryBlock::PutMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
319 {
320 size_t size = pmsg.ByteSizeLong();
321
322 CHECK_NOTNULL(header_, false, "header not ready!");
323 PthreadLocker locker(header_->info.mutex_);
324 int8_t* rawMemory = GetFreeMemory(size);
325 if (rawMemory == nullptr) {
326 PROFILER_LOG_ERROR(LOG_CORE, "%s: PutMessage not enough space [%zu]", pluginName.c_str(), size);
327 return false;
328 }
329
330 int ret = pmsg.SerializeToArray(rawMemory, size);
331 if (ret <= 0) {
332 PROFILER_LOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
333 return false;
334 }
335 UseFreeMemory(rawMemory, size);
336 ++header_->info.bytesCount_;
337 ++header_->info.chunkCount_;
338 return true;
339 }
340 #endif
341
TakeData(const DataHandler & func,bool isProtobufSerialize)342 bool ShareMemoryBlock::TakeData(const DataHandler& func, bool isProtobufSerialize)
343 {
344 if (!isProtobufSerialize) {
345 return TakeDataOptimize(func);
346 }
347
348 CHECK_NOTNULL(header_, false, "header not ready!");
349 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
350
351 auto size = GetDataSize();
352 if (size == 0 || size > header_->info.memorySize_) {
353 return false;
354 }
355 auto ptr = GetDataPoint();
356 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
357 CHECK_TRUE(Next(), false, "move read pointer FAILED!");
358 --header_->info.chunkCount_;
359 return true;
360 }
361
GetDataSize()362 uint32_t ShareMemoryBlock::GetDataSize()
363 {
364 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
365 return 0;
366 }
367 uint32_t ret = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
368 if (ret == INVALID_LENGTH) {
369 ret = *((uint32_t*)(&header_->data[0]));
370 }
371 return ret;
372 }
373
GetDataPoint()374 const int8_t* ShareMemoryBlock::GetDataPoint()
375 {
376 if (*((uint32_t*)(&header_->data[header_->info.readOffset_.load()])) == INVALID_LENGTH) {
377 return &header_->data[PIECE_HEAD_LEN];
378 }
379 return &header_->data[header_->info.readOffset_ .load()+ PIECE_HEAD_LEN];
380 }
381
Next()382 bool ShareMemoryBlock::Next()
383 {
384 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
385 return false;
386 }
387 uint32_t size = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
388 if (size == INVALID_LENGTH) {
389 size = *((uint32_t*)(&header_->data[0]));
390 header_->info.readOffset_ = size + PIECE_HEAD_LEN;
391 } else {
392 header_->info.readOffset_ += size + PIECE_HEAD_LEN;
393 }
394 return true;
395 }
396
GetName()397 std::string ShareMemoryBlock::GetName()
398 {
399 return memoryName_;
400 }
401
GetSize()402 uint32_t ShareMemoryBlock::GetSize()
403 {
404 return memorySize_;
405 }
406
GetfileDescriptor()407 int ShareMemoryBlock::GetfileDescriptor()
408 {
409 return fileDescriptor_;
410 }
411
PutWithPayloadSync(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize,const std::function<bool ()> & callback)412 bool ShareMemoryBlock::PutWithPayloadSync(const int8_t* header, uint32_t headerSize,
413 const int8_t* payload, uint32_t payloadSize, const std::function<bool()>& callback)
414 {
415 CHECK_NOTNULL(header_, false, "header not ready!");
416 pthread_mutex_lock(&header_->info.mutex_);
417 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
418 if (rawMemory == nullptr) {
419 while (true) {
420 if (rawMemory == nullptr) {
421 if ((callback && callback()) || (g_timeCount > waitTime_)) {
422 PROFILER_LOG_ERROR(LOG_CORE, "PutWithPayloadSync exit with g_timeCount %d", g_timeCount.load());
423 pthread_mutex_unlock(&header_->info.mutex_);
424 return false;
425 }
426 pthread_mutex_unlock(&header_->info.mutex_);
427 usleep(WAIT_RELEASE_TIMEOUT_US);
428 g_timeCount += WAIT_RELEASE_TIMEOUT_US;
429 pthread_mutex_lock(&header_->info.mutex_);
430 rawMemory = GetFreeMemory(headerSize + payloadSize);
431 continue;
432 }
433 g_timeCount = 0;
434 break;
435 }
436 }
437 if (memcpy_s(rawMemory, headerSize + payloadSize, header, headerSize) != EOK) {
438 pthread_mutex_unlock(&header_->info.mutex_);
439 return false;
440 }
441 if (payloadSize > 0) {
442 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
443 pthread_mutex_unlock(&header_->info.mutex_);
444 return false;
445 }
446 }
447 UseFreeMemory(rawMemory, headerSize + payloadSize);
448 ++header_->info.bytesCount_;
449 ++header_->info.chunkCount_;
450 pthread_mutex_unlock(&header_->info.mutex_);
451 return true;
452 }
453
UseMemory(int32_t size)454 void ShareMemoryBlock::UseMemory(int32_t size)
455 {
456 CHECK_TRUE(header_ != nullptr, NO_RETVAL, "header not ready!");
457 CHECK_TRUE(size > 0, NO_RETVAL, "size(%d) is invalid", size);
458
459 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
460 *((int*)(&header_->data[wp])) = size;
461 header_->info.writeOffset_.store(wp + PIECE_HEAD_LEN + size, std::memory_order_release);
462 }
463
GetMemory(uint32_t size,uint8_t ** memory,uint32_t * offset)464 bool ShareMemoryBlock::GetMemory(uint32_t size, uint8_t** memory, uint32_t* offset)
465 {
466 CHECK_NOTNULL(header_, false, "header not ready!");
467
468 // The actual size is to store data with a size of offset and a size of data and a four byte tail tag.
469 uint32_t realSize = messageWriteOffset_ + size + PIECE_HEAD_LEN;
470 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
471 uint32_t rp = header_->info.readOffset_.load(std::memory_order_acquire);
472 if (rp <= wp) {
473 if (wp + realSize <= header_->info.memorySize_) {
474 // enough tail space to store data.
475 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
476 *offset = messageWriteOffset_;
477 return true;
478 } else if (realSize <= rp) {
479 // there is data in the tail, and it is need to copy the data in the tail to the header for saving.
480 auto ret = memcpy_s(&header_->data[0], messageWriteOffset_, &header_->data[wp], messageWriteOffset_);
481 CHECK_TRUE(ret == EOK, false, "memcpy_s messageWriteOffset_(%d) data failed", messageWriteOffset_);
482 // set trailing data end tag.
483 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
484 // set writeOffset_ to zero.
485 header_->info.writeOffset_.store(0, std::memory_order_release);
486 *memory = reinterpret_cast<uint8_t *>(&header_->data[messageWriteOffset_]);
487 *offset = messageWriteOffset_;
488 return true;
489 }
490 } else {
491 if (wp + realSize <= rp) {
492 // rp is after wp and there is enough space to store data.
493 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
494 *offset = messageWriteOffset_;
495 return true;
496 }
497 }
498
499 PROFILER_LOG_ERROR(LOG_CORE, "Write not enough space, realSize=%u, rp=%u, wp=%u", realSize, rp, wp);
500 return false;
501 }
502
TakeDataOptimize(const DataHandler & func)503 bool ShareMemoryBlock::TakeDataOptimize(const DataHandler& func)
504 {
505 CHECK_NOTNULL(header_, false, "header not ready!");
506 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
507
508 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_acquire);
509 uint32_t rp = header_->info.readOffset_.load(std::memory_order_relaxed);
510 int8_t* ptr = nullptr;
511 uint32_t size = 0;
512 if (rp < wp) {
513 // |---rp<---data--->wp---|
514 size = *((uint32_t*)(&header_->data[rp]));
515 ptr = &header_->data[rp + PIECE_HEAD_LEN];
516 } else if (wp < rp) {
517 // |<---data2--->wp---rp<---data1--->|
518 size = *((uint32_t*)(&header_->data[rp]));
519 // Size is the end tag of the tail and needs to be retrieved from the header.
520 if (size == INVALID_LENGTH) {
521 if (wp == 0) {
522 // no data to read.
523 return false;
524 }
525 rp = 0;
526 size = *((uint32_t*)(&header_->data[rp]));
527 }
528 ptr = &header_->data[rp + PIECE_HEAD_LEN];
529 } else {
530 // wp == rp
531 return false;
532 }
533 CHECK_NOTNULL(ptr, false, "ptr is nullptr");
534
535 // Start writing file.
536 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
537
538 header_->info.readOffset_.store(rp + size + PIECE_HEAD_LEN, std::memory_order_release);
539 return true;
540 }
541
Seek(uint32_t pos)542 bool ShareMemoryBlock::Seek(uint32_t pos)
543 {
544 messageWriteOffset_ = pos;
545 return true;
546 }
547
ResetPos()548 void ShareMemoryBlock::ResetPos()
549 {
550 messageWriteOffset_ = PIECE_HEAD_LEN;
551 }