• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "result_demuxer.h"
16 
17 #include <unistd.h>
18 
19 #include "logging.h"
20 #include "trace_file_header.h"
21 
namespace {
// Flush period a ResultDemuxer is constructed with; SetFlushInterval() can
// replace it afterwards.
constexpr std::chrono::milliseconds DEFAULT_FLUSH_INTERVAL { 1000 };
} // namespace
25 
ResultDemuxer(const ProfilerDataRepeaterPtr & dataRepeater,PluginSessionManagerPtr pluginSessionManager)26 ResultDemuxer::ResultDemuxer(const ProfilerDataRepeaterPtr& dataRepeater, PluginSessionManagerPtr pluginSessionManager)
27     : dataRepeater_(dataRepeater), flushInterval_(DEFAULT_FLUSH_INTERVAL)
28 {
29     pluginSessionManager_ = pluginSessionManager;
30 }
31 
~ResultDemuxer()32 ResultDemuxer::~ResultDemuxer()
33 {
34     isStopTakeData_ = true;
35     if (dataRepeater_) {
36         dataRepeater_->Close();
37     }
38     if (demuxerThread_.joinable()) {
39         demuxerThread_.join();
40     }
41 }
42 
SetTraceWriter(const TraceFileWriterPtr & traceWriter)43 void ResultDemuxer::SetTraceWriter(const TraceFileWriterPtr& traceWriter)
44 {
45     traceWriter_ = traceWriter;
46 }
47 
SetFlushInterval(std::chrono::milliseconds interval)48 void ResultDemuxer::SetFlushInterval(std::chrono::milliseconds interval)
49 {
50     flushInterval_ = interval;
51 }
52 
StartTakeResults()53 bool ResultDemuxer::StartTakeResults()
54 {
55     CHECK_NOTNULL(dataRepeater_, false, "data repeater null");
56 
57     std::thread demuxer(&ResultDemuxer::TakeResults, this);
58     CHECK_TRUE(demuxer.get_id() != std::thread::id(), false, "thread invalid");
59 
60     demuxerThread_ = std::move(demuxer);
61     isStopTakeData_ = false;
62     return true;
63 }
64 
StopTakeResults()65 bool ResultDemuxer::StopTakeResults()
66 {
67     CHECK_NOTNULL(dataRepeater_, false, "data repeater null");
68     CHECK_TRUE(demuxerThread_.get_id() != std::thread::id(), false, "thread invalid");
69 
70     isStopTakeData_ = true;
71     dataRepeater_->PutPluginData(nullptr);
72     if (demuxerThread_.joinable()) {
73         demuxerThread_.join();
74     }
75     return true;
76 }
77 
TakeResults()78 void ResultDemuxer::TakeResults()
79 {
80     if (!dataRepeater_ || !pluginSessionManager_) {
81         return;
82     }
83 
84     HILOG_INFO(LOG_CORE, "TakeResults thread %d, start!", gettid());
85     lastFlushTime_ = std::chrono::steady_clock::now();
86     while (1) {
87         auto pluginData = dataRepeater_->TakePluginData();
88         if (!pluginData || isStopTakeData_) {
89             break;
90         }
91 
92         if (traceWriter_) {
93             int ret = traceWriter_->Write(*pluginData);
94             if (ret == -1) {
95                 HILOG_DEBUG(LOG_CORE, "need to clear queue and report the basic data");
96                 dataRepeater_->ClearQueue();
97                 pluginSessionManager_->RefreshPluginSession();
98             }
99             auto currentTime = std::chrono::steady_clock::now();
100             auto elapsedTime = std::chrono::duration_cast<std::chrono::milliseconds>(currentTime - lastFlushTime_);
101             if (elapsedTime >= flushInterval_) {
102                 traceWriter_->Flush();
103                 lastFlushTime_ = currentTime;
104             }
105         } else {
106             HILOG_WARN(LOG_CORE, "no writer, drop data!");
107         }
108     }
109     traceWriter_->Finish();
110     HILOG_INFO(LOG_CORE, "TakeResults thread %d, exit!", gettid());
111 }
112