1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "share_memory_block.h"
17
18 #include <cstring>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 #include "ashmem.h"
24 #include "logging.h"
25 #include "securec.h"
26
namespace {
// Number of bytes stored in front of every piece to hold its length.
constexpr int PIECE_HEAD_LEN{4};
// Length value written at a piece position to tell readers that the next piece starts at offset 0.
constexpr uint32_t INVALID_LENGTH{static_cast<uint32_t>(-1)};
// Seconds added to the current time to build the deadline for pthread_mutex_timedlock.
constexpr uint32_t TIMEOUT_SEC{1};
// Microseconds slept per retry in PutWithPayloadSync; also the step added to g_timeCount.
constexpr int WAIT_RELEASE_TIMEOUT_US{10};
// Wait time accumulated by PutWithPayloadSync; compared against waitTime_ and cleared in ReleaseBlock.
std::atomic<int> g_timeCount{0};
#ifndef PAGE_SIZE
// Fallback used by CreateBlock's size check when PAGE_SIZE is not already defined.
constexpr uint32_t PAGE_SIZE{4096};
#endif
} // namespace
37
// Scoped lock for a raw pthread mutex: the constructor locks the mutex and the
// destructor unlocks it, so every return path of the enclosing scope releases it.
struct PthreadLocker {
    // Locks `mutex`; the reference must stay valid for the lifetime of this object.
    explicit PthreadLocker(pthread_mutex_t& mutex) : mutex_(mutex)
    {
        pthread_mutex_lock(&mutex_);
    }

    // Unlocks the mutex that was locked by the constructor.
    ~PthreadLocker()
    {
        pthread_mutex_unlock(&mutex_);
    }

