1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "result_demuxer.h"
16
17 #include <unistd.h>
18 #include "logging.h"
19
// Logs a warning and makes the enclosing function return false when `ptr` compares
// equal to nullptr. Only usable inside functions returning bool.
// The do { ... } while (0) wrapper makes the expansion a single statement, so the macro
// (followed by its ';') is safe inside unbraced if/else bodies.
#define CHECK_POINTER_NOTNULL(ptr)                                           \
    do {                                                                     \
        if ((ptr) == nullptr) {                                              \
            HILOG_WARN(LOG_CORE, "%s: FAILED, %s is null!", __func__, #ptr); \
            return false;                                                    \
        }                                                                    \
    } while (0)
25
// Logs a warning and makes the enclosing function return false when `t.get_id()` equals
// a default-constructed std::thread::id, i.e. `t` does not represent a thread.
// Only usable inside functions returning bool.
// The do { ... } while (0) wrapper makes the expansion a single statement, so the macro
// (followed by its ';') is safe inside unbraced if/else bodies.
#define CHECK_THREAD_ID_VALID(t)                                              \
    do {                                                                      \
        if ((t).get_id() == std::thread::id()) {                              \
            HILOG_WARN(LOG_CORE, "%s: FAILED, %s id invalid!", __func__, #t); \
            return false;                                                     \
        }                                                                     \
    } while (0)
31
namespace {
// Flush period the constructor installs into flushInterval_ until SetFlushInterval()
// replaces it.
constexpr std::chrono::milliseconds DEFAULT_FLUSH_INTERVAL{1000};
} // namespace
35
ResultDemuxer(const ProfilerDataRepeaterPtr & dataRepeater)36 ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater)
37 : dataRepeater_(dataRepeater), flushInterval_(DEFAULT_FLUSH_INTERVAL)
38 {
39 }
40
~ResultDemuxer()41 ResultDemuxer::~ResultDemuxer()
42 {
43 if (dataRepeater_) {
44 dataRepeater_->Close();
45 }
46 if (demuxerThread_.joinable()) {
47 demuxerThread_.join();
48 }
49 }
50
SetTraceWriter(const TraceFileWriterPtr & traceWriter)51 void ResultDemuxer::SetTraceWriter(const TraceFileWriterPtr& traceWriter)
52 {
53 traceWriter_ = traceWriter;
54 }
55
SetFlushInterval(std::chrono::milliseconds interval)56 void ResultDemuxer::SetFlushInterval(std::chrono::milliseconds interval)
57 {
58 flushInterval_ = interval;
59 }
60
StartTakeResults()61 bool ResultDemuxer::StartTakeResults()
62 {
63 CHECK_POINTER_NOTNULL(dataRepeater_);
64
65 std::thread demuxer(&ResultDemuxer::TakeResults, this);
66 CHECK_THREAD_ID_VALID(demuxer);
67
68 demuxerThread_ = std::move(demuxer);
69 return true;
70 }
71
StopTakeResults()72 bool ResultDemuxer::StopTakeResults()
73 {
74 CHECK_POINTER_NOTNULL(dataRepeater_);
75 CHECK_THREAD_ID_VALID(demuxerThread_);
76
77 if (traceWriter_) {
78 traceWriter_->Flush();
79 }
80
81 dataRepeater_->PutPluginData(nullptr);
82 if (demuxerThread_.joinable()) {
83 demuxerThread_.join();
84 }
85 return true;
86 }
87
TakeResults()88 void ResultDemuxer::TakeResults()
89 {
90 if (!dataRepeater_) {
91 return;
92 }
93
94 HILOG_INFO(LOG_CORE, "TakeResults thread %d, start!", gettid());
95 lastFlushTime_ = std::chrono::steady_clock::now();
96 while (1) {
97 auto pluginData = dataRepeater_->TakePluginData();
98 if (!pluginData) {
99 break;
100 }
101
102 if (traceWriter_) {
103 traceWriter_->Write(*pluginData);
104 auto currentTime = std::chrono::steady_clock::now();
105 auto elapsedTime = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - lastFlushTime_);
106 if (elapsedTime >= flushInterval_) {
107 traceWriter_->Flush();
108 lastFlushTime_ = currentTime;
109 }
110 } else {
111 HILOG_WARN(LOG_CORE, "no writer, drop data!");
112 }
113 }
114 traceWriter_->Flush();
115 traceWriter_->Finish();
116 HILOG_INFO(LOG_CORE, "TakeResults thread %d, exit!", gettid());
117 }
118