1 /*
2 * Copyright (c) Huawei Technologies Co., Ltd. 2021. All rights reserved.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 *
15 * Description: ResultTransporter class implements
16 */
17 #include "result_transporter.h"
18
19 #include <chrono>
20 #include <cinttypes>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include "logging.h"
24
25 namespace {
26 constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
27 constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024;
28 } // namespace
29
30 FTRACE_NS_BEGIN
ResultTransporter(const std::string & name,WriterStructPtr writer)31 ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32 : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33 {
34 }
35
~ResultTransporter(void)36 ResultTransporter::~ResultTransporter(void)
37 {
38 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39 }
40
SetFlushInterval(int ms)41 void ResultTransporter::SetFlushInterval(int ms)
42 {
43 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44 flushInterval_ = std::chrono::milliseconds(ms);
45 }
46
SetFlushThreshold(uint32_t nbytes)47 void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48 {
49 PROFILER_LOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50 flushThreshold_ = nbytes;
51 }
52
IsFlushTime() const53 bool ResultTransporter::IsFlushTime() const
54 {
55 static auto lastTime = std::chrono::high_resolution_clock::now();
56 auto currentTime = std::chrono::high_resolution_clock::now();
57 auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58 if (elapsedTime < flushInterval_) {
59 return false;
60 }
61 lastTime = currentTime;
62 return true;
63 }
64
Write(ResultPtr && packet)65 long ResultTransporter::Write(ResultPtr&& packet)
66 {
67 if (writer_ == nullptr || writer_->write == nullptr) {
68 return 0;
69 }
70
71 size_t size = packet->ByteSizeLong();
72 buffer_.resize(size);
73 CHECK_TRUE(buffer_.size() == size, -1,
74 "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)",
75 __func__, size, buffer_.size(), errno, strerror(errno));
76
77 int ret = packet->SerializeToArray(buffer_.data(), buffer_.size());
78 CHECK_TRUE(ret > 0, ret, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
79
80 writer_->write(writer_, buffer_.data(), buffer_.size());
81 return buffer_.size();
82 }
83
Flush()84 void ResultTransporter::Flush()
85 {
86 if (writer_ == nullptr || writer_->flush == nullptr) {
87 return;
88 }
89 writer_->flush(writer_);
90
91 auto count = bytesCount_.load();
92 auto pending = bytesPending_.load();
93 bytesPending_ = 0;
94 PROFILER_LOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
95 }
96
Submit(ResultPtr && packet)97 bool ResultTransporter::Submit(ResultPtr&& packet)
98 {
99 std::unique_lock<std::mutex> lock(mutex_);
100 long nbytes = Write(std::move(packet));
101 if (nbytes < 0) {
102 PROFILER_LOG_ERROR(LOG_CORE, "send result FAILED!");
103 lock.unlock();
104 return false;
105 }
106 bytesCount_ += nbytes;
107 bytesPending_ += nbytes;
108
109 if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
110 Flush();
111 }
112 lock.unlock();
113 return true;
114 }
115
Report(size_t msgSize)116 void ResultTransporter::Report(size_t msgSize)
117 {
118 bytesCount_ += msgSize;
119 bytesPending_ += msgSize;
120
121 if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
122 Flush();
123 }
124 }
125 FTRACE_NS_END
126