• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  *
15  * Description: ResultTransporter class implementation
16  */
17 #include "result_transporter.h"
18 
19 #include <chrono>
20 #include <cinttypes>
21 #include <pthread.h>
22 #include <unistd.h>
23 #include "logging.h"
24 
namespace {
// Initial flush interval used by the ResultTransporter constructor (1000 ms).
constexpr std::chrono::milliseconds DEFAULT_FLUSH_INTERVAL{1000};
// Initial pending-byte flush threshold used by the ResultTransporter constructor (1 MiB).
constexpr uint32_t DEFAULT_FLUSH_THRESHOLD = 1U << 20;
} // namespace
29 
30 FTRACE_NS_BEGIN
ResultTransporter(const std::string & name,WriterStructPtr writer)31 ResultTransporter::ResultTransporter(const std::string& name, WriterStructPtr writer)
32     : name_(name), flushThreshold_(DEFAULT_FLUSH_THRESHOLD), flushInterval_(DEFAULT_FLUSH_INTERVAL), writer_(writer)
33 {
34 }
35 
~ResultTransporter(void)36 ResultTransporter::~ResultTransporter(void)
37 {
38     HILOG_INFO(LOG_CORE, "ResultTransporter destroy!");
39 }
40 
SetFlushInterval(int ms)41 void ResultTransporter::SetFlushInterval(int ms)
42 {
43     HILOG_INFO(LOG_CORE, "ResultTransporter set flush interval to %d", ms);
44     flushInterval_ = std::chrono::milliseconds(ms);
45 }
46 
SetFlushThreshold(uint32_t nbytes)47 void ResultTransporter::SetFlushThreshold(uint32_t nbytes)
48 {
49     HILOG_INFO(LOG_CORE, "ResultTransporter set flush threshold to %u", nbytes);
50     flushThreshold_ = nbytes;
51 }
52 
IsFlushTime() const53 bool ResultTransporter::IsFlushTime() const
54 {
55     static auto lastTime = std::chrono::high_resolution_clock::now();
56     auto currentTime = std::chrono::high_resolution_clock::now();
57     auto elapsedTime = std::chrono::duration_cast<decltype(flushInterval_)>(currentTime - lastTime);
58     if (elapsedTime < flushInterval_) {
59         return false;
60     }
61     lastTime = currentTime;
62     return true;
63 }
64 
Write(ResultPtr && packet)65 long ResultTransporter::Write(ResultPtr&& packet)
66 {
67     if (writer_ == nullptr || writer_->write == nullptr) {
68         return 0;
69     }
70 
71     size_t size = packet->ByteSizeLong();
72     buffer_.resize(size);
73     if (buffer_.size() != size) {
74         HILOG_ERROR(LOG_CORE, "%s: buffer resize failed, size: %zu, buffer size: %zu, errno: %d(%s)",
75             __func__, size, buffer_.size(), errno, strerror(errno));
76         return -1;
77     }
78 
79     int ret = packet->SerializeToArray(buffer_.data(), buffer_.size());
80     if (ret <= 0) {
81         HILOG_ERROR(LOG_CORE, "%s: SerializeToArray failed with %d, size: %zu", __func__, ret, size);
82         return ret;
83     }
84 
85     writer_->write(writer_, buffer_.data(), buffer_.size());
86     return buffer_.size();
87 }
88 
Flush()89 void ResultTransporter::Flush()
90 {
91     if (writer_ == nullptr || writer_->flush == nullptr) {
92         return;
93     }
94     writer_->flush(writer_);
95 
96     auto count = bytesCount_.load();
97     auto pending = bytesPending_.load();
98     bytesPending_ = 0;
99     HILOG_DEBUG(LOG_CORE, "ResultTransporter TX stats B: %" PRIu64 ", P: %u", count, pending);
100 }
101 
Submit(ResultPtr && packet)102 bool ResultTransporter::Submit(ResultPtr&& packet)
103 {
104     std::unique_lock<std::mutex> lock(mutex_);
105     long nbytes = Write(std::move(packet));
106     if (nbytes < 0) {
107         HILOG_ERROR(LOG_CORE, "send result FAILED!");
108         lock.unlock();
109         return false;
110     }
111     bytesCount_ += nbytes;
112     bytesPending_ += nbytes;
113 
114     if (IsFlushTime() || bytesPending_ >= flushThreshold_) {
115         Flush();
116     }
117     lock.unlock();
118     return true;
119 }
120 FTRACE_NS_END
121