1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 *
15 * Description: ResultTransporter class implements
16 */
17 #include "result_transporter.h"
18
19 #include <chrono>
20 #include <cinttypes>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include "logging.h"
24
25 namespace {
26 constexpr auto DEFAULT_FLUSH_INTERVAL = std::chrono::milliseconds(1000);
27 constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1024 * 1024;
28 } // namespace
29
30 FTRACE_NS_BEGIN
ResultTransporter(const std::string & name,WriterStructPtr writer)31 ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32 : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33 {
34 }
35
~ResultTransporter(void)36 ResultTransporter::~ResultTransporter(void)
37 {
38 HILOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39 }
40
SetFlushInterval(int ms)41 void ResultTransporter::SetFlushInterval(int ms)
42 {
43 HILOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44 flushInterval_ = std::chrono::milliseconds(ms);
45 }
46
SetFlushThreshold(uint32_t nbytes)47 void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48 {
49 HILOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50 flushThreshold_ = nbytes;
51 }
52
IsFlushTime() const53 bool ResultTransporter::IsFlushTime() const
54 {
55 static auto lastTime = std::chrono::high_resolution_clock::now();
56 auto currentTime = std::chrono::high_resolution_clock::now();
57 auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58 if (elapsedTime < flushInterval_) {
59 return false;
60 }
61 lastTime = currentTime;
62 return true;
63 }
64
Write(ResultPtr && packet)65 long ResultTransporter::Write(ResultPtr&& packet)
66 {
67 if (writer_ == nullptr || writer_->write == nullptr) {
68 return 0;
69 }
70
71 size_t size = packet->ByteSizeLong();
72 buffer_.resize(size);
73 if (buffer_.size() != size) {
74 HILOG_ERROR(LOG_CORE, "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)",
75 __func__, size, buffer_.size(), errno, strerror(errno));
76 return -1;
77 }
78
79 int ret = packet->SerializeToArray(buffer_.data(), buffer_.size());
80 if (ret <= 0) {
81 HILOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
82 return ret;
83 }
84
85 writer_->write(writer_, buffer_.data(), buffer_.size());
86 return buffer_.size();
87 }
88
Flush()89 void ResultTransporter::Flush()
90 {
91 if (writer_ == nullptr || writer_->flush == nullptr) {
92 return;
93 }
94 writer_->flush(writer_);
95
96 auto count = bytesCount_.load();
97 auto pending = bytesPending_.load();
98 bytesPending_ = 0;
99 HILOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
100 }
101
Submit(ResultPtr && packet)102 bool ResultTransporter::Submit(ResultPtr&& packet)
103 {
104 std::unique_lock<std::mutex> lock(mutex_);
105 long nbytes = Write(std::move(packet));
106 if (nbytes < 0) {
107 HILOG_ERROR(LOG_CORE, "send result FAILED!");
108 lock.unlock();
109 return false;
110 }
111 bytesCount_ += nbytes;
112 bytesPending_ += nbytes;
113
114 if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
115 Flush();
116 }
117 lock.unlock();
118 return true;
119 }
120 FTRACE_NS_END
121