• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "share_memory_block.h"
17 
18 #include <cstring>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 #include "ashmem.h"
24 #include "logging.h"
25 #include "securec.h"
26 
namespace {
// Number of bytes reserved in front of every chunk to hold its length.
constexpr int PIECE_HEAD_LEN = 4;
// Length value stored in place of a chunk length to mark the end of the tail data.
constexpr uint32_t INVALID_LENGTH = static_cast<uint32_t>(-1);
// Seconds added to the current time to build the deadline for timed mutex locking.
constexpr uint32_t TIMEOUT_SEC = 1;
// Microseconds slept between two attempts to obtain free memory in the blocking put.
constexpr int WAIT_RELEASE_TIMEOUT_US = 10;
#ifndef PAGE_SIZE
// Fallback used when the platform headers do not provide PAGE_SIZE.
constexpr uint32_t PAGE_SIZE = 4096;
#endif
}  // namespace
36 
// Scope guard for a pthread mutex: the constructor locks the mutex and the
// destructor unlocks it, so every return path of the enclosing scope releases it.
struct PthreadLocker {
    // Locks the given mutex; the guard keeps a reference, so the mutex must outlive it.
    explicit PthreadLocker(pthread_mutex_t& mutex) : mutex_(mutex)
    {
        pthread_mutex_lock(&mutex_);
    }

    // Unlocks the mutex locked by the constructor.
    ~PthreadLocker()
    {
        pthread_mutex_unlock(&mutex_);
    }

