• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "log_persister.h"
17 
18 #include <sys/mman.h>
19 #include <sys/stat.h>
20 #include <sys/prctl.h>
21 
22 #include <fcntl.h>
23 #include <securec.h>
24 
25 #include <algorithm>
26 #include <array>
27 #include <chrono>
28 #include <climits>
29 #include <cstdio>
30 #include <cstdlib>
31 #include <cstring>
32 #include <iostream>
33 #include <mutex>
34 #include <sstream>
35 #include <string>
36 #include <thread>
37 #include <unistd.h>
38 
39 #include <hilog_common.h>
40 #include <log_buffer.h>
41 #include <log_compress.h>
42 #include <log_print.h>
43 #include <log_utils.h>
44 
45 namespace OHOS {
46 namespace HiviewDFX {
47 using namespace std;
48 
// Longest time, in seconds, that ReceiveLogLoop() waits for a new-log notification
// before it flushes the buffered plain logs anyway.
static constexpr int MAX_LOG_WRITE_INTERVAL = 5;
50 
// Tells whether `th` represents no thread of execution, i.e. whether its id
// equals the id of a default-constructed std::thread.
static bool IsEmptyThread(const std::thread& th)
{
    return th.get_id() == std::thread::id();
}
56 
57 std::recursive_mutex LogPersister::s_logPersistersMtx;
58 std::list<std::shared_ptr<LogPersister>> LogPersister::s_logPersisters;
59 
CreateLogPersister(HilogBuffer & buffer)60 std::shared_ptr<LogPersister> LogPersister::CreateLogPersister(HilogBuffer &buffer)
61 {
62     return std::shared_ptr<LogPersister>(new LogPersister(buffer));
63 }
64 
LogPersister(HilogBuffer & buffer)65 LogPersister::LogPersister(HilogBuffer &buffer) : m_hilogBuffer(buffer)
66 {
67     m_mappedPlainLogFile = nullptr;
68     m_bufReader = m_hilogBuffer.CreateBufReader([this]() { NotifyNewLogAvailable(); });
69     m_startMsg = { 0 };
70 }
71 
~LogPersister()72 LogPersister::~LogPersister()
73 {
74     m_hilogBuffer.RemoveBufReader(m_bufReader);
75     Deinit();
76 }
77 
InitCompression()78 int LogPersister::InitCompression()
79 {
80     m_compressBuffer = std::make_unique<LogPersisterBuffer>();
81     if (!m_compressBuffer) {
82         return RET_FAIL;
83     }
84     switch (m_startMsg.compressAlg) {
85         case COMPRESS_TYPE_NONE:
86             m_compressor = std::make_unique<NoneCompress>();
87             break;
88         case COMPRESS_TYPE_ZLIB:
89             m_compressor = std::make_unique<ZlibCompress>();
90             break;
91         case COMPRESS_TYPE_ZSTD:
92             m_compressor = std::make_unique<ZstdCompress>();
93             break;
94         default:
95             break;
96     }
97     if (!m_compressor) {
98         return RET_FAIL;
99     }
100     return RET_SUCCESS;
101 }
102 
InitFileRotator(const PersistRecoveryInfo & info,bool restore)103 int LogPersister::InitFileRotator(const PersistRecoveryInfo& info, bool restore)
104 {
105     std::string fileSuffix = "";
106     switch (m_startMsg.compressAlg) {
107         case CompressAlg::COMPRESS_TYPE_ZSTD:
108             fileSuffix = ".zst";
109             break;
110         case CompressAlg::COMPRESS_TYPE_ZLIB:
111             fileSuffix = ".gz";
112             break;
113         default:
114             break;
115     }
116     m_fileRotator = std::make_unique<LogPersisterRotator>(m_startMsg.filePath,
117                     m_startMsg.jobId, m_startMsg.fileNum, fileSuffix);
118     if (!m_fileRotator) {
119         std::cerr << "Not enough memory!\n";
120         return RET_FAIL;
121     }
122     return m_fileRotator->Init(info, restore);
123 }
124 
Init(const PersistRecoveryInfo & info,bool restore)125 int LogPersister::Init(const PersistRecoveryInfo& info, bool restore)
126 {
127     std::cout << "Persist init begin\n";
128     std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
129     if (m_inited) {
130         return 0;
131     }
132     m_startMsg = info.msg;
133 
134     std::string path = m_startMsg.filePath;
135     size_t separatorPos = path.find_last_of('/');
136     if (separatorPos == std::string::npos) {
137         return ERR_LOG_PERSIST_FILE_PATH_INVALID;
138     }
139 
140     std::string parentPath = path.substr(0, separatorPos);
141     if (access(parentPath.c_str(), F_OK) != 0) {
142         perror("persister directory does not exist.");
143         return ERR_LOG_PERSIST_FILE_PATH_INVALID;
144     }
145 
146     // below guard is needed to have sure only one Path and Id is reqistered till end of init!
147     std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
148     if (CheckRegistered(m_startMsg.jobId, path)) {
149         return ERR_LOG_PERSIST_TASK_EXISTED;
150     }
151     if (InitCompression() !=  RET_SUCCESS) {
152         return ERR_LOG_PERSIST_COMPRESS_INIT_FAIL;
153     }
154     int ret = InitFileRotator(info, restore);
155     if (ret !=  RET_SUCCESS) {
156         return ret;
157     }
158 
159     if (int result = PrepareUncompressedFile(parentPath, restore)) {
160         return result;
161     }
162 
163     RegisterLogPersister(shared_from_this());
164     m_inited = true;
165     std::cout << " Persist init done\n";
166     return 0;
167 }
168 
Deinit()169 int LogPersister::Deinit()
170 {
171     std::cout << "LogPersist deinit begin\n";
172     std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
173     if (!m_inited) {
174         return 0;
175     }
176 
177     Stop();
178 
179     munmap(m_mappedPlainLogFile, MAX_PERSISTER_BUFFER_SIZE);
180     std::cout << "Removing unmapped plain log file: " << m_plainLogFilePath << "\n";
181     if (remove(m_plainLogFilePath.c_str())) {
182         std::cerr << "File: " << m_plainLogFilePath << " can't be removed. ";
183         PrintErrorno(errno);
184     }
185 
186     DeregisterLogPersister(shared_from_this());
187     m_inited = false;
188     std::cout << "LogPersist deinit done\n";
189     return 0;
190 }
191 
PrepareUncompressedFile(const std::string & parentPath,bool restore)192 int LogPersister::PrepareUncompressedFile(const std::string& parentPath, bool restore)
193 {
194     std::string fileName = std::string(".") + AUXILLARY_PERSISTER_PREFIX + std::to_string(m_startMsg.jobId);
195     m_plainLogFilePath = parentPath + "/" + fileName;
196     FILE* plainTextFile = fopen(m_plainLogFilePath.c_str(), restore ? "r+" : "w+");
197 
198     if (!plainTextFile) {
199         std::cerr << " Open uncompressed log file(" << m_plainLogFilePath << ") failed: ";
200         PrintErrorno(errno);
201         return ERR_LOG_PERSIST_FILE_OPEN_FAIL;
202     }
203 
204     if (!restore) {
205         ftruncate(fileno(plainTextFile), sizeof(LogPersisterBuffer));
206         fflush(plainTextFile);
207         fsync(fileno(plainTextFile));
208     }
209     m_mappedPlainLogFile = reinterpret_cast<LogPersisterBuffer*>(mmap(nullptr, sizeof(LogPersisterBuffer),
210         PROT_READ | PROT_WRITE, MAP_SHARED, fileno(plainTextFile), 0));
211     if (fclose(plainTextFile)) {
212         std::cerr << "File: " << plainTextFile << " can't be closed. ";
213         PrintErrorno(errno);
214     }
215     if (m_mappedPlainLogFile == MAP_FAILED) {
216         std::cerr << " mmap file failed: ";
217         PrintErrorno(errno);
218         return RET_FAIL;
219     }
220     if (restore) {
221 #ifdef DEBUG
222         std::cout << " Recovered persister, Offset=" << m_mappedPlainLogFile->offset << "\n";
223 #endif
224         // try to store previous uncompressed logs
225         auto compressionResult = m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
226         if (compressionResult != 0) {
227             std::cerr << " Compression error. Result:" << compressionResult << "\n";
228             return RET_FAIL;
229         }
230         WriteCompressedLogs();
231     } else {
232         m_mappedPlainLogFile->offset = 0;
233     }
234     return 0;
235 }
236 
NotifyNewLogAvailable()237 void LogPersister::NotifyNewLogAvailable()
238 {
239     m_receiveLogCv.notify_one();
240 }
241 
WriteUncompressedLogs(std::string & logLine)242 bool LogPersister::WriteUncompressedLogs(std::string& logLine)
243 {
244     uint16_t size = logLine.length();
245     uint32_t remainingSpace = MAX_PERSISTER_BUFFER_SIZE - m_mappedPlainLogFile->offset;
246     if (remainingSpace < size) {
247         return false;
248     }
249     char* currentContentPos = m_mappedPlainLogFile->content + m_mappedPlainLogFile->offset;
250     int r = memcpy_s(currentContentPos, remainingSpace, logLine.c_str(), logLine.length());
251     if (r != 0) {
252         std::cout << " Can't copy part of memory!\n";
253         return true;
254     }
255     m_mappedPlainLogFile->offset += logLine.length();
256     return true;
257 }
258 
WriteLogData(const HilogData & logData)259 int LogPersister::WriteLogData(const HilogData& logData)
260 {
261     LogContent content = {
262         .level = logData.level,
263         .type = logData.type,
264         .pid = logData.pid,
265         .tid = logData.tid,
266         .domain = logData.domain,
267         .tv_sec = logData.tv_sec,
268         .tv_nsec = logData.tv_nsec,
269         .mono_sec = logData.mono_sec,
270         .tag = logData.tag,
271         .log = logData.content
272     };
273     LogFormat format = {
274         .colorful = false,
275         .timeFormat = FormatTime::TIME,
276         .timeAccuFormat = FormatTimeAccu::MSEC,
277         .year = false,
278         .zone = false,
279     };
280     std::ostringstream oss;
281     LogPrintWithFormat(content, format, oss);
282     std::string formatedLogStr = oss.str();
283     // Firstly gather uncompressed logs in auxiliary file
284     if (WriteUncompressedLogs(formatedLogStr))
285         return 0;
286     // Try to compress auxiliary file
287     auto compressionResult = m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
288     if (compressionResult != 0) {
289         std::cerr <<  " Compression error. Result:" << compressionResult << "\n";
290         return RET_FAIL;
291     }
292     // Write compressed buffor and clear counters
293     WriteCompressedLogs();
294     // Try again write data that wasn't written at the beginning
295     // If again fail then these logs are skipped
296     return WriteUncompressedLogs(formatedLogStr) ? 0 : RET_FAIL;
297 }
298 
WriteCompressedLogs()299 inline void LogPersister::WriteCompressedLogs()
300 {
301     if (m_mappedPlainLogFile->offset == 0)
302         return;
303     m_fileRotator->Input(m_compressBuffer->content, m_compressBuffer->offset);
304     m_plainLogSize += m_mappedPlainLogFile->offset;
305     if (m_plainLogSize >= m_startMsg.fileSize) {
306         m_plainLogSize = 0;
307         m_fileRotator->FinishInput();
308     }
309     m_compressBuffer->offset = 0;
310     m_mappedPlainLogFile->offset = 0;
311 }
312 
Start()313 void LogPersister::Start()
314 {
315     {
316         std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
317         if (!m_inited) {
318             std::cerr << " Log persister wasn't inited!\n";
319             return;
320         }
321     }
322 
323     if (IsEmptyThread(m_persisterThread)) {
324         m_persisterThread = std::thread(&LogPersister::ReceiveLogLoop, shared_from_this());
325     } else {
326         std::cout << " Persister thread already started!\n";
327     }
328 }
329 
ReceiveLogLoop()330 int LogPersister::ReceiveLogLoop()
331 {
332     prctl(PR_SET_NAME, "hilogd.pst");
333     std::cout << "Persist ReceiveLogLoop " << std::this_thread::get_id() << "\n";
334     for (;;) {
335         if (m_stopThread) {
336             break;
337         }
338         std::optional<HilogData> data = m_hilogBuffer.Query(m_startMsg.filter, m_bufReader);
339         if (data.has_value()) {
340             if (WriteLogData(data.value())) {
341                 std::cerr << " Can't write new log data!\n";
342             }
343         } else {
344             std::unique_lock<decltype(m_receiveLogCvMtx)> lk(m_receiveLogCvMtx);
345             static const std::chrono::seconds waitTime(MAX_LOG_WRITE_INTERVAL);
346             if (cv_status::timeout == m_receiveLogCv.wait_for(lk, waitTime)) {
347                 std::cout << "no log timeout, write log forcely" << std::endl;
348                 (void)m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
349                 WriteCompressedLogs();
350             }
351         }
352     }
353     // try to compress the remaining log in cache
354     (void)m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
355     WriteCompressedLogs();
356     m_fileRotator->FinishInput();
357     return 0;
358 }
359 
Query(std::list<LogPersistQueryResult> & results)360 int LogPersister::Query(std::list<LogPersistQueryResult> &results)
361 {
362     std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
363     for (auto& logPersister : s_logPersisters) {
364         LogPersistQueryResult response;
365         response.logType = logPersister->m_startMsg.filter.types;
366         logPersister->FillInfo(response);
367         results.push_back(response);
368     }
369     return 0;
370 }
371 
FillInfo(LogPersistQueryResult & response)372 void LogPersister::FillInfo(LogPersistQueryResult &response)
373 {
374     response.jobId = m_startMsg.jobId;
375     if (strcpy_s(response.filePath, FILE_PATH_MAX_LEN, m_startMsg.filePath)) {
376         return;
377     }
378     response.compressAlg = m_startMsg.compressAlg;
379     response.fileSize = m_startMsg.fileSize;
380     response.fileNum = m_startMsg.fileNum;
381 }
382 
Kill(uint32_t id)383 int LogPersister::Kill(uint32_t id)
384 {
385     auto logPersisterPtr = GetLogPersisterById(id);
386     if (logPersisterPtr) {
387         return logPersisterPtr->Deinit();
388     }
389     std::cerr << " Log persister with id: " << id << " does not exist.\n";
390     return ERR_LOG_PERSIST_JOBID_FAIL;
391 }
392 
Stop()393 void LogPersister::Stop()
394 {
395     std::cout << "Exiting LogPersister!\n";
396     if (IsEmptyThread(m_persisterThread)) {
397         std::cout << "Thread was exited or not started!\n";
398         return;
399     }
400 
401     m_stopThread = true;
402     m_receiveLogCv.notify_all();
403 
404     if (m_persisterThread.joinable()) {
405         m_persisterThread.join();
406     }
407 }
408 
CheckRegistered(uint32_t id,const std::string & logPath)409 bool LogPersister::CheckRegistered(uint32_t id, const std::string& logPath)
410 {
411     std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
412     auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
413         [&](const std::shared_ptr<LogPersister>& logPersister) {
414             if (logPersister->m_startMsg.jobId == id || logPersister->m_startMsg.filePath == logPath) {
415                 return true;
416             }
417             return false;
418         });
419     return it != s_logPersisters.end();
420 }
421 
GetLogPersisterById(uint32_t id)422 std::shared_ptr<LogPersister> LogPersister::GetLogPersisterById(uint32_t id)
423 {
424     std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
425 
426     auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
427         [&](const std::shared_ptr<LogPersister>& logPersister) {
428             if (logPersister->m_startMsg.jobId == id) {
429                 return true;
430             }
431             return false;
432         });
433     if (it == s_logPersisters.end()) {
434         return std::shared_ptr<LogPersister>();
435     }
436     return *it;
437 }
438 
RegisterLogPersister(const std::shared_ptr<LogPersister> & obj)439 void LogPersister::RegisterLogPersister(const std::shared_ptr<LogPersister>& obj)
440 {
441     std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
442     s_logPersisters.push_back(obj);
443 }
444 
DeregisterLogPersister(const std::shared_ptr<LogPersister> & obj)445 void LogPersister::DeregisterLogPersister(const std::shared_ptr<LogPersister>& obj)
446 {
447     if (!obj) {
448         std::cerr << " Invalid invoke - this should never happened!\n";
449         return;
450     }
451     std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
452     auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
453         [&](const std::shared_ptr<LogPersister>& logPersister) {
454             if (logPersister->m_startMsg.jobId == obj->m_startMsg.jobId) {
455                 return true;
456             }
457             return false;
458         });
459     if (it == s_logPersisters.end()) {
460         std::cerr << " Inconsistent data - this should never happended!\n";
461         return;
462     }
463     s_logPersisters.erase(it);
464 }
465 } // namespace HiviewDFX
466 } // namespace OHOS
467