• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: ResultTransporter class implements
16  */
17 #include "result_transporter.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include "logging.h"
24 
25 namespace {
26 constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
27 constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024;
28 } // namespace
29 
30 FTRACE_NS_BEGIN
ResultTransporter(const std::string & name,WriterStructPtr writer)31 ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32     : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33 {
34 }
35 
~ResultTransporter(void)36 ResultTransporter::~ResultTransporter(void)
37 {
38     PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39 }
40 
SetFlushInterval(int ms)41 void ResultTransporter::SetFlushInterval(int ms)
42 {
43     PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44     flushInterval_ = std::chrono::milliseconds(ms);
45 }
46 
SetFlushThreshold(uint32_t nbytes)47 void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48 {
49     PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50     flushThreshold_ = nbytes;
51 }
52 
IsFlushTime() const53 bool ResultTransporter::IsFlushTime() const
54 {
55     static auto lastTime = std::chrono::high_resolution_clock::now();
56     auto currentTime = std::chrono::high_resolution_clock::now();
57     auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58     if (elapsedTime < flushInterval_) {
59         return false;
60     }
61     lastTime = currentTime;
62     return true;
63 }
64 
Write(ResultPtr && packet)65 long ResultTransporter::Write(ResultPtr&& packet)
66 {
67     if (writer_ == nullptr || writer_->write == nullptr) {
68         return 0;
69     }
70 
71     size_t size = packet->ByteSizeLong();
72     buffer_.resize(size);
73     CHECK_TRUE(buffer_.size() == size, -1,
74                "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)",
75                __func__, size, buffer_.size(), errno, strerror(errno));
76 
77     int ret = packet->SerializeToArray(buffer_.data(), buffer_.size());
78     CHECK_TRUE(ret > 0, ret, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
79 
80     writer_->write(writer_, buffer_.data(), buffer_.size());
81     return buffer_.size();
82 }
83 
Flush()84 void ResultTransporter::Flush()
85 {
86     if (writer_ == nullptr || writer_->flush == nullptr) {
87         return;
88     }
89     writer_->flush(writer_);
90 
91     auto count = bytesCount_.load();
92     auto pending = bytesPending_.load();
93     bytesPending_ = 0;
94     PROFILER_LOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
95 }
96 
Submit(ResultPtr && packet)97 bool ResultTransporter::Submit(ResultPtr&& packet)
98 {
99     std::unique_lock<std::mutex> lock(mutex_);
100     long nbytes = Write(std::move(packet));
101     if (nbytes < 0) {
102         PROFILER_LOG_ERROR(LOG_CORE, "send result FAILED!");
103         lock.unlock();
104         return false;
105     }
106     bytesCount_ += nbytes;
107     bytesPending_ += nbytes;
108 
109     if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
110         Flush();
111     }
112     lock.unlock();
113     return true;
114 }
115 
Report(size_t msgSize)116 void ResultTransporter::Report(size_t msgSize)
117 {
118     bytesCount_ += msgSize;
119     bytesPending_ += msgSize;
120 
121     if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
122         Flush();
123     }
124 }
125 FTRACE_NS_END
126