1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "share_memory_block.h"
17
18 #include <cstring>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 #include "ashmem.h"
24 #include "logging.h"
25 #include "securec.h"
26
namespace {
// Bytes reserved in front of every stored piece to hold its length.
constexpr int PIECE_HEAD_LEN = 4;
// Length value written at the write position to mark "the tail ends here, continue at offset 0".
constexpr uint32_t INVALID_LENGTH = static_cast<uint32_t>(-1);
// Seconds added to the current time to form the deadline for pthread_mutex_timedlock.
constexpr uint32_t TIMEOUT_SEC = 1;
// Microseconds slept per retry in PutWithPayloadSync; also the step added to g_timeCount.
constexpr int WAIT_RELEASE_TIMEOUT_US = 10;
// Accumulated wait (in microseconds) of PutWithPayloadSync; cleared on success and in ReleaseBlock.
std::atomic<int> g_timeCount {0};
#ifndef PAGE_SIZE
// Fallback used when the platform headers do not provide PAGE_SIZE.
constexpr uint32_t PAGE_SIZE = 4096;
#endif
} // namespace
37
// Scope guard for a pthread mutex: locks in the constructor, unlocks in the destructor.
// Copying is disabled because a copy would unlock the same mutex a second time
// when it goes out of scope.
struct PthreadLocker {
    // Locks `mutex`; the caller must keep `mutex` alive for the lifetime of the guard.
    explicit PthreadLocker(pthread_mutex_t& mutex) : mutex_(mutex)
    {
        pthread_mutex_lock(&mutex_);
    }

    // Releases the lock taken by the constructor.
    ~PthreadLocker()
    {
        pthread_mutex_unlock(&mutex_);
    }

