• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "share_memory_block.h"
17 
18 #include <cstring>
19 #include <fcntl.h>
20 #include <sys/mman.h>
21 #include <sys/syscall.h>
22 #include <unistd.h>
23 #include "ashmem.h"
24 #include "logging.h"
25 #include "securec.h"
26 
namespace {
// Number of bytes reserved in front of every stored piece for its 32-bit length field.
constexpr int PIECE_HEAD_LEN = 4;
// Length value stored at the write position when the writer wraps around;
// readers that find it continue with the piece stored at offset 0.
constexpr uint32_t INVALID_LENGTH = static_cast<uint32_t>(-1);
// Seconds added to the current time to build the deadline for pthread_mutex_timedlock.
constexpr uint32_t TIMEOUT_SEC = 1;
// Microseconds slept between two attempts in PutWithPayloadSync.
constexpr int WAIT_RELEASE_TIMEOUT_US = 10;
// Microseconds PutWithPayloadSync has spent waiting so far; cleared by ReleaseBlock.
std::atomic<int> g_timeCount {0};
#ifndef PAGE_SIZE
// Fallback used for the size-alignment check when the platform headers define no PAGE_SIZE.
constexpr uint32_t PAGE_SIZE = 4096;
#endif
}  // namespace
37 
/**
 * Scope guard for a raw pthread mutex: locks it on construction and unlocks it
 * on destruction.
 *
 * Copying is disabled: a copy would refer to the same mutex and unlock it a
 * second time when destroyed.
 */
struct PthreadLocker {
    explicit PthreadLocker(pthread_mutex_t& mutex) : mutex_(mutex)
    {
        pthread_mutex_lock(&mutex_);
    }

    ~PthreadLocker()
    {
        pthread_mutex_unlock(&mutex_);
    }

