1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/trace_processor/rpc/proto_ring_buffer.h"
18
19 #include <stdint.h>
20 #include <sys/types.h>
21
22 #include <list>
23 #include <ostream>
24 #include <random>
25 #include <vector>
26
27 #include "perfetto/ext/base/utils.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "test/gtest_and_gmock.h"
30
31 using testing::ElementsAre;
32
33 namespace perfetto {
34 namespace trace_processor {
35
36 // For ASSERT_EQ()
operator ==(const ProtoRingBuffer::Message & a,const ProtoRingBuffer::Message & b)37 inline bool operator==(const ProtoRingBuffer::Message& a,
38 const ProtoRingBuffer::Message& b) {
39 if (a.field_id != b.field_id || a.len != b.len || a.valid() != b.valid())
40 return false;
41 if (!a.valid())
42 return true;
43 return memcmp(a.start, b.start, a.len) == 0;
44 }
45
operator <<(std::ostream & stream,const ProtoRingBuffer::Message & msg)46 inline std::ostream& operator<<(std::ostream& stream,
47 const ProtoRingBuffer::Message& msg) {
48 stream << "Message{field_id:" << msg.field_id << ", len:" << msg.len;
49 stream << ", payload: \"";
50 static constexpr uint32_t kTruncLen = 16;
51 for (uint32_t i = 0; i < std::min(msg.len, kTruncLen); i++)
52 stream << static_cast<char>(msg.start[i]);
53 if (msg.len > kTruncLen)
54 stream << "...";
55 stream << "\"}";
56 return stream;
57 }
58
59 namespace {
60
61 constexpr uint32_t kMaxMsgSize = ProtoRingBuffer::kMaxMsgSize;
62
63 class ProtoRingBufferTest : public ::testing::Test {
64 public:
MakeProtoMessage(uint32_t field_id,uint32_t len,bool append=false)65 ProtoRingBuffer::Message MakeProtoMessage(uint32_t field_id,
66 uint32_t len,
67 bool append = false) {
68 ProtoRingBuffer::Message msg{};
69 namespace proto_utils = protozero::proto_utils;
70 const uint8_t* initial_ptr = last_msg_.data();
71 if (!append)
72 last_msg_.clear();
73 size_t initial_size = last_msg_.size();
74
75 // 20 is an over-estimation of the preamble (fixed by the 2nd resize below).
76 last_msg_.resize(initial_size + len + 20);
77 uint8_t* wptr = &last_msg_[initial_size];
78 auto tag = proto_utils::MakeTagLengthDelimited(field_id);
79 wptr = proto_utils::WriteVarInt(tag, wptr);
80 wptr = proto_utils::WriteVarInt(len, wptr);
81 msg.start = wptr;
82 msg.len = len;
83 msg.field_id = field_id;
84 for (uint32_t i = 0; i < len; i++)
85 *(wptr++) = '0' + ((len + i) % 73); // 73 prime for more unique patterns.
86
87 PERFETTO_CHECK(wptr <= &last_msg_.back());
88 last_msg_.resize(static_cast<size_t>(wptr - &last_msg_[0]));
89
90 // Vector must not expand, because the returned Mesdage relies on pointer
91 // stability. The TEST_F must reserve enough capacity.
92 if (append)
93 PERFETTO_CHECK(last_msg_.data() == initial_ptr);
94 return msg;
95 }
96
97 std::vector<uint8_t> last_msg_;
98 };
99
100 // Test that when appending buffers that contain whole messages the ring buffer
101 // is skipped.
TEST_F(ProtoRingBufferTest,Fastpath)102 TEST_F(ProtoRingBufferTest, Fastpath) {
103 ProtoRingBuffer buf;
104 for (uint32_t i = 0; i < 10; i++) {
105 // Write a whole message that hits the fastpath.
106 auto expected = MakeProtoMessage(/*field_id=*/i + 1, /*len=*/i * 7);
107 buf.Append(last_msg_.data(), last_msg_.size());
108 // Shouln't take any space the buffer because it hits the fastpath.
109 EXPECT_EQ(buf.avail(), buf.capacity());
110 auto actual = buf.ReadMessage();
111 ASSERT_TRUE(actual.valid());
112 EXPECT_EQ(actual.start, expected.start); // Should point to the same buf.
113 EXPECT_EQ(actual, expected);
114
115 // Now write a message in two fragments. It won't hit the fastpath
116 expected = MakeProtoMessage(/*field_id*/ 1, /*len=*/32);
117 buf.Append(last_msg_.data(), 13);
118 EXPECT_LT(buf.avail(), buf.capacity());
119 EXPECT_FALSE(buf.ReadMessage().valid());
120
121 // Append 2nd fragment.
122 buf.Append(last_msg_.data() + 13, last_msg_.size() - 13);
123 actual = buf.ReadMessage();
124 ASSERT_TRUE(actual.valid());
125 EXPECT_EQ(actual, expected);
126 }
127 }
128
// Feeds a stream of back-to-back messages in fragments whose boundaries do not
// line up with the message boundaries, and checks that every message is read
// back intact and in order.
TEST_F(ProtoRingBufferTest, CoalescingStream) {
  ProtoRingBuffer buf;
  last_msg_.reserve(1024);
  std::list<ProtoRingBuffer::Message> expected;

  // Build 6 messages with a 100 byte payload each (preambles not included).
  for (uint32_t field_id = 1; field_id <= 6; ++field_id)
    expected.emplace_back(MakeProtoMessage(field_id, 100, /*append=*/true));

  // The fragments must add up to exactly the serialized stream.
  const uint32_t frag_lens[] = {120, 20, 471, 1};
  uint32_t frag_sum = 0;
  for (const uint32_t frag_len : frag_lens)
    frag_sum += frag_len;
  ASSERT_EQ(frag_sum, last_msg_.size());

  // Each append passes either just a portion of a message or more than one
  // message. After each append, drain whatever became readable.
  uint32_t offset = 0;
  for (const uint32_t frag_len : frag_lens) {
    buf.Append(&last_msg_[offset], frag_len);
    offset += frag_len;
    for (auto msg = buf.ReadMessage(); msg.valid(); msg = buf.ReadMessage()) {
      ASSERT_FALSE(expected.empty());
      ASSERT_EQ(expected.front(), msg);
      expected.pop_front();
    }
  }
  EXPECT_TRUE(expected.empty());
}
161
// Builds 100 messages with pseudo-random (fixed seed) field ids and sizes,
// streams them in pseudo-random fragments of at most 32768 bytes and checks
// that they are all read back intact and in order.
TEST_F(ProtoRingBufferTest, RandomSizes) {
  ProtoRingBuffer buf;
  std::minstd_rand0 rnd(0);

  // Reserved up front: MakeProtoMessage(append=true) requires that the vector
  // never reallocates.
  last_msg_.reserve(1024 * 1024 * 64);
  std::list<ProtoRingBuffer::Message> expected;

  const uint32_t kNumMsg = 100;
  for (uint32_t n = 0; n < kNumMsg; ++n) {
    const uint32_t field_id = static_cast<uint32_t>(1 + (rnd() % 1024u));
    const uint32_t rndval = static_cast<uint32_t>(rnd());
    const uint32_t bucket = rndval % 100;
    uint32_t len = 1 + (rndval % 1024);
    if (bucket < 2)
      len *= 10 * 1024;  // 2% of messages will get close to kMaxMsgSize.
    else if (bucket < 20)
      len *= 512;  // 18% will be around 500K.
    // Clamp into [1, kMaxMsgSize].
    len = std::max(std::min(len, kMaxMsgSize), 1u);
    expected.push_back(MakeProtoMessage(field_id, len, /*append=*/true));
  }

  // Stream the serialized messages in random-sized fragments, draining the
  // readable messages after every append.
  const uint32_t total = static_cast<uint32_t>(last_msg_.size());
  uint32_t consumed = 0;
  while (consumed < total) {
    const uint32_t wanted = static_cast<uint32_t>(1 + (rnd() % 32768));
    const uint32_t frag_len = std::min(wanted, total - consumed);
    buf.Append(&last_msg_[consumed], frag_len);
    consumed += frag_len;
    for (auto msg = buf.ReadMessage(); msg.valid(); msg = buf.ReadMessage()) {
      ASSERT_FALSE(expected.empty());
      ASSERT_EQ(expected.front(), msg);
      expected.pop_front();
    }
  }
  EXPECT_TRUE(expected.empty());
}
200
// A valid message followed by invalid data: the valid message must still be
// returned, and every later read must report a fatal framing error.
TEST_F(ProtoRingBufferTest, HandleProtoErrorsGracefully) {
  ProtoRingBuffer buf;

  // Append a valid 32 byte message minus its last byte: nothing is readable
  // yet, but that is not an error.
  const auto expected = MakeProtoMessage(1, 32);
  buf.Append(last_msg_.data(), last_msg_.size() - 1);
  {
    const auto partial = buf.ReadMessage();
    EXPECT_FALSE(partial.valid());
    EXPECT_FALSE(partial.fatal_framing_error);
  }

  // Now append the missing last byte followed by some invalid data.
  uint8_t invalid[] = {last_msg_.back(), 0x7f, 0x7f, 0x7f};
  buf.Append(invalid, sizeof(invalid));

  // The first message should be valid.
  const auto first = buf.ReadMessage();
  EXPECT_EQ(first, expected);

  // All the rest should be a framing error, even if more data is appended.
  for (int attempt = 0; attempt < 3; ++attempt) {
    const auto failed = buf.ReadMessage();
    EXPECT_FALSE(failed.valid());
    EXPECT_TRUE(failed.fatal_framing_error);

    buf.Append(invalid, sizeof(invalid));
  }
}
229
230 } // namespace
231 } // namespace trace_processor
232 } // namespace perfetto
233