    // A copy would share the same mutex reference and unlock it a second time
    // in its own destructor, so copying is disabled.
    PthreadLocker(const PthreadLocker&) = delete;
    PthreadLocker& operator=(const PthreadLocker&) = delete;

private:
    pthread_mutex_t& mutex_;
};
51 
ShareMemoryBlock()52 ShareMemoryBlock::ShareMemoryBlock()
53     : fileDescriptor_(-1),
54       memoryPoint_(nullptr),
55       memorySize_(0),
56       memoryName_(),
57       header_(nullptr),
58       reusePloicy_(ReusePolicy::DROP_NONE)
59 {
60 }
61 
CreateBlockWithFd(std::string name,uint32_t size,int fd)62 bool ShareMemoryBlock::CreateBlockWithFd(std::string name, uint32_t size, int fd)
63 {
64     CHECK_TRUE(fd >= 0, false, "CreateBlock FAIL SYS_memfd_create");
65 
66     auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
67     if (ptr == MAP_FAILED) {
68         const int bufSize = 256;
69         char buf[bufSize] = {0};
70         strerror_r(errno, buf, bufSize);
71         PROFILER_LOG_ERROR(LOG_CORE, "CreateBlockWithFd mmap ERR : %s", buf);
72         return false;
73     }
74 
75     fileDescriptor_ = fd;
76     memoryPoint_ = ptr;
77     memorySize_ = size;
78 
79     memoryName_ = name;
80     header_ = reinterpret_cast<BlockHeader*>(ptr);
81 
82     // Reserve 4 bytes to fill the message length.
83     messageWriteOffset_ = PIECE_HEAD_LEN;
84     // Functions required to bind the BaseMessage class.
85     smbCtx_.block = this;
86     smbCtx_.ctx.getMemory = [](RandomWriteCtx* ctx, uint32_t size, uint8_t** memory, uint32_t* offset) -> bool {
87         ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
88         return smbCtx->block->GetMemory(size, memory, offset);
89     };
90     smbCtx_.ctx.seek = [](RandomWriteCtx* ctx, uint32_t offset) -> bool {
91         ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
92         return smbCtx->block->Seek(offset);
93     };
94     return true;
95 }
96 
CreateBlock(std::string name,uint32_t size)97 bool ShareMemoryBlock::CreateBlock(std::string name, uint32_t size)
98 {
99     PROFILER_LOG_INFO(LOG_CORE, "CreateBlock %s %d", name.c_str(), size);
100     CHECK_TRUE(size > sizeof(BlockHeader), false, "size %u too less!", size);
101     CHECK_TRUE(size % PAGE_SIZE == 0, false, "size %u not times of %d!", size, PAGE_SIZE);
102 
103     int fd = OHOS::AshmemCreate(name.c_str(), size);
104     CHECK_TRUE(fd >= 0, false, "OHOS::AshmemCreate fail.");
105 
106     int check = OHOS::AshmemSetProt(fd, PROT_READ | PROT_WRITE);
107     if (check < 0) {
108         close(fd);
109         const int bufSize = 256;
110         char buf[bufSize] = {0};
111         strerror_r(errno, buf, bufSize);
112         PROFILER_LOG_ERROR(LOG_CORE, "OHOS::AshmemSetProt ERR : %s", buf);
113         return false;
114     }
115 
116     auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
117     if (ptr == MAP_FAILED) {
118         close(fd);
119         const int bufSize = 256;
120         char buf[bufSize] = {0};
121         strerror_r(errno, buf, bufSize);
122         PROFILER_LOG_ERROR(LOG_CORE, "CreateBlock mmap ERR : %s", buf);
123         return false;
124     }
125 
126     fileDescriptor_ = fd;
127     memoryPoint_ = ptr;
128     memorySize_ = size;
129 
130     memoryName_ = name;
131     header_ = reinterpret_cast<BlockHeader*>(ptr);
132 
133     // initialize header infos
134     header_->info.readOffset_ = 0;
135     header_->info.writeOffset_ = 0;
136     header_->info.memorySize_ = size - sizeof(BlockHeader);
137     header_->info.bytesCount_ = 0;
138     header_->info.chunkCount_ = 0;
139 
140     pthread_mutexattr_t muAttr;
141     pthread_mutexattr_init(&muAttr);
142     pthread_mutexattr_setpshared(&muAttr, PTHREAD_PROCESS_SHARED);
143     pthread_mutexattr_settype(&muAttr, PTHREAD_MUTEX_RECURSIVE);
144     pthread_mutex_init(&header_->info.mutex_, &muAttr);
145     return true;
146 }
147 
Valid() const148 bool ShareMemoryBlock::Valid() const
149 {
150     return header_ != nullptr;
151 }
152 
ShareMemoryBlock(const std::string & name,uint32_t size)153 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size) : ShareMemoryBlock()
154 {
155     CreateBlock(name, size);
156 }
157 
ShareMemoryBlock(const std::string & name,uint32_t size,int fd)158 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size, int fd) : ShareMemoryBlock()
159 {
160     CreateBlockWithFd(name, size, fd);
161 }
162 
~ShareMemoryBlock()163 ShareMemoryBlock::~ShareMemoryBlock()
164 {
165     ReleaseBlock();
166 }
167 
ReleaseBlock()168 bool ShareMemoryBlock::ReleaseBlock()
169 {
170     if (memorySize_ > 0) {
171         munmap(memoryPoint_, memorySize_);
172         memoryPoint_ = nullptr;
173         memorySize_ = 0;
174     }
175 
176     if (fileDescriptor_ >= 0) {
177         close(fileDescriptor_);
178         fileDescriptor_ = -1;
179     }
180     return true;
181 }
182 
GetCurrentFreeMemory(uint32_t size)183 int8_t* ShareMemoryBlock::GetCurrentFreeMemory(uint32_t size)
184 {
185     CHECK_NOTNULL(header_, nullptr, "header not ready!");
186     uint32_t realSize = size + PIECE_HEAD_LEN + PIECE_HEAD_LEN;
187 
188     uint32_t wp = header_->info.writeOffset_.load();
189     if (wp + realSize > header_->info.memorySize_) {  // 后面部分放不下,从头开始放
190         if (header_->info.readOffset_.load() == 0) {
191             return nullptr;
192         }
193         *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
194         wp = 0;
195     }
196     if (wp < header_->info.readOffset_.load() && header_->info.readOffset_.load() < wp + realSize) {  //
197         return nullptr;
198     }
199 
200     return &header_->data[wp + PIECE_HEAD_LEN];
201 }
202 
GetFreeMemory(uint32_t size)203 int8_t* ShareMemoryBlock::GetFreeMemory(uint32_t size)
204 {
205     if (reusePloicy_ == ReusePolicy::DROP_NONE) {
206         return GetCurrentFreeMemory(size);
207     }
208     int8_t* ret = nullptr;
209     while (true) {
210         ret = GetCurrentFreeMemory(size);
211         if (ret != nullptr) {
212             break;
213         }
214         if (!Next()) {
215             return nullptr;
216         }
217     }
218     return ret;
219 }
220 
UseFreeMemory(int8_t * pmem,uint32_t size)221 bool ShareMemoryBlock::UseFreeMemory(int8_t* pmem, uint32_t size)
222 {
223     uint32_t wp = pmem - PIECE_HEAD_LEN - header_->data;
224     *((int*)(&header_->data[wp])) = size;
225 
226     header_->info.writeOffset_ = wp + PIECE_HEAD_LEN + size;
227     return true;
228 }
229 
PutRaw(const int8_t * data,uint32_t size)230 bool ShareMemoryBlock::PutRaw(const int8_t* data, uint32_t size)
231 {
232     CHECK_NOTNULL(header_, false, "header not ready!");
233     PthreadLocker locker(header_->info.mutex_);
234     int8_t* rawMemory = GetFreeMemory(size);
235     if (rawMemory == nullptr) {
236         PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
237         return false;
238     }
239     if (memcpy_s(rawMemory, size, data, size) != EOK) {
240         PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
241         return false;
242     }
243 
244     UseFreeMemory(rawMemory, size);
245     ++header_->info.bytesCount_;
246     ++header_->info.chunkCount_;
247     return true;
248 }
249 
PutRawTimeout(const int8_t * data,uint32_t size)250 bool ShareMemoryBlock::PutRawTimeout(const int8_t* data, uint32_t size)
251 {
252     CHECK_NOTNULL(header_, false, "header not ready!");
253 
254     struct timespec time_out;
255     clock_gettime(CLOCK_REALTIME, &time_out);
256     time_out.tv_sec += TIMEOUT_SEC;
257     if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
258         PROFILER_LOG_ERROR(LOG_CORE, "PutRawTimeout failed %d", errno);
259         return false;
260     }
261 
262     int8_t* rawMemory = GetFreeMemory(size);
263     if (rawMemory == nullptr) {
264         PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
265         pthread_mutex_unlock(&header_->info.mutex_);
266         return false;
267     }
268     if (memcpy_s(rawMemory, size, data, size) != EOK) {
269         PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
270         pthread_mutex_unlock(&header_->info.mutex_);
271         return false;
272     }
273 
274     UseFreeMemory(rawMemory, size);
275     ++header_->info.bytesCount_;
276     ++header_->info.chunkCount_;
277 
278     pthread_mutex_unlock(&header_->info.mutex_);
279     return true;
280 }
281 
PutWithPayloadTimeout(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize)282 bool ShareMemoryBlock::PutWithPayloadTimeout(const int8_t* header, uint32_t headerSize,
283     const int8_t* payload, uint32_t payloadSize)
284 {
285     CHECK_NOTNULL(header_, false, "header not ready!");
286     struct timespec time_out;
287     clock_gettime(CLOCK_REALTIME, &time_out);
288     time_out.tv_sec += TIMEOUT_SEC;
289     if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
290         return false;
291     }
292 
293     int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
294     if (rawMemory == nullptr) {
295         PROFILER_LOG_INFO(LOG_CORE, "%s: shared memory exhausted, discarding data", __FUNCTION__);
296         pthread_mutex_unlock(&header_->info.mutex_);
297         return false;
298     }
299     if (memcpy_s(rawMemory, headerSize, header, headerSize) != EOK) {
300         pthread_mutex_unlock(&header_->info.mutex_);
301         return false;
302     }
303     if (payloadSize > 0) {
304         if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
305             pthread_mutex_unlock(&header_->info.mutex_);
306             return false;
307         }
308     }
309     UseFreeMemory(rawMemory, headerSize + payloadSize);
310     ++header_->info.bytesCount_;
311     ++header_->info.chunkCount_;
312 
313     pthread_mutex_unlock(&header_->info.mutex_);
314     return true;
315 }
316 
317 #ifndef NO_PROTOBUF
PutMessage(const google::protobuf::Message & pmsg,const std::string & pluginName)318 bool ShareMemoryBlock::PutMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
319 {
320     size_t size = pmsg.ByteSizeLong();
321 
322     CHECK_NOTNULL(header_, false, "header not ready!");
323     PthreadLocker locker(header_->info.mutex_);
324     int8_t* rawMemory = GetFreeMemory(size);
325     if (rawMemory == nullptr) {
326         PROFILER_LOG_ERROR(LOG_CORE, "%s: PutMessage not enough space [%zu]", pluginName.c_str(), size);
327         return false;
328     }
329 
330     int ret = pmsg.SerializeToArray(rawMemory, size);
331     if (ret <= 0) {
332         PROFILER_LOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
333         return false;
334     }
335     UseFreeMemory(rawMemory, size);
336     ++header_->info.bytesCount_;
337     ++header_->info.chunkCount_;
338     return true;
339 }
340 #endif
341 
TakeData(const DataHandler & func,bool isProtobufSerialize)342 bool ShareMemoryBlock::TakeData(const DataHandler& func, bool isProtobufSerialize)
343 {
344     if (!isProtobufSerialize) {
345         return TakeDataOptimize(func);
346     }
347 
348     CHECK_NOTNULL(header_, false, "header not ready!");
349     CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
350 
351     auto size = GetDataSize();
352     if (size == 0) {
353         return false;
354     }
355     auto ptr = GetDataPoint();
356     CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
357     CHECK_TRUE(Next(), false, "move read pointer FAILED!");
358     --header_->info.chunkCount_;
359     return true;
360 }
361 
GetDataSize()362 uint32_t ShareMemoryBlock::GetDataSize()
363 {
364     if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
365         return 0;
366     }
367     uint32_t ret = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
368     if (ret == INVALID_LENGTH) {
369         ret = *((uint32_t*)(&header_->data[0]));
370     }
371     return ret;
372 }
373 
GetDataPoint()374 const int8_t* ShareMemoryBlock::GetDataPoint()
375 {
376     if (*((uint32_t*)(&header_->data[header_->info.readOffset_.load()])) == INVALID_LENGTH) {
377         return &header_->data[PIECE_HEAD_LEN];
378     }
379     return &header_->data[header_->info.readOffset_ .load()+ PIECE_HEAD_LEN];
380 }
381 
Next()382 bool ShareMemoryBlock::Next()
383 {
384     if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
385         return false;
386     }
387     uint32_t size = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
388     if (size == INVALID_LENGTH) {
389         size = *((uint32_t*)(&header_->data[0]));
390         header_->info.readOffset_ = size + PIECE_HEAD_LEN;
391     } else {
392         header_->info.readOffset_ += size + PIECE_HEAD_LEN;
393     }
394     return true;
395 }
396 
GetName()397 std::string ShareMemoryBlock::GetName()
398 {
399     return memoryName_;
400 }
401 
GetSize()402 uint32_t ShareMemoryBlock::GetSize()
403 {
404     return memorySize_;
405 }
406 
GetfileDescriptor()407 int ShareMemoryBlock::GetfileDescriptor()
408 {
409     return fileDescriptor_;
410 }
411 
PutWithPayloadSync(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize,const std::function<bool ()> & callback)412 bool ShareMemoryBlock::PutWithPayloadSync(const int8_t* header, uint32_t headerSize,
413     const int8_t* payload, uint32_t payloadSize, const std::function<bool()>& callback)
414 {
415     CHECK_NOTNULL(header_, false, "header not ready!");
416     pthread_mutex_lock(&header_->info.mutex_);
417     int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
418     if (rawMemory == nullptr) {
419         while (true) {
420             if (rawMemory == nullptr) {
421                 if (callback && callback()) {
422                     pthread_mutex_unlock(&header_->info.mutex_);
423                     return false;
424                 }
425                 pthread_mutex_unlock(&header_->info.mutex_);
426                 PROFILER_LOG_DEBUG(LOG_CORE, "%s:%d block", __FUNCTION__, __LINE__);
427                 usleep(WAIT_RELEASE_TIMEOUT_US);
428                 pthread_mutex_lock(&header_->info.mutex_);
429                 rawMemory = GetFreeMemory(headerSize + payloadSize);
430                 continue;
431             }
432             break;
433         }
434     }
435     if (memcpy_s(rawMemory, headerSize + payloadSize, header, headerSize) != EOK) {
436         pthread_mutex_unlock(&header_->info.mutex_);
437         return false;
438     }
439     if (payloadSize > 0) {
440         if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
441             pthread_mutex_unlock(&header_->info.mutex_);
442             return false;
443         }
444     }
445     UseFreeMemory(rawMemory, headerSize + payloadSize);
446     ++header_->info.bytesCount_;
447     ++header_->info.chunkCount_;
448     pthread_mutex_unlock(&header_->info.mutex_);
449     return true;
450 }
451 
UseMemory(int32_t size)452 void ShareMemoryBlock::UseMemory(int32_t size)
453 {
454     CHECK_TRUE(header_ != nullptr, NO_RETVAL, "header not ready!");
455     CHECK_TRUE(size > 0, NO_RETVAL, "size(%d) is invalid", size);
456 
457     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
458     *((int*)(&header_->data[wp])) = size;
459     header_->info.writeOffset_.store(wp + PIECE_HEAD_LEN + size, std::memory_order_release);
460 }
461 
GetMemory(uint32_t size,uint8_t ** memory,uint32_t * offset)462 bool ShareMemoryBlock::GetMemory(uint32_t size, uint8_t** memory, uint32_t* offset)
463 {
464     CHECK_NOTNULL(header_, false, "header not ready!");
465 
466     // The actual size is to store data with a size of offset and a size of data and a four byte tail tag.
467     uint32_t realSize = messageWriteOffset_ + size + PIECE_HEAD_LEN;
468     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
469     uint32_t rp = header_->info.readOffset_.load(std::memory_order_acquire);
470     if (rp <= wp) {
471         if (wp + realSize <= header_->info.memorySize_) {
472             // enough tail space to store data.
473             *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
474             *offset = messageWriteOffset_;
475             return true;
476         } else if (realSize <= rp) {
477             // there is data in the tail, and it is need to copy the data in the tail to the header for saving.
478             auto ret = memcpy_s(&header_->data[0], messageWriteOffset_, &header_->data[wp], messageWriteOffset_);
479             CHECK_TRUE(ret == EOK, false, "memcpy_s messageWriteOffset_(%d) data failed", messageWriteOffset_);
480             // set trailing data end tag.
481             *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
482             // set writeOffset_ to zero.
483             header_->info.writeOffset_.store(0, std::memory_order_release);
484             *memory = reinterpret_cast<uint8_t *>(&header_->data[messageWriteOffset_]);
485             *offset = messageWriteOffset_;
486             return true;
487         }
488     } else {
489         if (wp + realSize <= rp) {
490             // rp is after wp and there is enough space to store data.
491             *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
492             *offset = messageWriteOffset_;
493             return true;
494         }
495     }
496 
497     PROFILER_LOG_ERROR(LOG_CORE, "Write not enough space, realSize=%u, rp=%u, wp=%u", realSize, rp, wp);
498     return false;
499 }
500 
TakeDataOptimize(const DataHandler & func)501 bool ShareMemoryBlock::TakeDataOptimize(const DataHandler& func)
502 {
503     CHECK_NOTNULL(header_, false, "header not ready!");
504     CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
505 
506     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_acquire);
507     uint32_t rp = header_->info.readOffset_.load(std::memory_order_relaxed);
508     int8_t* ptr = nullptr;
509     uint32_t size = 0;
510     if (rp < wp) {
511         // |---rp<---data--->wp---|
512         size = *((uint32_t*)(&header_->data[rp]));
513         ptr = &header_->data[rp + PIECE_HEAD_LEN];
514     } else if (wp < rp) {
515         // |<---data2--->wp---rp<---data1--->|
516         size = *((uint32_t*)(&header_->data[rp]));
517         // Size is the end tag of the tail and needs to be retrieved from the header.
518         if (size == INVALID_LENGTH) {
519             if (wp == 0) {
520                 // no data to read.
521                 return false;
522             }
523             rp = 0;
524             size = *((uint32_t*)(&header_->data[rp]));
525         }
526         ptr = &header_->data[rp + PIECE_HEAD_LEN];
527     } else {
528         // wp == rp
529         return false;
530     }
531     CHECK_NOTNULL(ptr, false, "ptr is nullptr");
532 
533     // Start writing file.
534     CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
535 
536     header_->info.readOffset_.store(rp + size + PIECE_HEAD_LEN, std::memory_order_release);
537     return true;
538 }
539 
Seek(uint32_t pos)540 bool ShareMemoryBlock::Seek(uint32_t pos)
541 {
542     messageWriteOffset_ = pos;
543     return true;
544 }
545 
ResetPos()546 void ShareMemoryBlock::ResetPos()
547 {
548     messageWriteOffset_ = PIECE_HEAD_LEN;
549 }