• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "result_demuxer.h"
16 
17 #include <unistd.h>
18 #include "logging.h"
19 
// Returns false from the enclosing function, after logging a warning, when
// `ptr` compares equal to nullptr.
// The do/while(0) wrapper makes the expansion a single statement, so the macro
// stays correct inside an unbraced if/else; the argument is parenthesized so
// compound expressions are compared as a whole.
#define CHECK_POINTER_NOTNULL(ptr)                                           \
    do {                                                                     \
        if ((ptr) == nullptr) {                                              \
            HILOG_WARN(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return false;                                                    \
        }                                                                    \
    } while (0)
25 
// Returns false from the enclosing function, after logging a warning, when
// `t.get_id()` equals a default-constructed std::thread::id (i.e. `t` does not
// represent a thread of execution).
// The do/while(0) wrapper makes the expansion a single statement, so the macro
// stays correct inside an unbraced if/else; the argument is parenthesized so
// expressions such as `*threadPtr` bind before the member access.
#define CHECK_THREAD_ID_VALID(t)                                              \
    do {                                                                      \
        if ((t).get_id() == std::thread::id()) {                              \
            HILOG_WARN(LOG_CORE, "%s: FAILED, %s id invalid!", __func__, #t); \
            return false;                                                     \
        }                                                                     \
    } while (0)
31 
namespace {
// Flush interval installed by the ResultDemuxer constructor: 1000 ms.
constexpr std::chrono::milliseconds DEFAULT_FLUSH_INTERVAL{1000};
} // namespace
35 
ResultDemuxer(const ProfilerDataRepeaterPtr & dataRepeater)36 ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater)
37     : dataRepeater_(dataRepeater), flushInterval_(DEFAULT_FLUSH_INTERVAL)
38 {
39 }
40 
~ResultDemuxer()41 ResultDemuxer::~ResultDemuxer()
42 {
43     if (dataRepeater_) {
44         dataRepeater_->Close();
45     }
46     if (demuxerThread_.joinable()) {
47         demuxerThread_.join();
48     }
49 }
50 
SetTraceWriter(const TraceFileWriterPtr & traceWriter)51 void ResultDemuxer::SetTraceWriter(const TraceFileWriterPtr& traceWriter)
52 {
53     traceWriter_ = traceWriter;
54 }
55 
SetFlushInterval(std::chrono::milliseconds interval)56 void ResultDemuxer::SetFlushInterval(std::chrono::milliseconds interval)
57 {
58     flushInterval_ = interval;
59 }
60 
StartTakeResults()61 bool ResultDemuxer::StartTakeResults()
62 {
63     CHECK_POINTER_NOTNULL(dataRepeater_);
64 
65     std::thread demuxer(&ResultDemuxer::TakeResults, this);
66     CHECK_THREAD_ID_VALID(demuxer);
67 
68     demuxerThread_ = std::move(demuxer);
69     return true;
70 }
71 
StopTakeResults()72 bool ResultDemuxer::StopTakeResults()
73 {
74     CHECK_POINTER_NOTNULL(dataRepeater_);
75     CHECK_THREAD_ID_VALID(demuxerThread_);
76 
77     if (traceWriter_) {
78         traceWriter_->Flush();
79     }
80 
81     dataRepeater_->PutPluginData(nullptr);
82     if (demuxerThread_.joinable()) {
83         demuxerThread_.join();
84     }
85     return true;
86 }
87 
TakeResults()88 void ResultDemuxer::TakeResults()
89 {
90     if (!dataRepeater_) {
91         return;
92     }
93 
94     HILOG_INFO(LOG_CORE, "TakeResults thread %d, start!", gettid());
95     lastFlushTime_ = std::chrono::steady_clock::now();
96     while (1) {
97         auto pluginData = dataRepeater_->TakePluginData();
98         if (!pluginData) {
99             break;
100         }
101 
102         if (traceWriter_) {
103             traceWriter_->Write(*pluginData);
104             auto currentTime = std::chrono::steady_clock::now();
105             auto elapsedTime = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - lastFlushTime_);
106             if (elapsedTime >= flushInterval_) {
107                 traceWriter_->Flush();
108                 lastFlushTime_ = currentTime;
109             }
110         } else {
111             HILOG_WARN(LOG_CORE, "no writer, drop data!");
112         }
113     }
114     traceWriter_->Flush();
115     traceWriter_->Finish();
116     HILOG_INFO(LOG_CORE, "TakeResults thread %d, exit!", gettid());
117 }
118