1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "log_persister.h"
17
18 #include <sys/mman.h>
19 #include <sys/stat.h>
20 #include <sys/prctl.h>
21
22 #include <fcntl.h>
23 #include <securec.h>
24
25 #include <algorithm>
26 #include <array>
27 #include <chrono>
28 #include <climits>
29 #include <cstdio>
30 #include <cstdlib>
31 #include <cstring>
32 #include <iostream>
33 #include <mutex>
34 #include <sstream>
35 #include <string>
36 #include <thread>
37 #include <unistd.h>
38
39 #include <hilog_common.h>
40 #include <log_buffer.h>
41 #include <log_compress.h>
42 #include <log_print.h>
43 #include <log_utils.h>
44
45 namespace OHOS {
46 namespace HiviewDFX {
47 using namespace std;
48
49 static const int MAX_LOG_WRITE_INTERVAL = 5;
50
IsEmptyThread(const std::thread & th)51 static bool IsEmptyThread(const std::thread& th)
52 {
53 static const std::thread EMPTY_THREAD;
54 return th.get_id() == EMPTY_THREAD.get_id();
55 }
56
57 std::recursive_mutex LogPersister::s_logPersistersMtx;
58 std::list<std::shared_ptr<LogPersister>> LogPersister::s_logPersisters;
59
CreateLogPersister(HilogBuffer & buffer)60 std::shared_ptr<LogPersister> LogPersister::CreateLogPersister(HilogBuffer &buffer)
61 {
62 return std::shared_ptr<LogPersister>(new LogPersister(buffer));
63 }
64
LogPersister(HilogBuffer & buffer)65 LogPersister::LogPersister(HilogBuffer &buffer) : m_hilogBuffer(buffer)
66 {
67 m_mappedPlainLogFile = nullptr;
68 m_bufReader = m_hilogBuffer.CreateBufReader([this]() { NotifyNewLogAvailable(); });
69 m_startMsg = { 0 };
70 }
71
~LogPersister()72 LogPersister::~LogPersister()
73 {
74 m_hilogBuffer.RemoveBufReader(m_bufReader);
75 Deinit();
76 }
77
InitCompression()78 int LogPersister::InitCompression()
79 {
80 m_compressBuffer = std::make_unique<LogPersisterBuffer>();
81 if (!m_compressBuffer) {
82 return RET_FAIL;
83 }
84 switch (m_startMsg.compressAlg) {
85 case COMPRESS_TYPE_NONE:
86 m_compressor = std::make_unique<NoneCompress>();
87 break;
88 case COMPRESS_TYPE_ZLIB:
89 m_compressor = std::make_unique<ZlibCompress>();
90 break;
91 case COMPRESS_TYPE_ZSTD:
92 m_compressor = std::make_unique<ZstdCompress>();
93 break;
94 default:
95 break;
96 }
97 if (!m_compressor) {
98 return RET_FAIL;
99 }
100 return RET_SUCCESS;
101 }
102
InitFileRotator(const PersistRecoveryInfo & info,bool restore)103 int LogPersister::InitFileRotator(const PersistRecoveryInfo& info, bool restore)
104 {
105 std::string fileSuffix = "";
106 switch (m_startMsg.compressAlg) {
107 case CompressAlg::COMPRESS_TYPE_ZSTD:
108 fileSuffix = ".zst";
109 break;
110 case CompressAlg::COMPRESS_TYPE_ZLIB:
111 fileSuffix = ".gz";
112 break;
113 default:
114 break;
115 }
116 m_fileRotator = std::make_unique<LogPersisterRotator>(m_startMsg.filePath,
117 m_startMsg.jobId, m_startMsg.fileNum, fileSuffix);
118 if (!m_fileRotator) {
119 std::cerr << "Not enough memory!\n";
120 return RET_FAIL;
121 }
122 return m_fileRotator->Init(info, restore);
123 }
124
Init(const PersistRecoveryInfo & info,bool restore)125 int LogPersister::Init(const PersistRecoveryInfo& info, bool restore)
126 {
127 std::cout << "Persist init begin\n";
128 std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
129 if (m_inited) {
130 return 0;
131 }
132 m_startMsg = info.msg;
133
134 std::string path = m_startMsg.filePath;
135 size_t separatorPos = path.find_last_of('/');
136 if (separatorPos == std::string::npos) {
137 return ERR_LOG_PERSIST_FILE_PATH_INVALID;
138 }
139
140 std::string parentPath = path.substr(0, separatorPos);
141 if (access(parentPath.c_str(), F_OK) != 0) {
142 perror("persister directory does not exist.");
143 return ERR_LOG_PERSIST_FILE_PATH_INVALID;
144 }
145
146 // below guard is needed to have sure only one Path and Id is reqistered till end of init!
147 std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
148 if (CheckRegistered(m_startMsg.jobId, path)) {
149 return ERR_LOG_PERSIST_TASK_EXISTED;
150 }
151 if (InitCompression() != RET_SUCCESS) {
152 return ERR_LOG_PERSIST_COMPRESS_INIT_FAIL;
153 }
154 int ret = InitFileRotator(info, restore);
155 if (ret != RET_SUCCESS) {
156 return ret;
157 }
158
159 if (int result = PrepareUncompressedFile(parentPath, restore)) {
160 return result;
161 }
162
163 RegisterLogPersister(shared_from_this());
164 m_inited = true;
165 std::cout << " Persist init done\n";
166 return 0;
167 }
168
Deinit()169 int LogPersister::Deinit()
170 {
171 std::cout << "LogPersist deinit begin\n";
172 std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
173 if (!m_inited) {
174 return 0;
175 }
176
177 Stop();
178
179 munmap(m_mappedPlainLogFile, sizeof(LogPersisterBuffer));
180 std::cout << "Removing unmapped plain log file: " << m_plainLogFilePath << "\n";
181 if (remove(m_plainLogFilePath.c_str())) {
182 std::cerr << "File: " << m_plainLogFilePath << " can't be removed. ";
183 PrintErrorno(errno);
184 }
185
186 DeregisterLogPersister(shared_from_this());
187 m_inited = false;
188 std::cout << "LogPersist deinit done\n";
189 return 0;
190 }
191
PrepareUncompressedFile(const std::string & parentPath,bool restore)192 int LogPersister::PrepareUncompressedFile(const std::string& parentPath, bool restore)
193 {
194 std::string fileName = std::string(".") + AUXILLARY_PERSISTER_PREFIX + std::to_string(m_startMsg.jobId);
195 m_plainLogFilePath = parentPath + "/" + fileName;
196 FILE* plainTextFile = fopen(m_plainLogFilePath.c_str(), restore ? "r+" : "w+");
197
198 if (!plainTextFile) {
199 std::cerr << " Open uncompressed log file(" << m_plainLogFilePath << ") failed: ";
200 PrintErrorno(errno);
201 return ERR_LOG_PERSIST_FILE_OPEN_FAIL;
202 }
203
204 if (!restore) {
205 ftruncate(fileno(plainTextFile), sizeof(LogPersisterBuffer));
206 fflush(plainTextFile);
207 fsync(fileno(plainTextFile));
208 }
209 m_mappedPlainLogFile = reinterpret_cast<LogPersisterBuffer*>(mmap(nullptr, sizeof(LogPersisterBuffer),
210 PROT_READ | PROT_WRITE, MAP_SHARED, fileno(plainTextFile), 0));
211 if (fclose(plainTextFile)) {
212 std::cerr << "File: " << plainTextFile << " can't be closed. ";
213 PrintErrorno(errno);
214 }
215 if (m_mappedPlainLogFile == MAP_FAILED) {
216 std::cerr << " mmap file failed: ";
217 PrintErrorno(errno);
218 return RET_FAIL;
219 }
220 if (restore) {
221 #ifdef DEBUG
222 std::cout << " Recovered persister, Offset=" << m_mappedPlainLogFile->offset << "\n";
223 #endif
224 // try to store previous uncompressed logs
225 auto compressionResult = m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
226 if (compressionResult != 0) {
227 std::cerr << " Compression error. Result:" << compressionResult << "\n";
228 return RET_FAIL;
229 }
230 WriteCompressedLogs();
231 } else {
232 m_mappedPlainLogFile->offset = 0;
233 }
234 return 0;
235 }
236
NotifyNewLogAvailable()237 void LogPersister::NotifyNewLogAvailable()
238 {
239 m_receiveLogCv.notify_one();
240 }
241
WriteUncompressedLogs(std::string & logLine)242 bool LogPersister::WriteUncompressedLogs(std::string& logLine)
243 {
244 uint16_t size = logLine.length();
245 uint32_t remainingSpace = MAX_PERSISTER_BUFFER_SIZE - m_mappedPlainLogFile->offset;
246 if (remainingSpace < size) {
247 return false;
248 }
249 char* currentContentPos = m_mappedPlainLogFile->content + m_mappedPlainLogFile->offset;
250 int r = memcpy_s(currentContentPos, remainingSpace, logLine.c_str(), logLine.length());
251 if (r != 0) {
252 std::cout << " Can't copy part of memory!\n";
253 return true;
254 }
255 m_mappedPlainLogFile->offset += logLine.length();
256 return true;
257 }
258
WriteLogData(const HilogData & logData)259 int LogPersister::WriteLogData(const HilogData& logData)
260 {
261 LogContent content = {
262 .level = logData.level,
263 .type = logData.type,
264 .pid = logData.pid,
265 .tid = logData.tid,
266 .domain = logData.domain,
267 .tv_sec = logData.tv_sec,
268 .tv_nsec = logData.tv_nsec,
269 .mono_sec = logData.mono_sec,
270 .tag = logData.tag,
271 .log = logData.content
272 };
273 LogFormat format = {
274 .colorful = false,
275 .timeFormat = FormatTime::TIME,
276 .timeAccuFormat = FormatTimeAccu::MSEC,
277 .year = false,
278 .zone = false,
279 };
280 std::ostringstream oss;
281 LogPrintWithFormat(content, format, oss);
282 std::string formatedLogStr = oss.str();
283 // Firstly gather uncompressed logs in auxiliary file
284 if (WriteUncompressedLogs(formatedLogStr))
285 return 0;
286 // Try to compress auxiliary file
287 auto compressionResult = m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
288 if (compressionResult != 0) {
289 std::cerr << " Compression error. Result:" << compressionResult << "\n";
290 return RET_FAIL;
291 }
292 // Write compressed buffor and clear counters
293 WriteCompressedLogs();
294 // Try again write data that wasn't written at the beginning
295 // If again fail then these logs are skipped
296 return WriteUncompressedLogs(formatedLogStr) ? 0 : RET_FAIL;
297 }
298
WriteCompressedLogs()299 inline void LogPersister::WriteCompressedLogs()
300 {
301 if (m_mappedPlainLogFile->offset == 0)
302 return;
303 m_fileRotator->Input(m_compressBuffer->content, m_compressBuffer->offset);
304 m_plainLogSize += m_mappedPlainLogFile->offset;
305 if (m_plainLogSize >= m_startMsg.fileSize) {
306 m_plainLogSize = 0;
307 m_fileRotator->FinishInput();
308 }
309 m_compressBuffer->offset = 0;
310 m_mappedPlainLogFile->offset = 0;
311 }
312
Start()313 void LogPersister::Start()
314 {
315 {
316 std::lock_guard<decltype(m_initMtx)> lock(m_initMtx);
317 if (!m_inited) {
318 std::cerr << " Log persister wasn't inited!\n";
319 return;
320 }
321 }
322
323 if (IsEmptyThread(m_persisterThread)) {
324 m_persisterThread = std::thread(&LogPersister::ReceiveLogLoop, shared_from_this());
325 } else {
326 std::cout << " Persister thread already started!\n";
327 }
328 }
329
ReceiveLogLoop()330 int LogPersister::ReceiveLogLoop()
331 {
332 prctl(PR_SET_NAME, "hilogd.pst");
333 std::cout << "Persist ReceiveLogLoop " << std::this_thread::get_id() << "\n";
334 for (;;) {
335 if (m_stopThread) {
336 break;
337 }
338 std::optional<HilogData> data = m_hilogBuffer.Query(m_startMsg.filter, m_bufReader);
339 if (data.has_value()) {
340 if (WriteLogData(data.value())) {
341 std::cerr << " Can't write new log data!\n";
342 }
343 } else {
344 std::unique_lock<decltype(m_receiveLogCvMtx)> lk(m_receiveLogCvMtx);
345 static const std::chrono::seconds waitTime(MAX_LOG_WRITE_INTERVAL);
346 if (cv_status::timeout == m_receiveLogCv.wait_for(lk, waitTime)) {
347 std::cout << "no log timeout, write log forcely" << std::endl;
348 (void)m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
349 WriteCompressedLogs();
350 }
351 }
352 }
353 // try to compress the remaining log in cache
354 (void)m_compressor->Compress(*m_mappedPlainLogFile, *m_compressBuffer);
355 WriteCompressedLogs();
356 m_fileRotator->FinishInput();
357 return 0;
358 }
359
Query(std::list<LogPersistQueryResult> & results)360 int LogPersister::Query(std::list<LogPersistQueryResult> &results)
361 {
362 std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
363 for (auto& logPersister : s_logPersisters) {
364 LogPersistQueryResult response;
365 response.logType = logPersister->m_startMsg.filter.types;
366 logPersister->FillInfo(response);
367 results.push_back(response);
368 }
369 return 0;
370 }
371
FillInfo(LogPersistQueryResult & response)372 void LogPersister::FillInfo(LogPersistQueryResult &response)
373 {
374 response.jobId = m_startMsg.jobId;
375 if (strcpy_s(response.filePath, FILE_PATH_MAX_LEN, m_startMsg.filePath)) {
376 return;
377 }
378 response.compressAlg = m_startMsg.compressAlg;
379 response.fileSize = m_startMsg.fileSize;
380 response.fileNum = m_startMsg.fileNum;
381 }
382
Kill(uint32_t id)383 int LogPersister::Kill(uint32_t id)
384 {
385 auto logPersisterPtr = GetLogPersisterById(id);
386 if (logPersisterPtr) {
387 return logPersisterPtr->Deinit();
388 }
389 std::cerr << " Log persister with id: " << id << " does not exist.\n";
390 return ERR_LOG_PERSIST_JOBID_FAIL;
391 }
392
Stop()393 void LogPersister::Stop()
394 {
395 std::cout << "Exiting LogPersister!\n";
396 if (IsEmptyThread(m_persisterThread)) {
397 std::cout << "Thread was exited or not started!\n";
398 return;
399 }
400
401 m_stopThread = true;
402 m_receiveLogCv.notify_all();
403
404 if (m_persisterThread.joinable()) {
405 m_persisterThread.join();
406 }
407 }
408
CheckRegistered(uint32_t id,const std::string & logPath)409 bool LogPersister::CheckRegistered(uint32_t id, const std::string& logPath)
410 {
411 std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
412 auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
413 [&](const std::shared_ptr<LogPersister>& logPersister) {
414 if (logPersister->m_startMsg.jobId == id || logPersister->m_startMsg.filePath == logPath) {
415 return true;
416 }
417 return false;
418 });
419 return it != s_logPersisters.end();
420 }
421
GetLogPersisterById(uint32_t id)422 std::shared_ptr<LogPersister> LogPersister::GetLogPersisterById(uint32_t id)
423 {
424 std::lock_guard<decltype(s_logPersistersMtx)> guard(s_logPersistersMtx);
425
426 auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
427 [&](const std::shared_ptr<LogPersister>& logPersister) {
428 if (logPersister->m_startMsg.jobId == id) {
429 return true;
430 }
431 return false;
432 });
433 if (it == s_logPersisters.end()) {
434 return std::shared_ptr<LogPersister>();
435 }
436 return *it;
437 }
438
RegisterLogPersister(const std::shared_ptr<LogPersister> & obj)439 void LogPersister::RegisterLogPersister(const std::shared_ptr<LogPersister>& obj)
440 {
441 std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
442 s_logPersisters.push_back(obj);
443 }
444
DeregisterLogPersister(const std::shared_ptr<LogPersister> & obj)445 void LogPersister::DeregisterLogPersister(const std::shared_ptr<LogPersister>& obj)
446 {
447 if (!obj) {
448 std::cerr << " Invalid invoke - this should never happened!\n";
449 return;
450 }
451 std::lock_guard<decltype(s_logPersistersMtx)> lock(s_logPersistersMtx);
452 auto it = std::find_if(s_logPersisters.begin(), s_logPersisters.end(),
453 [&](const std::shared_ptr<LogPersister>& logPersister) {
454 if (logPersister->m_startMsg.jobId == obj->m_startMsg.jobId) {
455 return true;
456 }
457 return false;
458 });
459 if (it == s_logPersisters.end()) {
460 std::cerr << " Inconsistent data - this should never happended!\n";
461 return;
462 }
463 s_logPersisters.erase(it);
464 }
465 } // namespace HiviewDFX
466 } // namespace OHOS
467