    PthreadLocker(const PthreadLocker&) = delete;
    PthreadLocker& operator=(const PthreadLocker&) = delete;

private:
    pthread_mutex_t& mutex_;
};
52
ShareMemoryBlock()53 ShareMemoryBlock::ShareMemoryBlock()
54 : fileDescriptor_(-1),
55 memoryPoint_(nullptr),
56 memorySize_(0),
57 memoryName_(),
58 header_(nullptr),
59 reusePloicy_(ReusePolicy::DROP_NONE)
60 {
61 }
62
CreateBlockWithFd(std::string name,uint32_t size,int fd)63 bool ShareMemoryBlock::CreateBlockWithFd(std::string name, uint32_t size, int fd)
64 {
65 CHECK_TRUE(fd >= 0, false, "CreateBlock FAIL SYS_memfd_create");
66
67 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
68 if (ptr == MAP_FAILED) {
69 const int bufSize = 256;
70 char buf[bufSize] = {0};
71 strerror_r(errno, buf, bufSize);
72 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlockWithFd mmap ERR : %s", buf);
73 return false;
74 }
75
76 fileDescriptor_ = fd;
77 memoryPoint_ = ptr;
78 memorySize_ = size;
79
80 memoryName_ = name;
81 header_ = reinterpret_cast<BlockHeader*>(ptr);
82
83 // Reserve 4 bytes to fill the message length.
84 messageWriteOffset_ = PIECE_HEAD_LEN;
85 // Functions required to bind the BaseMessage class.
86 smbCtx_.block = this;
87 smbCtx_.ctx.getMemory = [](RandomWriteCtx* ctx, uint32_t size, uint8_t** memory, uint32_t* offset) -> bool {
88 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
89 return smbCtx->block->GetMemory(size, memory, offset);
90 };
91 smbCtx_.ctx.seek = [](RandomWriteCtx* ctx, uint32_t offset) -> bool {
92 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
93 return smbCtx->block->Seek(offset);
94 };
95 return true;
96 }
97
CreateBlock(std::string name,uint32_t size)98 bool ShareMemoryBlock::CreateBlock(std::string name, uint32_t size)
99 {
100 PROFILER_LOG_INFO(LOG_CORE, "CreateBlock %s %d", name.c_str(), size);
101 CHECK_TRUE(size > sizeof(BlockHeader), false, "size %u too less!", size);
102 CHECK_TRUE(size % PAGE_SIZE == 0, false, "size %u not times of %d!", size, PAGE_SIZE);
103
104 int fd = OHOS::AshmemCreate(name.c_str(), size);
105 CHECK_TRUE(fd >= 0, false, "OHOS::AshmemCreate fail.");
106
107 int check = OHOS::AshmemSetProt(fd, PROT_READ | PROT_WRITE);
108 if (check < 0) {
109 close(fd);
110 const int bufSize = 256;
111 char buf[bufSize] = {0};
112 strerror_r(errno, buf, bufSize);
113 PROFILER_LOG_ERROR(LOG_CORE, "OHOS::AshmemSetProt ERR : %s", buf);
114 return false;
115 }
116
117 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
118 if (ptr == MAP_FAILED) {
119 close(fd);
120 const int bufSize = 256;
121 char buf[bufSize] = {0};
122 strerror_r(errno, buf, bufSize);
123 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlock mmap ERR : %s", buf);
124 return false;
125 }
126
127 fileDescriptor_ = fd;
128 memoryPoint_ = ptr;
129 memorySize_ = size;
130
131 memoryName_ = name;
132 header_ = reinterpret_cast<BlockHeader*>(ptr);
133 if (header_ == nullptr) {
134 return false;
135 }
136 // initialize header infos
137 header_->info.readOffset_ = 0;
138 header_->info.writeOffset_ = 0;
139 header_->info.memorySize_ = size - sizeof(BlockHeader);
140 header_->info.bytesCount_ = 0;
141 header_->info.chunkCount_ = 0;
142
143 pthread_mutexattr_t muAttr;
144 pthread_mutexattr_init(&muAttr);
145 pthread_mutexattr_settype(&muAttr, PTHREAD_MUTEX_RECURSIVE);
146 pthread_mutex_init(&header_->info.mutex_, &muAttr);
147 return true;
148 }
149
Valid() const150 bool ShareMemoryBlock::Valid() const
151 {
152 return header_ != nullptr;
153 }
154
ShareMemoryBlock(const std::string & name,uint32_t size)155 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size) : ShareMemoryBlock()
156 {
157 CreateBlock(name, size);
158 }
159
ShareMemoryBlock(const std::string & name,uint32_t size,int fd)160 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size, int fd) : ShareMemoryBlock()
161 {
162 CreateBlockWithFd(name, size, fd);
163 }
164
~ShareMemoryBlock()165 ShareMemoryBlock::~ShareMemoryBlock()
166 {
167 ReleaseBlock();
168 }
169
ReleaseBlock()170 bool ShareMemoryBlock::ReleaseBlock()
171 {
172 if (memorySize_ > 0) {
173 munmap(memoryPoint_, memorySize_);
174 memoryPoint_ = nullptr;
175 memorySize_ = 0;
176 }
177 g_timeCount = 0;
178 if (fileDescriptor_ >= 0) {
179 close(fileDescriptor_);
180 fileDescriptor_ = -1;
181 }
182 return true;
183 }
184
GetCurrentFreeMemory(uint32_t size)185 int8_t* ShareMemoryBlock::GetCurrentFreeMemory(uint32_t size)
186 {
187 CHECK_NOTNULL(header_, nullptr, "header not ready!");
188 uint32_t realSize = size + PIECE_HEAD_LEN + PIECE_HEAD_LEN;
189
190 uint32_t wp = header_->info.writeOffset_.load();
191 if (wp + realSize > header_->info.memorySize_) { // 后面部分放不下,从头开始放
192 if (header_->info.readOffset_.load() == 0) {
193 return nullptr;
194 }
195 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
196 wp = 0;
197 }
198 if (wp < header_->info.readOffset_.load() && header_->info.readOffset_.load() < wp + realSize) { //
199 return nullptr;
200 }
201
202 return &header_->data[wp + PIECE_HEAD_LEN];
203 }
204
GetFreeMemory(uint32_t size)205 int8_t* ShareMemoryBlock::GetFreeMemory(uint32_t size)
206 {
207 if (reusePloicy_ == ReusePolicy::DROP_NONE) {
208 return GetCurrentFreeMemory(size);
209 }
210 int8_t* ret = nullptr;
211 while (true) {
212 ret = GetCurrentFreeMemory(size);
213 if (ret != nullptr) {
214 break;
215 }
216 if (!Next()) {
217 return nullptr;
218 }
219 }
220 return ret;
221 }
222
UseFreeMemory(int8_t * pmem,uint32_t size)223 bool ShareMemoryBlock::UseFreeMemory(int8_t* pmem, uint32_t size)
224 {
225 uint32_t wp = pmem - PIECE_HEAD_LEN - header_->data;
226 *((int*)(&header_->data[wp])) = size;
227
228 header_->info.writeOffset_ = wp + PIECE_HEAD_LEN + size;
229 return true;
230 }
231
PutRaw(const int8_t * data,uint32_t size)232 bool ShareMemoryBlock::PutRaw(const int8_t* data, uint32_t size)
233 {
234 CHECK_NOTNULL(header_, false, "header not ready!");
235 PthreadLocker locker(header_->info.mutex_);
236 int8_t* rawMemory = GetFreeMemory(size);
237 if (rawMemory == nullptr) {
238 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
239 return false;
240 }
241 if (memcpy_s(rawMemory, size, data, size) != EOK) {
242 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
243 return false;
244 }
245
246 UseFreeMemory(rawMemory, size);
247 ++header_->info.bytesCount_;
248 ++header_->info.chunkCount_;
249 return true;
250 }
251
PutRawTimeout(const int8_t * data,uint32_t size)252 bool ShareMemoryBlock::PutRawTimeout(const int8_t* data, uint32_t size)
253 {
254 CHECK_NOTNULL(header_, false, "header not ready!");
255
256 struct timespec time_out;
257 clock_gettime(CLOCK_REALTIME, &time_out);
258 time_out.tv_sec += TIMEOUT_SEC;
259 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
260 PROFILER_LOG_ERROR(LOG_CORE, "PutRawTimeout failed %d", errno);
261 return false;
262 }
263
264 int8_t* rawMemory = GetFreeMemory(size);
265 if (rawMemory == nullptr) {
266 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
267 pthread_mutex_unlock(&header_->info.mutex_);
268 return false;
269 }
270 if (memcpy_s(rawMemory, size, data, size) != EOK) {
271 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
272 pthread_mutex_unlock(&header_->info.mutex_);
273 return false;
274 }
275
276 UseFreeMemory(rawMemory, size);
277 ++header_->info.bytesCount_;
278 ++header_->info.chunkCount_;
279
280 pthread_mutex_unlock(&header_->info.mutex_);
281 return true;
282 }
283
PutWithPayloadTimeout(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize)284 bool ShareMemoryBlock::PutWithPayloadTimeout(const int8_t* header, uint32_t headerSize,
285 const int8_t* payload, uint32_t payloadSize)
286 {
287 CHECK_NOTNULL(header_, false, "header not ready!");
288 struct timespec time_out;
289 clock_gettime(CLOCK_REALTIME, &time_out);
290 time_out.tv_sec += TIMEOUT_SEC;
291 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
292 return false;
293 }
294
295 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
296 if (rawMemory == nullptr) {
297 PROFILER_LOG_INFO(LOG_CORE, "%s: shared memory exhausted, discarding data", __FUNCTION__);
298 pthread_mutex_unlock(&header_->info.mutex_);
299 return false;
300 }
301 if (memcpy_s(rawMemory, headerSize, header, headerSize) != EOK) {
302 pthread_mutex_unlock(&header_->info.mutex_);
303 return false;
304 }
305 if (payloadSize > 0) {
306 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
307 pthread_mutex_unlock(&header_->info.mutex_);
308 return false;
309 }
310 }
311 UseFreeMemory(rawMemory, headerSize + payloadSize);
312 ++header_->info.bytesCount_;
313 ++header_->info.chunkCount_;
314
315 pthread_mutex_unlock(&header_->info.mutex_);
316 return true;
317 }
318
319 #ifndef NO_PROTOBUF
PutMessage(const google::protobuf::Message & pmsg,const std::string & pluginName)320 bool ShareMemoryBlock::PutMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
321 {
322 size_t size = pmsg.ByteSizeLong();
323
324 CHECK_NOTNULL(header_, false, "header not ready!");
325 PthreadLocker locker(header_->info.mutex_);
326 int8_t* rawMemory = GetFreeMemory(size);
327 if (rawMemory == nullptr) {
328 PROFILER_LOG_ERROR(LOG_CORE, "%s: PutMessage not enough space [%zu]", pluginName.c_str(), size);
329 return false;
330 }
331
332 int ret = pmsg.SerializeToArray(rawMemory, size);
333 if (ret <= 0) {
334 PROFILER_LOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
335 return false;
336 }
337 UseFreeMemory(rawMemory, size);
338 ++header_->info.bytesCount_;
339 ++header_->info.chunkCount_;
340 return true;
341 }
342 #endif
343
TakeData(const DataHandler & func,bool isProtobufSerialize)344 bool ShareMemoryBlock::TakeData(const DataHandler& func, bool isProtobufSerialize)
345 {
346 if (!isProtobufSerialize) {
347 return TakeDataOptimize(func);
348 }
349
350 CHECK_NOTNULL(header_, false, "header not ready!");
351 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
352
353 auto size = GetDataSize();
354 if (size == 0 || size > header_->info.memorySize_) {
355 return false;
356 }
357 auto ptr = GetDataPoint();
358 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
359 CHECK_TRUE(Next(), false, "move read pointer FAILED!");
360 --header_->info.chunkCount_;
361 return true;
362 }
363
GetDataSize()364 uint32_t ShareMemoryBlock::GetDataSize()
365 {
366 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
367 return 0;
368 }
369 uint32_t ret = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
370 if (ret == INVALID_LENGTH) {
371 ret = *((uint32_t*)(&header_->data[0]));
372 }
373 return ret;
374 }
375
GetDataPoint()376 const int8_t* ShareMemoryBlock::GetDataPoint()
377 {
378 if (*((uint32_t*)(&header_->data[header_->info.readOffset_.load()])) == INVALID_LENGTH) {
379 return &header_->data[PIECE_HEAD_LEN];
380 }
381 return &header_->data[header_->info.readOffset_ .load()+ PIECE_HEAD_LEN];
382 }
383
Next()384 bool ShareMemoryBlock::Next()
385 {
386 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
387 return false;
388 }
389 uint32_t size = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
390 if (size == INVALID_LENGTH) {
391 size = *((uint32_t*)(&header_->data[0]));
392 header_->info.readOffset_ = size + PIECE_HEAD_LEN;
393 } else {
394 header_->info.readOffset_ += size + PIECE_HEAD_LEN;
395 }
396 return true;
397 }
398
GetName()399 std::string ShareMemoryBlock::GetName()
400 {
401 return memoryName_;
402 }
403
GetSize()404 uint32_t ShareMemoryBlock::GetSize()
405 {
406 return memorySize_;
407 }
408
GetfileDescriptor()409 int ShareMemoryBlock::GetfileDescriptor()
410 {
411 return fileDescriptor_;
412 }
413
PutWithPayloadSync(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize,const std::function<bool ()> & callback)414 bool ShareMemoryBlock::PutWithPayloadSync(const int8_t* header, uint32_t headerSize,
415 const int8_t* payload, uint32_t payloadSize, const std::function<bool()>& callback)
416 {
417 CHECK_NOTNULL(header_, false, "header not ready!");
418 pthread_mutex_lock(&header_->info.mutex_);
419 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
420 if (rawMemory == nullptr) {
421 while (true) {
422 if (rawMemory == nullptr) {
423 if ((callback && callback()) || (g_timeCount > waitTime_)) {
424 PROFILER_LOG_ERROR(LOG_CORE, "PutWithPayloadSync exit with g_timeCount %d", g_timeCount.load());
425 pthread_mutex_unlock(&header_->info.mutex_);
426 return false;
427 }
428 pthread_mutex_unlock(&header_->info.mutex_);
429 usleep(WAIT_RELEASE_TIMEOUT_US);
430 g_timeCount += WAIT_RELEASE_TIMEOUT_US;
431 pthread_mutex_lock(&header_->info.mutex_);
432 rawMemory = GetFreeMemory(headerSize + payloadSize);
433 continue;
434 }
435 g_timeCount = 0;
436 break;
437 }
438 }
439 if (memcpy_s(rawMemory, headerSize + payloadSize, header, headerSize) != EOK) {
440 pthread_mutex_unlock(&header_->info.mutex_);
441 return false;
442 }
443 if (payloadSize > 0) {
444 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
445 pthread_mutex_unlock(&header_->info.mutex_);
446 return false;
447 }
448 }
449 UseFreeMemory(rawMemory, headerSize + payloadSize);
450 ++header_->info.bytesCount_;
451 ++header_->info.chunkCount_;
452 pthread_mutex_unlock(&header_->info.mutex_);
453 return true;
454 }
455
UseMemory(int32_t size)456 void ShareMemoryBlock::UseMemory(int32_t size)
457 {
458 CHECK_TRUE(header_ != nullptr, NO_RETVAL, "header not ready!");
459 CHECK_TRUE(size > 0, NO_RETVAL, "size(%d) is invalid", size);
460
461 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
462 *((int*)(&header_->data[wp])) = size;
463 header_->info.writeOffset_.store(wp + PIECE_HEAD_LEN + size, std::memory_order_release);
464 }
465
GetMemory(uint32_t size,uint8_t ** memory,uint32_t * offset)466 bool ShareMemoryBlock::GetMemory(uint32_t size, uint8_t** memory, uint32_t* offset)
467 {
468 CHECK_NOTNULL(header_, false, "header not ready!");
469
470 // The actual size is to store data with a size of offset and a size of data and a four byte tail tag.
471 uint32_t realSize = messageWriteOffset_ + size + PIECE_HEAD_LEN;
472 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
473 uint32_t rp = header_->info.readOffset_.load(std::memory_order_acquire);
474 if (rp <= wp) {
475 if (wp + realSize <= header_->info.memorySize_) {
476 // enough tail space to store data.
477 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
478 *offset = messageWriteOffset_;
479 return true;
480 } else if (realSize <= rp) {
481 // there is data in the tail, and it is need to copy the data in the tail to the header for saving.
482 auto ret = memcpy_s(&header_->data[0], messageWriteOffset_, &header_->data[wp], messageWriteOffset_);
483 CHECK_TRUE(ret == EOK, false, "memcpy_s messageWriteOffset_(%d) data failed", messageWriteOffset_);
484 // set trailing data end tag.
485 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
486 // set writeOffset_ to zero.
487 header_->info.writeOffset_.store(0, std::memory_order_release);
488 *memory = reinterpret_cast<uint8_t *>(&header_->data[messageWriteOffset_]);
489 *offset = messageWriteOffset_;
490 return true;
491 }
492 } else {
493 if (wp + realSize <= rp) {
494 // rp is after wp and there is enough space to store data.
495 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
496 *offset = messageWriteOffset_;
497 return true;
498 }
499 }
500
501 PROFILER_LOG_ERROR(LOG_CORE, "Write not enough space, realSize=%u, rp=%u, wp=%u", realSize, rp, wp);
502 return false;
503 }
504
TakeDataOptimize(const DataHandler & func)505 bool ShareMemoryBlock::TakeDataOptimize(const DataHandler& func)
506 {
507 CHECK_NOTNULL(header_, false, "header not ready!");
508 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
509
510 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_acquire);
511 uint32_t rp = header_->info.readOffset_.load(std::memory_order_relaxed);
512 int8_t* ptr = nullptr;
513 uint32_t size = 0;
514 if (rp < wp) {
515 // |---rp<---data--->wp---|
516 size = *((uint32_t*)(&header_->data[rp]));
517 ptr = &header_->data[rp + PIECE_HEAD_LEN];
518 } else if (wp < rp) {
519 // |<---data2--->wp---rp<---data1--->|
520 size = *((uint32_t*)(&header_->data[rp]));
521 // Size is the end tag of the tail and needs to be retrieved from the header.
522 if (size == INVALID_LENGTH) {
523 if (wp == 0) {
524 // no data to read.
525 return false;
526 }
527 rp = 0;
528 size = *((uint32_t*)(&header_->data[rp]));
529 }
530 ptr = &header_->data[rp + PIECE_HEAD_LEN];
531 } else {
532 // wp == rp
533 return false;
534 }
535 CHECK_NOTNULL(ptr, false, "ptr is nullptr");
536
537 // Start writing file.
538 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
539
540 header_->info.readOffset_.store(rp + size + PIECE_HEAD_LEN, std::memory_order_release);
541 return true;
542 }
543
Seek(uint32_t pos)544 bool ShareMemoryBlock::Seek(uint32_t pos)
545 {
546 messageWriteOffset_ = pos;
547 return true;
548 }
549
ResetPos()550 void ShareMemoryBlock::ResetPos()
551 {
552 messageWriteOffset_ = PIECE_HEAD_LEN;
553 }