    PthreadLocker(const PthreadLocker&) = delete;
    PthreadLocker& operator=(const PthreadLocker&) = delete;

private:
    pthread_mutex_t& mutex_;
};
52 
ShareMemoryBlock()53 ShareMemoryBlock::ShareMemoryBlock()
54     : fileDescriptor_(-1),
55       memoryPoint_(nullptr),
56       memorySize_(0),
57       memoryName_(),
58       header_(nullptr),
59       reusePloicy_(ReusePolicy::DROP_NONE)
60 {
61 }
62 
CreateBlockWithFd(std::string name,uint32_t size,int fd)63 bool ShareMemoryBlock::CreateBlockWithFd(std::string name, uint32_t size, int fd)
64 {
65     CHECK_TRUE(fd >= 0, false, "CreateBlock FAIL SYS_memfd_create");
66 
67     auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
68     if (ptr == MAP_FAILED) {
69         const int bufSize = 256;
70         char buf[bufSize] = {0};
71         strerror_r(errno, buf, bufSize);
72         PROFILER_LOG_ERROR(LOG_CORE, "CreateBlockWithFd mmap ERR : %s", buf);
73         return false;
74     }
75 
76     fileDescriptor_ = fd;
77     memoryPoint_ = ptr;
78     memorySize_ = size;
79 
80     memoryName_ = name;
81     header_ = reinterpret_cast<BlockHeader*>(ptr);
82 
83     // Reserve 4 bytes to fill the message length.
84     messageWriteOffset_ = PIECE_HEAD_LEN;
85     // Functions required to bind the BaseMessage class.
86     smbCtx_.block = this;
87     smbCtx_.ctx.getMemory = [](RandomWriteCtx* ctx, uint32_t size, uint8_t** memory, uint32_t* offset) -> bool {
88         ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
89         return smbCtx->block->GetMemory(size, memory, offset);
90     };
91     smbCtx_.ctx.seek = [](RandomWriteCtx* ctx, uint32_t offset) -> bool {
92         ShareMemoryBlockCtx* smbCtx = reinterpret_cast<ShareMemoryBlockCtx*>(ctx);
93         return smbCtx->block->Seek(offset);
94     };
95     return true;
96 }
97 
CreateBlock(std::string name,uint32_t size)98 bool ShareMemoryBlock::CreateBlock(std::string name, uint32_t size)
99 {
100     PROFILER_LOG_INFO(LOG_CORE, "CreateBlock %s %d", name.c_str(), size);
101     CHECK_TRUE(size > sizeof(BlockHeader), false, "size %u too less!", size);
102     CHECK_TRUE(size % PAGE_SIZE == 0, false, "size %u not times of %d!", size, PAGE_SIZE);
103 
104     int fd = OHOS::AshmemCreate(name.c_str(), size);
105     CHECK_TRUE(fd >= 0, false, "OHOS::AshmemCreate fail.");
106 
107     int check = OHOS::AshmemSetProt(fd, PROT_READ | PROT_WRITE);
108     if (check < 0) {
109         close(fd);
110         const int bufSize = 256;
111         char buf[bufSize] = {0};
112         strerror_r(errno, buf, bufSize);
113         PROFILER_LOG_ERROR(LOG_CORE, "OHOS::AshmemSetProt ERR : %s", buf);
114         return false;
115     }
116 
117     auto ptr = mmap(nullptr, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
118     if (ptr == MAP_FAILED) {
119         close(fd);
120         const int bufSize = 256;
121         char buf[bufSize] = {0};
122         strerror_r(errno, buf, bufSize);
123         PROFILER_LOG_ERROR(LOG_CORE, "CreateBlock mmap ERR : %s", buf);
124         return false;
125     }
126 
127     fileDescriptor_ = fd;
128     memoryPoint_ = ptr;
129     memorySize_ = size;
130 
131     memoryName_ = name;
132     header_ = reinterpret_cast<BlockHeader*>(ptr);
133     if (header_ == nullptr) {
134         return false;
135     }
136     // initialize header infos
137     header_->info.readOffset_ = 0;
138     header_->info.writeOffset_ = 0;
139     header_->info.memorySize_ = size - sizeof(BlockHeader);
140     header_->info.bytesCount_ = 0;
141     header_->info.chunkCount_ = 0;
142 
143     pthread_mutexattr_t muAttr;
144     pthread_mutexattr_init(&muAttr);
145     pthread_mutexattr_settype(&muAttr, PTHREAD_MUTEX_RECURSIVE);
146     pthread_mutex_init(&header_->info.mutex_, &muAttr);
147     return true;
148 }
149 
Valid() const150 bool ShareMemoryBlock::Valid() const
151 {
152     return header_ != nullptr;
153 }
154 
ShareMemoryBlock(const std::string & name,uint32_t size)155 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size) : ShareMemoryBlock()
156 {
157     CreateBlock(name, size);
158 }
159 
ShareMemoryBlock(const std::string & name,uint32_t size,int fd)160 ShareMemoryBlock::ShareMemoryBlock(const std::string& name, uint32_t size, int fd) : ShareMemoryBlock()
161 {
162     CreateBlockWithFd(name, size, fd);
163 }
164 
~ShareMemoryBlock()165 ShareMemoryBlock::~ShareMemoryBlock()
166 {
167     ReleaseBlock();
168 }
169 
ReleaseBlock()170 bool ShareMemoryBlock::ReleaseBlock()
171 {
172     if (memorySize_ > 0) {
173         munmap(memoryPoint_, memorySize_);
174         memoryPoint_ = nullptr;
175         memorySize_ = 0;
176     }
177     g_timeCount = 0;
178     if (fileDescriptor_ >= 0) {
179         close(fileDescriptor_);
180         fileDescriptor_ = -1;
181     }
182     return true;
183 }
184 
GetCurrentFreeMemory(uint32_t size)185 int8_t* ShareMemoryBlock::GetCurrentFreeMemory(uint32_t size)
186 {
187     CHECK_NOTNULL(header_, nullptr, "header not ready!");
188     uint32_t realSize = size + PIECE_HEAD_LEN + PIECE_HEAD_LEN;
189 
190     uint32_t wp = header_->info.writeOffset_.load();
191     if (wp + realSize > header_->info.memorySize_) {  // 后面部分放不下,从头开始放
192         if (header_->info.readOffset_.load() == 0) {
193             return nullptr;
194         }
195         *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
196         wp = 0;
197     }
198     if (wp < header_->info.readOffset_.load() && header_->info.readOffset_.load() < wp + realSize) {  //
199         return nullptr;
200     }
201 
202     return &header_->data[wp + PIECE_HEAD_LEN];
203 }
204 
GetFreeMemory(uint32_t size)205 int8_t* ShareMemoryBlock::GetFreeMemory(uint32_t size)
206 {
207     if (reusePloicy_ == ReusePolicy::DROP_NONE) {
208         return GetCurrentFreeMemory(size);
209     }
210     int8_t* ret = nullptr;
211     while (true) {
212         ret = GetCurrentFreeMemory(size);
213         if (ret != nullptr) {
214             break;
215         }
216         if (!Next()) {
217             return nullptr;
218         }
219     }
220     return ret;
221 }
222 
UseFreeMemory(int8_t * pmem,uint32_t size)223 bool ShareMemoryBlock::UseFreeMemory(int8_t* pmem, uint32_t size)
224 {
225     uint32_t wp = pmem - PIECE_HEAD_LEN - header_->data;
226     *((int*)(&header_->data[wp])) = size;
227 
228     header_->info.writeOffset_ = wp + PIECE_HEAD_LEN + size;
229     return true;
230 }
231 
PutRaw(const int8_t * data,uint32_t size)232 bool ShareMemoryBlock::PutRaw(const int8_t* data, uint32_t size)
233 {
234     CHECK_NOTNULL(header_, false, "header not ready!");
235     PthreadLocker locker(header_->info.mutex_);
236     int8_t* rawMemory = GetFreeMemory(size);
237     if (rawMemory == nullptr) {
238         PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
239         return false;
240     }
241     if (memcpy_s(rawMemory, size, data, size) != EOK) {
242         PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
243         return false;
244     }
245 
246     UseFreeMemory(rawMemory, size);
247     ++header_->info.bytesCount_;
248     ++header_->info.chunkCount_;
249     return true;
250 }
251 
PutRawTimeout(const int8_t * data,uint32_t size)252 bool ShareMemoryBlock::PutRawTimeout(const int8_t* data, uint32_t size)
253 {
254     CHECK_NOTNULL(header_, false, "header not ready!");
255 
256     struct timespec time_out;
257     clock_gettime(CLOCK_REALTIME, &time_out);
258     time_out.tv_sec += TIMEOUT_SEC;
259     if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
260         PROFILER_LOG_ERROR(LOG_CORE, "PutRawTimeout failed %d", errno);
261         return false;
262     }
263 
264     int8_t* rawMemory = GetFreeMemory(size);
265     if (rawMemory == nullptr) {
266         PROFILER_LOG_ERROR(LOG_CORE, "PutRaw not enough space [%d]", size);
267         pthread_mutex_unlock(&header_->info.mutex_);
268         return false;
269     }
270     if (memcpy_s(rawMemory, size, data, size) != EOK) {
271         PROFILER_LOG_ERROR(LOG_CORE, "memcpy_s error");
272         pthread_mutex_unlock(&header_->info.mutex_);
273         return false;
274     }
275 
276     UseFreeMemory(rawMemory, size);
277     ++header_->info.bytesCount_;
278     ++header_->info.chunkCount_;
279 
280     pthread_mutex_unlock(&header_->info.mutex_);
281     return true;
282 }
283 
PutWithPayloadTimeout(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize)284 bool ShareMemoryBlock::PutWithPayloadTimeout(const int8_t* header, uint32_t headerSize,
285     const int8_t* payload, uint32_t payloadSize)
286 {
287     CHECK_NOTNULL(header_, false, "header not ready!");
288     struct timespec time_out;
289     clock_gettime(CLOCK_REALTIME, &time_out);
290     time_out.tv_sec += TIMEOUT_SEC;
291     if (pthread_mutex_timedlock(&header_->info.mutex_, &time_out) != 0) {
292         return false;
293     }
294 
295     int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
296     if (rawMemory == nullptr) {
297         PROFILER_LOG_INFO(LOG_CORE, "%s: shared memory exhausted, discarding data", __FUNCTION__);
298         pthread_mutex_unlock(&header_->info.mutex_);
299         return false;
300     }
301     if (memcpy_s(rawMemory, headerSize, header, headerSize) != EOK) {
302         pthread_mutex_unlock(&header_->info.mutex_);
303         return false;
304     }
305     if (payloadSize > 0) {
306         if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
307             pthread_mutex_unlock(&header_->info.mutex_);
308             return false;
309         }
310     }
311     UseFreeMemory(rawMemory, headerSize + payloadSize);
312     ++header_->info.bytesCount_;
313     ++header_->info.chunkCount_;
314 
315     pthread_mutex_unlock(&header_->info.mutex_);
316     return true;
317 }
318 
319 #ifndef NO_PROTOBUF
PutMessage(const google::protobuf::Message & pmsg,const std::string & pluginName)320 bool ShareMemoryBlock::PutMessage(const google::protobuf::Message& pmsg, const std::string& pluginName)
321 {
322     size_t size = pmsg.ByteSizeLong();
323 
324     CHECK_NOTNULL(header_, false, "header not ready!");
325     PthreadLocker locker(header_->info.mutex_);
326     int8_t* rawMemory = GetFreeMemory(size);
327     if (rawMemory == nullptr) {
328         PROFILER_LOG_ERROR(LOG_CORE, "%s: PutMessage not enough space [%zu]", pluginName.c_str(), size);
329         return false;
330     }
331 
332     int ret = pmsg.SerializeToArray(rawMemory, size);
333     if (ret <= 0) {
334         PROFILER_LOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
335         return false;
336     }
337     UseFreeMemory(rawMemory, size);
338     ++header_->info.bytesCount_;
339     ++header_->info.chunkCount_;
340     return true;
341 }
342 #endif
343 
TakeData(const DataHandler & func,bool isProtobufSerialize)344 bool ShareMemoryBlock::TakeData(const DataHandler& func, bool isProtobufSerialize)
345 {
346     if (!isProtobufSerialize) {
347         return TakeDataOptimize(func);
348     }
349 
350     CHECK_NOTNULL(header_, false, "header not ready!");
351     CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
352 
353     auto size = GetDataSize();
354     if (size == 0 || size > header_->info.memorySize_) {
355         return false;
356     }
357     auto ptr = GetDataPoint();
358     CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
359     CHECK_TRUE(Next(), false, "move read pointer FAILED!");
360     --header_->info.chunkCount_;
361     return true;
362 }
363 
GetDataSize()364 uint32_t ShareMemoryBlock::GetDataSize()
365 {
366     if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
367         return 0;
368     }
369     uint32_t ret = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
370     if (ret == INVALID_LENGTH) {
371         ret = *((uint32_t*)(&header_->data[0]));
372     }
373     return ret;
374 }
375 
GetDataPoint()376 const int8_t* ShareMemoryBlock::GetDataPoint()
377 {
378     if (*((uint32_t*)(&header_->data[header_->info.readOffset_.load()])) == INVALID_LENGTH) {
379         return &header_->data[PIECE_HEAD_LEN];
380     }
381     return &header_->data[header_->info.readOffset_ .load()+ PIECE_HEAD_LEN];
382 }
383 
Next()384 bool ShareMemoryBlock::Next()
385 {
386     if (header_->info.readOffset_.load() == header_->info.writeOffset_.load()) {
387         return false;
388     }
389     uint32_t size = *((uint32_t*)(&header_->data[header_->info.readOffset_.load()]));
390     if (size == INVALID_LENGTH) {
391         size = *((uint32_t*)(&header_->data[0]));
392         header_->info.readOffset_ = size + PIECE_HEAD_LEN;
393     } else {
394         header_->info.readOffset_ += size + PIECE_HEAD_LEN;
395     }
396     return true;
397 }
398 
GetName()399 std::string ShareMemoryBlock::GetName()
400 {
401     return memoryName_;
402 }
403 
GetSize()404 uint32_t ShareMemoryBlock::GetSize()
405 {
406     return memorySize_;
407 }
408 
GetfileDescriptor()409 int ShareMemoryBlock::GetfileDescriptor()
410 {
411     return fileDescriptor_;
412 }
413 
PutWithPayloadSync(const int8_t * header,uint32_t headerSize,const int8_t * payload,uint32_t payloadSize,const std::function<bool ()> & callback)414 bool ShareMemoryBlock::PutWithPayloadSync(const int8_t* header, uint32_t headerSize,
415     const int8_t* payload, uint32_t payloadSize, const std::function<bool()>& callback)
416 {
417     CHECK_NOTNULL(header_, false, "header not ready!");
418     pthread_mutex_lock(&header_->info.mutex_);
419     int8_t* rawMemory = GetFreeMemory(headerSize + payloadSize);
420     if (rawMemory == nullptr) {
421         while (true) {
422             if (rawMemory == nullptr) {
423                 if ((callback && callback()) || (g_timeCount > waitTime_)) {
424                     PROFILER_LOG_ERROR(LOG_CORE, "PutWithPayloadSync exit with g_timeCount %d", g_timeCount.load());
425                     pthread_mutex_unlock(&header_->info.mutex_);
426                     return false;
427                 }
428                 pthread_mutex_unlock(&header_->info.mutex_);
429                 usleep(WAIT_RELEASE_TIMEOUT_US);
430                 g_timeCount += WAIT_RELEASE_TIMEOUT_US;
431                 pthread_mutex_lock(&header_->info.mutex_);
432                 rawMemory = GetFreeMemory(headerSize + payloadSize);
433                 continue;
434             }
435             g_timeCount = 0;
436             break;
437         }
438     }
439     if (memcpy_s(rawMemory, headerSize + payloadSize, header, headerSize) != EOK) {
440         pthread_mutex_unlock(&header_->info.mutex_);
441         return false;
442     }
443     if (payloadSize > 0) {
444         if (memcpy_s(rawMemory + headerSize, payloadSize, payload, payloadSize) != EOK) {
445             pthread_mutex_unlock(&header_->info.mutex_);
446             return false;
447         }
448     }
449     UseFreeMemory(rawMemory, headerSize + payloadSize);
450     ++header_->info.bytesCount_;
451     ++header_->info.chunkCount_;
452     pthread_mutex_unlock(&header_->info.mutex_);
453     return true;
454 }
455 
UseMemory(int32_t size)456 void ShareMemoryBlock::UseMemory(int32_t size)
457 {
458     CHECK_TRUE(header_ != nullptr, NO_RETVAL, "header not ready!");
459     CHECK_TRUE(size > 0, NO_RETVAL, "size(%d) is invalid", size);
460 
461     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
462     *((int*)(&header_->data[wp])) = size;
463     header_->info.writeOffset_.store(wp + PIECE_HEAD_LEN + size, std::memory_order_release);
464 }
465 
GetMemory(uint32_t size,uint8_t ** memory,uint32_t * offset)466 bool ShareMemoryBlock::GetMemory(uint32_t size, uint8_t** memory, uint32_t* offset)
467 {
468     CHECK_NOTNULL(header_, false, "header not ready!");
469 
470     // The actual size is to store data with a size of offset and a size of data and a four byte tail tag.
471     uint32_t realSize = messageWriteOffset_ + size + PIECE_HEAD_LEN;
472     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_relaxed);
473     uint32_t rp = header_->info.readOffset_.load(std::memory_order_acquire);
474     if (rp <= wp) {
475         if (wp + realSize <= header_->info.memorySize_) {
476             // enough tail space to store data.
477             *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
478             *offset = messageWriteOffset_;
479             return true;
480         } else if (realSize <= rp) {
481             // there is data in the tail, and it is need to copy the data in the tail to the header for saving.
482             auto ret = memcpy_s(&header_->data[0], messageWriteOffset_, &header_->data[wp], messageWriteOffset_);
483             CHECK_TRUE(ret == EOK, false, "memcpy_s messageWriteOffset_(%d) data failed", messageWriteOffset_);
484             // set trailing data end tag.
485             *((uint32_t*)(&header_->data[wp])) = INVALID_LENGTH;
486             // set writeOffset_ to zero.
487             header_->info.writeOffset_.store(0, std::memory_order_release);
488             *memory = reinterpret_cast<uint8_t *>(&header_->data[messageWriteOffset_]);
489             *offset = messageWriteOffset_;
490             return true;
491         }
492     } else {
493         if (wp + realSize <= rp) {
494             // rp is after wp and there is enough space to store data.
495             *memory = reinterpret_cast<uint8_t *>(&header_->data[wp + messageWriteOffset_]);
496             *offset = messageWriteOffset_;
497             return true;
498         }
499     }
500 
501     PROFILER_LOG_ERROR(LOG_CORE, "Write not enough space, realSize=%u, rp=%u, wp=%u", realSize, rp, wp);
502     return false;
503 }
504 
TakeDataOptimize(const DataHandler & func)505 bool ShareMemoryBlock::TakeDataOptimize(const DataHandler& func)
506 {
507     CHECK_NOTNULL(header_, false, "header not ready!");
508     CHECK_TRUE(static_cast<bool>(func), false, "func invalid!");
509 
510     uint32_t wp = header_->info.writeOffset_.load(std::memory_order_acquire);
511     uint32_t rp = header_->info.readOffset_.load(std::memory_order_relaxed);
512     int8_t* ptr = nullptr;
513     uint32_t size = 0;
514     if (rp < wp) {
515         // |---rp<---data--->wp---|
516         size = *((uint32_t*)(&header_->data[rp]));
517         ptr = &header_->data[rp + PIECE_HEAD_LEN];
518     } else if (wp < rp) {
519         // |<---data2--->wp---rp<---data1--->|
520         size = *((uint32_t*)(&header_->data[rp]));
521         // Size is the end tag of the tail and needs to be retrieved from the header.
522         if (size == INVALID_LENGTH) {
523             if (wp == 0) {
524                 // no data to read.
525                 return false;
526             }
527             rp = 0;
528             size = *((uint32_t*)(&header_->data[rp]));
529         }
530         ptr = &header_->data[rp + PIECE_HEAD_LEN];
531     } else {
532         // wp == rp
533         return false;
534     }
535     CHECK_NOTNULL(ptr, false, "ptr is nullptr");
536 
537     // Start writing file.
538     CHECK_TRUE(func(ptr, size), false, "call func FAILED!");
539 
540     header_->info.readOffset_.store(rp + size + PIECE_HEAD_LEN, std::memory_order_release);
541     return true;
542 }
543 
Seek(uint32_t pos)544 bool ShareMemoryBlock::Seek(uint32_t pos)
545 {
546     messageWriteOffset_ = pos;
547     return true;
548 }
549 
ResetPos()550 void ShareMemoryBlock::ResetPos()
551 {
552     messageWriteOffset_ = PIECE_HEAD_LEN;
553 }