1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/proto_ring_buffer.h"
18
19 #include "perfetto/base/logging.h"
20 #include "perfetto/ext/base/paged_memory.h"
21 #include "perfetto/protozero/proto_utils.h"
22
23 namespace perfetto {
24 namespace trace_processor {
25
26 namespace {
27 constexpr size_t kGrowBytes = 128 * 1024;
28
FramingError()29 inline ProtoRingBuffer::Message FramingError() {
30 ProtoRingBuffer::Message msg{};
31 msg.fatal_framing_error = true;
32 return msg;
33 }
34
35 // Tries to decode a length-delimited proto field from |start|.
36 // Returns a valid boundary if the preamble is valid and the length is within
37 // |end|, or an invalid message otherwise.
TryReadMessage(const uint8_t * start,const uint8_t * end)38 ProtoRingBuffer::Message TryReadMessage(const uint8_t* start,
39 const uint8_t* end) {
40 namespace proto_utils = protozero::proto_utils;
41 uint64_t field_tag = 0;
42 auto* start_of_len = proto_utils::ParseVarInt(start, end, &field_tag);
43 if (start_of_len == start)
44 return ProtoRingBuffer::Message{}; // Not enough data.
45
46 const uint32_t tag = field_tag & 0x07;
47 if (tag !=
48 static_cast<uint32_t>(proto_utils::ProtoWireType::kLengthDelimited)) {
49 PERFETTO_ELOG("RPC framing error, unexpected msg tag 0x%xu", tag);
50 return FramingError();
51 }
52
53 uint64_t msg_len = 0;
54 auto* start_of_msg = proto_utils::ParseVarInt(start_of_len, end, &msg_len);
55 if (start_of_msg == start_of_len)
56 return ProtoRingBuffer::Message{}; // Not enough data.
57
58 if (msg_len > ProtoRingBuffer::kMaxMsgSize) {
59 PERFETTO_ELOG("RPC framing error, message too large (%" PRIu64 " > %zu)",
60 msg_len, ProtoRingBuffer::kMaxMsgSize);
61 return FramingError();
62 }
63
64 if (start_of_msg + msg_len > end)
65 return ProtoRingBuffer::Message{}; // Not enough data.
66
67 ProtoRingBuffer::Message msg{};
68 msg.start = start_of_msg;
69 msg.len = static_cast<uint32_t>(msg_len);
70 msg.field_id = static_cast<uint32_t>(field_tag >> 3);
71 return msg;
72 }
73
74 } // namespace
75
ProtoRingBuffer()76 ProtoRingBuffer::ProtoRingBuffer()
77 : buf_(base::PagedMemory::Allocate(kGrowBytes)) {}
78 ProtoRingBuffer::~ProtoRingBuffer() = default;
79
Append(const void * data_void,size_t data_len)80 void ProtoRingBuffer::Append(const void* data_void, size_t data_len) {
81 if (failed_)
82 return;
83 const uint8_t* data = static_cast<const uint8_t*>(data_void);
84 PERFETTO_DCHECK(wr_ <= buf_.size());
85 PERFETTO_DCHECK(wr_ >= rd_);
86
87 // If the last call to ReadMessage() consumed all the data in the buffer and
88 // there are no incomplete messages pending, restart from the beginning rather
89 // than keep ringing. This is the most common case.
90 if (rd_ == wr_)
91 rd_ = wr_ = 0;
92
93 // The caller is expected to always issue a ReadMessage() after each Append().
94 PERFETTO_CHECK(!fastpath_.valid());
95 if (rd_ == wr_) {
96 auto msg = TryReadMessage(data, data + data_len);
97 if (msg.valid() && msg.end() == (data + data_len)) {
98 // Fastpath: in many cases, the underlying stream will effectively
99 // preserve the atomicity of messages for most small messages.
100 // In this case we can avoid the extra buf_ roundtrip and just pass a
101 // pointer to |data| + (proto preamble len).
102 // The next call to ReadMessage)= will return |fastpath_|.
103 fastpath_ = std::move(msg);
104 return;
105 }
106 }
107
108 size_t avail = buf_.size() - wr_;
109 if (data_len > avail) {
110 // This whole section should be hit extremely rare.
111
112 // Try first just recompacting the buffer by moving everything to the left.
113 // This can happen if we received "a message and a bit" on each Append call
114 // so we ended pup in a situation like:
115 // buf_: [unused space] [msg1 incomplete]
116 // ^rd_ ^wr_
117 //
118 // After recompaction:
119 // buf_: [msg1 incomplete]
120 // ^rd_ ^wr_
121 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
122 memmove(&buf[0], &buf[rd_], wr_ - rd_);
123 avail += rd_;
124 wr_ -= rd_;
125 rd_ = 0;
126 if (data_len > avail) {
127 // The compaction didn't free up enough space and we need to expand the
128 // ring buffer. Yes, we could have detected this earlier and split the
129 // code paths, rather than first compacting and then realizing it wasn't
130 // sufficient. However, that would make the code harder to reason about,
131 // creating code paths that are nearly never hit, hence making it more
132 // likely to accumulate bugs in future. All this is very rare.
133 size_t new_size = buf_.size();
134 while (data_len > new_size - wr_)
135 new_size += kGrowBytes;
136 if (new_size > kMaxMsgSize * 2) {
137 failed_ = true;
138 return;
139 }
140 auto new_buf = base::PagedMemory::Allocate(new_size);
141 memcpy(new_buf.Get(), buf_.Get(), buf_.size());
142 buf_ = std::move(new_buf);
143 avail = new_size - wr_;
144 // No need to touch rd_ / wr_ cursors.
145 }
146 }
147
148 // Append the received data at the end of the ring buffer.
149 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
150 memcpy(&buf[wr_], data, data_len);
151 wr_ += data_len;
152 }
153
ReadMessage()154 ProtoRingBuffer::Message ProtoRingBuffer::ReadMessage() {
155 if (failed_)
156 return FramingError();
157
158 if (fastpath_.valid()) {
159 // The fastpath can only be hit when the buffer is empty.
160 PERFETTO_CHECK(rd_ == wr_);
161 auto msg = std::move(fastpath_);
162 fastpath_ = Message{};
163 return msg;
164 }
165
166 uint8_t* buf = static_cast<uint8_t*>(buf_.Get());
167
168 PERFETTO_DCHECK(rd_ <= wr_);
169 if (rd_ >= wr_)
170 return Message{}; // Completely empty.
171
172 auto msg = TryReadMessage(&buf[rd_], &buf[wr_]);
173 if (!msg.valid()) {
174 failed_ = failed_ || msg.fatal_framing_error;
175 return msg; // Return |msg| because it could be a framing error.
176 }
177
178 // Note: msg.start is > buf[rd_], because it skips the proto preamble.
179 PERFETTO_DCHECK(msg.start > &buf[rd_]);
180 const uint8_t* msg_end = msg.start + msg.len;
181 PERFETTO_CHECK(msg_end > &buf[rd_] && msg_end <= &buf[wr_]);
182 auto msg_outer_len = static_cast<size_t>(msg_end - &buf[rd_]);
183 rd_ += msg_outer_len;
184 return msg;
185 }
186
187 } // namespace trace_processor
188 } // namespace perfetto
189