• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: ResultTransporter class implements
16  */
17 #include "result_transporter.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include "logging.h"
24 
25 namespace {
26 constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
27 constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024;
28 } // namespace
29 
30 FTRACE_NS_BEGIN
ResultTransporter(const std::string & name,WriterStructPtr writer)31 ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32     : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33 {
34 }
35 
~ResultTransporter(void)36 ResultTransporter::~ResultTransporter(void)
37 {
38     HILOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39 }
40 
SetFlushInterval(int ms)41 void ResultTransporter::SetFlushInterval(int ms)
42 {
43     HILOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44     flushInterval_ = std::chrono::milliseconds(ms);
45 }
46 
SetFlushThreshold(uint32_t nbytes)47 void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48 {
49     HILOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50     flushThreshold_ = nbytes;
51 }
52 
IsFlushTime() const53 bool ResultTransporter::IsFlushTime() const
54 {
55     static auto lastTime = std::chrono::high_resolution_clock::now();
56     auto currentTime = std::chrono::high_resolution_clock::now();
57     auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58     if (elapsedTime < flushInterval_) {
59         return false;
60     }
61     lastTime = currentTime;
62     return true;
63 }
64 
Write(ResultPtr && packet)65 long ResultTransporter::Write(ResultPtr&& packet)
66 {
67     if (writer_ == nullptr || writer_->write == nullptr) {
68         return 0;
69     }
70 
71     buffer_.resize(packet->ByteSizeLong());
72     packet->SerializeToArray(buffer_.data(), buffer_.size());
73     writer_->write(writer_, buffer_.data(), buffer_.size());
74     return buffer_.size();
75 }
76 
Flush()77 void ResultTransporter::Flush()
78 {
79     if (writer_ == nullptr || writer_->flush == nullptr) {
80         return;
81     }
82     writer_->flush(writer_);
83 }
84 
Report()85 void ResultTransporter::Report()
86 {
87     auto count = bytesCount_.load();
88     auto pending = bytesPending_.load();
89     HILOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
90 }
91 
Submit(ResultPtr && packet)92 bool ResultTransporter::Submit(ResultPtr&& packet)
93 {
94     long nbytes = Write(std::move(packet));
95     CHECK_TRUE(nbytes >= 0, false, "send result FAILED!");
96     bytesCount_ += nbytes;
97     bytesPending_ += nbytes;
98 
99     if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
100         Flush();
101         Report();
102         bytesPending_ = 0;
103     }
104     return true;
105 }
106 FTRACE_NS_END
107