    // Copying is disabled: a copy would run a second destructor and unlock the
    // same mutex twice although it was locked only once.
    PthreadLocker(const PthreadLocker&) = delete;
    PthreadLocker& operator=(const PthreadLocker&) = delete;

private:
    pthread_mutex_t& mutex_;
};
52
ShareMemoryBlock()53 ShareMemoryBlock::ShareMemoryBlock()
54 : fileDescriptor_(-1),
55 memoryPoint_(nullptr),
56 memorySize_(0),
57 memoryName_(),
58 header_(nullptr),
59 reusePloicy_(ReusePolicy::DROP_NONE)
60 {
61 }
62
CreateBlockWithFd(std::string name,uint32_t size,int fd)63 bool ShareMemoryBlock::CreateBlockWithFd(std::string name, uint32_t size, int fd)
64 {
65 CHECK_TRUE(fd >= 0, false, "CreateBlock FAIL SYS_memfd_create");
66
67 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
68 if (ptr == MAP_FAILED) {
69 const int bufSize = 256;
70 char buf[bufSize] = {0};
71 strerror_r(errno, buf, bufSize);
72 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlockWithFd mmap ERR : %s", buf);
73 return false;
74 }
75
76 fileDescriptor_ = fd;
77 memoryPoint_ = ptr;
78 memorySize_ = size;
79
80 memoryName_ = name;
81 header_ = reinterpret_cast<BlockHeader*>(ptr);
82
83 // Reserve 4 bytes to fill the message length.
84 messageWriteOffset_ = PIECE_HEAD_LEN;
85 // Functions required to bind the BaseMessage class.
86 smbCtx_.block = this;
87 smbCtx_.ctx.getMemory = [](RandomWriteCtx* ctx, uint32_t size, uint8_t** memory, uint32_t* offset) -> bool {
88 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
89 return smbCtx->block->GetMemory(size, memory, offset);
90 };
91 smbCtx_.ctx.seek = [](RandomWriteCtx* ctx, uint32_t offset) -> bool {
92 ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
93 return smbCtx->block->Seek(offset);
94 };
95 return true;
96 }
97
CreateBlock(std::string name,uint32_t size)98 bool ShareMemoryBlock::CreateBlock(std::string name, uint32_t size)
99 {
100 PROFILER_LOG_INFO(LOG_CORE, "CreateBlock %s %d", name.c_str(), size);
101 CHECK_TRUE(size > sizeof(BlockHeader), false, "size %u too less!", size);
102 CHECK_TRUE(size % PAGE_SIZE == 0, false, "size %u not times of %d!", size, PAGE_SIZE);
103
104 int fd = OHOS::AshmemCreate(name.c_str(), size);
105 CHECK_TRUE(fd >= 0, false, "OHOS::AshmemCreate fail.");
106
107 int check = OHOS::AshmemSetProt(fd, PROT_READ | PROT_WRITE);
108 if (check < 0) {
109 close(fd);
110 const int bufSize = 256;
111 char buf[bufSize] = {0};
112 strerror_r(errno, buf, bufSize);
113 PROFILER_LOG_ERROR(LOG_CORE, "OHOS::AshmemSetProt ERR : %s", buf);
114 return false;
115 }
116
117 auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
118 if (ptr == MAP_FAILED) {
119 close(fd);
120 const int bufSize = 256;
121 char buf[bufSize] = {0};
122 strerror_r(errno, buf, bufSize);
123 PROFILER_LOG_ERROR(LOG_CORE, "CreateBlock mmap ERR : %s", buf);
124 return false;
125 }
126
127 fileDescriptor_ = fd;
128 memoryPoint_ = ptr;
129 memorySize_ = size;
130
131 memoryName_ = name;
132 header_ = reinterpret_cast<BlockHeader*>(ptr);
133 if (header_ == nullptr) {
134 return false;
135 }
136 // initialize header infos
137 header_->info.readOffset_ = 0;
138 header_->info.writeOffset_ = 0;
139 header_->info.memorySize_ = size - sizeof(BlockHeader);
140 header_->info.bytesCount_ = 0;
141 header_->info.chunkCount_ = 0;
142
143 pthread_mutexattr_t muAttr;
144 pthread_mutexattr_init(&muAttr);
145 pthread_mutexattr_settype(&muAttr, PTHREAD_MUTEX_RECURSIVE);
146 pthread_mutex_init(&header_->info.mutex_, &muAttr);
147 return true;
148 }
149
Valid() const150 bool ShareMemoryBlock::Valid() const
151 {
152 return header_ != nullptr;
153 }
154
ShareMemoryBlock(const std::string & name,uint32_t size)155 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size) : ShareMemoryBlock()
156 {
157 CreateBlock(name, size);
158 }
159
ShareMemoryBlock(const std::string & name,uint32_t size,int fd)160 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size, int fd) : ShareMemoryBlock()
161 {
162 CreateBlockWithFd(name, size, fd);
163 }
164
~ShareMemoryBlock()165 ShareMemoryBlock::~ShareMemoryBlock()
166 {
167 ReleaseBlock();
168 }
169
ReleaseBlock()170 bool ShareMemoryBlock::ReleaseBlock()
171 {
172 if (memorySize_ > 0) {
173 munmap(memoryPoint_, memorySize_);
174 memoryPoint_ = nullptr;
175 memorySize_ = 0;
176 }
177 g_timeCount = 0;
178 if (fileDescriptor_ >= 0) {
179 close(fileDescriptor_);
180 fileDescriptor_ = -1;
181 }
182 return true;
183 }
184
GetCurrentFreeMemory(uint32_t size)185 int8_t* ShareMemoryBlock::GetCurrentFreeMemory(uint32_t size)
186 {
187 CHECK_NOTNULL(header_, nullptr, "header not ready!");
188 uint32_t realSize = size + PIECE_HEAD_LEN + PIECE_HEAD_LEN;
189
190 uint32_t wp = header_->info.writeOffset_.load();
191 uint32_t rp = header_->info.readOffset_.load();
192 if (wp < rp && rp <= wp + realSize) {
193 return nullptr;
194 }
195 if (wp + realSize > header_->info.memorySize_) { // 后面部分放不下,从头开始放
196 if (rp == 0) {
197 return nullptr;
198 }
199 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
200 wp = 0;
201 if (wp + realSize >= rp) {
202 return nullptr;
203 }
204 }
205 return &header_->data[wp + PIECE_HEAD_LEN];
206 }
207
GetFreeMemory(uint32_t size)208 int8_t* ShareMemoryBlock::GetFreeMemory(uint32_t size)
209 {
210 if (reusePloicy_ == ReusePolicy::DROP_NONE) {
211 return GetCurrentFreeMemory(size);
212 }
213 int8_t* ret = nullptr;
214 while (true) {
215 ret = GetCurrentFreeMemory(size);
216 if (ret != nullptr) {
217 break;
218 }
219 if (!Next()) {
220 return nullptr;
221 }
222 }
223 return ret;
224 }
225
UseFreeMemory(int8_t * pmem,uint32_t size)226 bool ShareMemoryBlock::UseFreeMemory(int8_t* pmem, uint32_t size)
227 {
228 uint32_t wp = pmem - PIECE_HEAD_LEN - header_->data;
229 *((int*)(&header_->data[wp])) = size;
230
231 header_->info.writeOffset_ = wp + PIECE_HEAD_LEN + size;
232 return true;
233 }
234
PutRaw(const int8_t * data,uint32_t size)235 bool ShareMemoryBlock::PutRaw(const int8_t* data, uint32_t size)
236 {
237 CHECK_NOTNULL(header_, false, "header not ready!");
238 PthreadLocker locker(header_->info.mutex_);
239 int8_t* rawMemory = GetFreeMemory(size);
240 if (rawMemory == nullptr) {
241 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
242 return false;
243 }
244 if (memcpy_s(rawMemory, size, data, size) != EOK) {
245 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
246 return false;
247 }
248
249 UseFreeMemory(rawMemory, size);
250 ++header_->info.bytesCount_;
251 ++header_->info.chunkCount_;
252 return true;
253 }
254
PutRawTimeout(const int8_t * data,uint32_t size)255 bool ShareMemoryBlock::PutRawTimeout(const int8_t* data, uint32_t size)
256 {
257 CHECK_NOTNULL(header_, false, "header not ready!");
258
259 struct timespec time_out;
260 clock_gettime(CLOCK_REALTIME, &time_out);
261 time_out.tv_sec += TIMEOUT_SEC;
262 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
263 PROFILER_LOG_ERROR(LOG_CORE, "PutRawTimeout failed %d", errno);
264 return false;
265 }
266
267 int8_t* rawMemory = GetFreeMemory(size);
268 if (rawMemory == nullptr) {
269 PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
270 pthread_mutex_unlock(&header_->info.mutex_);
271 return false;
272 }
273 if (memcpy_s(rawMemory, size, data, size) != EOK) {
274 PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
275 pthread_mutex_unlock(&header_->info.mutex_);
276 return false;
277 }
278
279 UseFreeMemory(rawMemory, size);
280 ++header_->info.bytesCount_;
281 ++header_->info.chunkCount_;
282
283 pthread_mutex_unlock(&header_->info.mutex_);
284 return true;
285 }
286
PutWithPayloadTimeout(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize)287 bool ShareMemoryBlock::PutWithPayloadTimeout(const int8_t* header, uint32_t headerSize,
288 const int8_t* payload, uint32_t payloadSize)
289 {
290 CHECK_NOTNULL(header_, false, "header not ready!");
291 struct timespec time_out;
292 clock_gettime(CLOCK_REALTIME, &time_out);
293 time_out.tv_sec += TIMEOUT_SEC;
294 if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
295 return false;
296 }
297
298 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
299 if (rawMemory == nullptr) {
300 PROFILER_LOG_INFO(LOG_CORE, "%s: shared memory exhausted, discarding data", __FUNCTION__);
301 pthread_mutex_unlock(&header_->info.mutex_);
302 return false;
303 }
304 if (memcpy_s(rawMemory, headerSize, header, headerSize) != EOK) {
305 pthread_mutex_unlock(&header_->info.mutex_);
306 return false;
307 }
308 if (payloadSize > 0) {
309 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
310 pthread_mutex_unlock(&header_->info.mutex_);
311 return false;
312 }
313 }
314 UseFreeMemory(rawMemory, headerSize + payloadSize);
315 ++header_->info.bytesCount_;
316 ++header_->info.chunkCount_;
317
318 pthread_mutex_unlock(&header_->info.mutex_);
319 return true;
320 }
321
322 #ifndef NO_PROTOBUF
PutMessage(const google::protobuf::Message & pmsg,const std::string & pluginName)323 bool ShareMemoryBlock::PutMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
324 {
325 size_t size = pmsg.ByteSizeLong();
326
327 CHECK_NOTNULL(header_, false, "header not ready!");
328 PthreadLocker locker(header_->info.mutex_);
329 int8_t* rawMemory = GetFreeMemory(size);
330 if (rawMemory == nullptr) {
331 PROFILER_LOG_ERROR(LOG_CORE, "%s: PutMessage not enough space [%zu]", pluginName.c_str(), size);
332 return false;
333 }
334
335 int ret = pmsg.SerializeToArray(rawMemory, size);
336 if (ret <= 0) {
337 PROFILER_LOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
338 return false;
339 }
340 UseFreeMemory(rawMemory, size);
341 ++header_->info.bytesCount_;
342 ++header_->info.chunkCount_;
343 return true;
344 }
345 #endif
346
TakeData(const DataHandler & func,bool isProtobufSerialize)347 bool ShareMemoryBlock::TakeData(const DataHandler& func, bool isProtobufSerialize)
348 {
349 if (!isProtobufSerialize) {
350 return TakeDataOptimize(func);
351 }
352
353 CHECK_NOTNULL(header_, false, "header not ready!");
354 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
355
356 auto size = GetDataSize();
357 if (size == 0 || size > header_->info.memorySize_) {
358 return false;
359 }
360 auto ptr = GetDataPoint();
361 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
362 CHECK_TRUE(Next(), false, "move read pointer FAILED!");
363 --header_->info.chunkCount_;
364 return true;
365 }
366
GetDataSize()367 uint32_t ShareMemoryBlock::GetDataSize()
368 {
369 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
370 return 0;
371 }
372 uint32_t ret = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
373 if (ret == INVALID_LENGTH) {
374 ret = *((uint32_t*)(&header_->data[0]));
375 }
376 return ret;
377 }
378
GetDataPoint()379 const int8_t* ShareMemoryBlock::GetDataPoint()
380 {
381 if (*((uint32_t*)(&header_->data[header_->info.readOffset_.load()])) == INVALID_LENGTH) {
382 return &header_->data[PIECE_HEAD_LEN];
383 }
384 return &header_->data[header_->info.readOffset_ .load()+ PIECE_HEAD_LEN];
385 }
386
Next()387 bool ShareMemoryBlock::Next()
388 {
389 if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
390 return false;
391 }
392 uint32_t size = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
393 if (size == INVALID_LENGTH) {
394 size = *((uint32_t*)(&header_->data[0]));
395 header_->info.readOffset_ = size + PIECE_HEAD_LEN;
396 } else {
397 header_->info.readOffset_ += size + PIECE_HEAD_LEN;
398 }
399 return true;
400 }
401
GetName()402 std::string ShareMemoryBlock::GetName()
403 {
404 return memoryName_;
405 }
406
GetSize()407 uint32_t ShareMemoryBlock::GetSize()
408 {
409 return memorySize_;
410 }
411
GetfileDescriptor()412 int ShareMemoryBlock::GetfileDescriptor()
413 {
414 return fileDescriptor_;
415 }
416
PutWithPayloadSync(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize,const std::function<bool ()> & callback)417 bool ShareMemoryBlock::PutWithPayloadSync(const int8_t* header, uint32_t headerSize,
418 const int8_t* payload, uint32_t payloadSize, const std::function<bool()>& callback)
419 {
420 if (header_ == nullptr) {
421 return false;
422 }
423 pthread_mutex_lock(&header_->info.mutex_);
424 int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
425 if (rawMemory == nullptr) {
426 while (true) {
427 if (rawMemory == nullptr) {
428 if ((callback && callback()) || (g_timeCount > waitTime_)) {
429 pthread_mutex_unlock(&header_->info.mutex_);
430 return false;
431 }
432 pthread_mutex_unlock(&header_->info.mutex_);
433 usleep(WAIT_RELEASE_TIMEOUT_US);
434 g_timeCount += WAIT_RELEASE_TIMEOUT_US;
435 pthread_mutex_lock(&header_->info.mutex_);
436 rawMemory = GetFreeMemory(headerSize + payloadSize);
437 continue;
438 }
439 g_timeCount = 0;
440 break;
441 }
442 }
443 if (memcpy_s(rawMemory, headerSize + payloadSize, header, headerSize) != EOK) {
444 pthread_mutex_unlock(&header_->info.mutex_);
445 return false;
446 }
447 if (payloadSize > 0) {
448 if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
449 pthread_mutex_unlock(&header_->info.mutex_);
450 return false;
451 }
452 }
453 UseFreeMemory(rawMemory, headerSize + payloadSize);
454 ++header_->info.bytesCount_;
455 ++header_->info.chunkCount_;
456 pthread_mutex_unlock(&header_->info.mutex_);
457 return true;
458 }
459
UseMemory(int32_t size)460 void ShareMemoryBlock::UseMemory(int32_t size)
461 {
462 CHECK_TRUE(header_ != nullptr, NO_RETVAL, "header not ready!");
463 CHECK_TRUE(size > 0, NO_RETVAL, "size(%d) is invalid", size);
464
465 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
466 *((int*)(&header_->data[wp])) = size;
467 header_->info.writeOffset_.store(wp + PIECE_HEAD_LEN + size, std::memory_order_release);
468 }
469
GetMemory(uint32_t size,uint8_t ** memory,uint32_t * offset)470 bool ShareMemoryBlock::GetMemory(uint32_t size, uint8_t** memory, uint32_t* offset)
471 {
472 CHECK_NOTNULL(header_, false, "header not ready!");
473
474 // The actual size is to store data with a size of offset and a size of data and a four byte tail tag.
475 uint32_t realSize = messageWriteOffset_ + size + PIECE_HEAD_LEN;
476 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
477 uint32_t rp = header_->info.readOffset_.load(std::memory_order_acquire);
478 if (rp <= wp) {
479 if (wp + realSize <= header_->info.memorySize_) {
480 // enough tail space to store data.
481 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
482 *offset = messageWriteOffset_;
483 return true;
484 } else if (realSize <= rp) {
485 // there is data in the tail, and it is need to copy the data in the tail to the header for saving.
486 auto ret = memcpy_s(&header_->data[0], messageWriteOffset_, &header_->data[wp], messageWriteOffset_);
487 CHECK_TRUE(ret == EOK, false, "memcpy_s messageWriteOffset_(%d) data failed", messageWriteOffset_);
488 // set trailing data end tag.
489 *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
490 // set writeOffset_ to zero.
491 header_->info.writeOffset_.store(0, std::memory_order_release);
492 *memory = reinterpret_cast<uint8_t *>(&header_->data[messageWriteOffset_]);
493 *offset = messageWriteOffset_;
494 return true;
495 }
496 } else {
497 if (wp + realSize <= rp) {
498 // rp is after wp and there is enough space to store data.
499 *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
500 *offset = messageWriteOffset_;
501 return true;
502 }
503 }
504
505 PROFILER_LOG_ERROR(LOG_CORE, "Write not enough space, realSize=%u, rp=%u, wp=%u", realSize, rp, wp);
506 return false;
507 }
508
TakeDataOptimize(const DataHandler & func)509 bool ShareMemoryBlock::TakeDataOptimize(const DataHandler& func)
510 {
511 CHECK_NOTNULL(header_, false, "header not ready!");
512 CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
513
514 uint32_t wp = header_->info.writeOffset_.load(std::memory_order_acquire);
515 uint32_t rp = header_->info.readOffset_.load(std::memory_order_relaxed);
516 int8_t* ptr = nullptr;
517 uint32_t size = 0;
518 if (rp < wp) {
519 // |---rp<---data--->wp---|
520 size = *((uint32_t*)(&header_->data[rp]));
521 ptr = &header_->data[rp + PIECE_HEAD_LEN];
522 } else if (wp < rp) {
523 // |<---data2--->wp---rp<---data1--->|
524 size = *((uint32_t*)(&header_->data[rp]));
525 // Size is the end tag of the tail and needs to be retrieved from the header.
526 if (size == INVALID_LENGTH) {
527 if (wp == 0) {
528 // no data to read.
529 return false;
530 }
531 rp = 0;
532 size = *((uint32_t*)(&header_->data[rp]));
533 }
534 ptr = &header_->data[rp + PIECE_HEAD_LEN];
535 } else {
536 // wp == rp
537 return false;
538 }
539 CHECK_NOTNULL(ptr, false, "ptr is nullptr");
540
541 // Start writing file.
542 CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
543
544 header_->info.readOffset_.store(rp + size + PIECE_HEAD_LEN, std::memory_order_release);
545 return true;
546 }
547
Seek(uint32_t pos)548 bool ShareMemoryBlock::Seek(uint32_t pos)
549 {
550 messageWriteOffset_ = pos;
551 return true;
552 }
553
ResetPos()554 void ShareMemoryBlock::ResetPos()
555 {
556 messageWriteOffset_ = PIECE_HEAD_LEN;
557 }