1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "result_demuxer.h"
16
17 #include <unistd.h>
18
19 #include "logging.h"
20 #include "trace_file_header.h"
21
22 namespace {
23 constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
24 } // namespace
25
ResultDemuxer(const ProfilerDataRepeaterPtr & dataRepeater,PluginSessionManagerPtr pluginSessionManager)26 ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater, PluginSessionManagerPtr pluginSessionManager)
27 : dataRepeater_(dataRepeater), flushInterval_(DEFAULT_FLUSH_INTERVAL)
28 {
29 pluginSessionManager_ = pluginSessionManager;
30 }
31
~ResultDemuxer()32 ResultDemuxer::~ResultDemuxer()
33 {
34 isStopTakeData_ = true;
35 if (dataRepeater_) {
36 dataRepeater_->Close();
37 }
38 if (demuxerThread_.joinable()) {
39 demuxerThread_.join();
40 }
41 }
42
SetTraceWriter(const TraceFileWriterPtr & traceWriter)43 void ResultDemuxer::SetTraceWriter(const TraceFileWriterPtr& traceWriter)
44 {
45 traceWriter_ = traceWriter;
46 }
47
SetFlushInterval(std::chrono::milliseconds interval)48 void ResultDemuxer::SetFlushInterval(std::chrono::milliseconds interval)
49 {
50 flushInterval_ = interval;
51 }
52
StartTakeResults()53 bool ResultDemuxer::StartTakeResults()
54 {
55 CHECK_NOTNULL(dataRepeater_, false, "data repeater null");
56
57 std::thread demuxer(&ResultDemuxer::TakeResults, this);
58 CHECK_TRUE(demuxer.get_id() != std::thread::id(), false, "thread invalid");
59
60 demuxerThread_ = std::move(demuxer);
61 isStopTakeData_ = false;
62 return true;
63 }
64
StopTakeResults()65 bool ResultDemuxer::StopTakeResults()
66 {
67 CHECK_NOTNULL(dataRepeater_, false, "data repeater null");
68 CHECK_TRUE(demuxerThread_.get_id() != std::thread::id(), false, "thread invalid");
69
70 isStopTakeData_ = true;
71 dataRepeater_->PutPluginData(nullptr);
72 if (demuxerThread_.joinable()) {
73 demuxerThread_.join();
74 }
75 return true;
76 }
77
TakeResults()78 void ResultDemuxer::TakeResults()
79 {
80 if (!dataRepeater_ || !pluginSessionManager_) {
81 return;
82 }
83
84 HILOG_INFO(LOG_CORE, "TakeResults thread %d, start!", gettid());
85 lastFlushTime_ = std::chrono::steady_clock::now();
86 while (1) {
87 auto pluginData = dataRepeater_->TakePluginData();
88 if (!pluginData || isStopTakeData_) {
89 break;
90 }
91
92 if (traceWriter_) {
93 int ret = traceWriter_->Write(*pluginData);
94 if (ret == -1) {
95 HILOG_DEBUG(LOG_CORE, "need to clear queue and report the basic data");
96 dataRepeater_->ClearQueue();
97 pluginSessionManager_->RefreshPluginSession();
98 }
99 auto currentTime = std::chrono::steady_clock::now();
100 auto elapsedTime = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - lastFlushTime_);
101 if (elapsedTime >= flushInterval_) {
102 traceWriter_->Flush();
103 lastFlushTime_ = currentTime;
104 }
105 } else {
106 HILOG_WARN(LOG_CORE, "no writer, drop data!");
107 }
108 }
109 traceWriter_->Finish();
110 HILOG_INFO(LOG_CORE, "TakeResults thread %d, exit!", gettid());
111 }
112