1 /*
2 * Copyright (C) 2021 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "src/protozero/proto_ring_buffer.h"
18
19 #include <stdint.h>
20 #include <sys/types.h>
21
22 #include <list>
23 #include <ostream>
24 #include <random>
25 #include <vector>
26
27 #include "perfetto/ext/base/utils.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "test/gtest_and_gmock.h"
30
31 using testing::ElementsAre;
32
33 namespace protozero {
34
35 // For ASSERT_EQ()
operator ==(const ProtoRingBuffer::Message & a,const ProtoRingBuffer::Message & b)36 inline bool operator==(const ProtoRingBuffer::Message& a,
37 const ProtoRingBuffer::Message& b) {
38 if (a.field_id != b.field_id || a.len != b.len || a.valid() != b.valid())
39 return false;
40 if (!a.valid())
41 return true;
42 return memcmp(a.start, b.start, a.len) == 0;
43 }
44
operator <<(std::ostream & stream,const ProtoRingBuffer::Message & msg)45 inline std::ostream& operator<<(std::ostream& stream,
46 const ProtoRingBuffer::Message& msg) {
47 stream << "Message{field_id:" << msg.field_id << ", len:" << msg.len;
48 stream << ", payload: \"";
49 static constexpr uint32_t kTruncLen = 16;
50 for (uint32_t i = 0; i < std::min(msg.len, kTruncLen); i++)
51 stream << static_cast<char>(msg.start[i]);
52 if (msg.len > kTruncLen)
53 stream << "...";
54 stream << "\"}";
55 return stream;
56 }
57
58 namespace {
59
60 using ::perfetto::base::ArraySize;
61
62 constexpr uint32_t kMaxMsgSize = ProtoRingBuffer::kMaxMsgSize;
63
64 class ProtoRingBufferTest : public ::testing::Test {
65 public:
MakeProtoMessage(uint32_t field_id,uint32_t len,bool append=false)66 ProtoRingBuffer::Message MakeProtoMessage(uint32_t field_id,
67 uint32_t len,
68 bool append = false) {
69 ProtoRingBuffer::Message msg{};
70 namespace proto_utils = protozero::proto_utils;
71 const uint8_t* initial_ptr = last_msg_.data();
72 if (!append)
73 last_msg_.clear();
74 size_t initial_size = last_msg_.size();
75
76 // 20 is an over-estimation of the preamble (fixed by the 2nd resize below).
77 last_msg_.resize(initial_size + len + 20);
78 uint8_t* wptr = &last_msg_[initial_size];
79 auto tag = proto_utils::MakeTagLengthDelimited(field_id);
80 wptr = proto_utils::WriteVarInt(tag, wptr);
81 wptr = proto_utils::WriteVarInt(len, wptr);
82 msg.start = wptr;
83 msg.len = len;
84 msg.field_id = field_id;
85 for (uint32_t i = 0; i < len; i++)
86 *(wptr++) = '0' + ((len + i) % 73); // 73 prime for more unique patterns.
87
88 PERFETTO_CHECK(wptr <= &last_msg_.back());
89 last_msg_.resize(static_cast<size_t>(wptr - &last_msg_[0]));
90
91 // Vector must not expand, because the returned Mesdage relies on pointer
92 // stability. The TEST_F must reserve enough capacity.
93 if (append)
94 PERFETTO_CHECK(last_msg_.data() == initial_ptr);
95 return msg;
96 }
97
98 std::vector<uint8_t> last_msg_;
99 };
100
101 // Test that when appending buffers that contain whole messages the ring buffer
102 // is skipped.
TEST_F(ProtoRingBufferTest,Fastpath)103 TEST_F(ProtoRingBufferTest, Fastpath) {
104 ProtoRingBuffer buf;
105 for (uint32_t i = 0; i < 10; i++) {
106 // Write a whole message that hits the fastpath.
107 auto expected = MakeProtoMessage(/*field_id=*/i + 1, /*len=*/i * 7);
108 buf.Append(last_msg_.data(), last_msg_.size());
109 // Shouln't take any space the buffer because it hits the fastpath.
110 EXPECT_EQ(buf.avail(), buf.capacity());
111 auto actual = buf.ReadMessage();
112 ASSERT_TRUE(actual.valid());
113 EXPECT_EQ(actual.start, expected.start); // Should point to the same buf.
114 EXPECT_EQ(actual, expected);
115
116 // Now write a message in two fragments. It won't hit the fastpath
117 expected = MakeProtoMessage(/*field_id*/ 1, /*len=*/32);
118 buf.Append(last_msg_.data(), 13);
119 EXPECT_LT(buf.avail(), buf.capacity());
120 EXPECT_FALSE(buf.ReadMessage().valid());
121
122 // Append 2nd fragment.
123 buf.Append(last_msg_.data() + 13, last_msg_.size() - 13);
124 actual = buf.ReadMessage();
125 ASSERT_TRUE(actual.valid());
126 EXPECT_EQ(actual, expected);
127 }
128 }
129
// Feeds a stream of back-to-back messages in fragments that do not line up
// with message boundaries and checks that every message is read back in order.
TEST_F(ProtoRingBufferTest, CoalescingStream) {
  ProtoRingBuffer buf;
  last_msg_.reserve(1024);
  std::list<ProtoRingBuffer::Message> pending;

  // Build 6 messages of 100 bytes each (100 does not include preambles).
  for (uint32_t field_id = 1; field_id <= 6; ++field_id)
    pending.emplace_back(MakeProtoMessage(field_id, 100, /*append=*/true));

  uint32_t frag_lens[] = {120, 20, 471, 1};
  uint32_t total_len = 0;
  for (const uint32_t frag_len : frag_lens)
    total_len += frag_len;
  ASSERT_EQ(total_len, last_msg_.size());

  // Append the messages in such a way that each append either passes a portion
  // of a message (the 20 ones) or more than a message.
  uint32_t offset = 0;
  for (uint32_t i = 0; i < ArraySize(frag_lens); ++i) {
    buf.Append(&last_msg_[offset], frag_lens[i]);
    offset += frag_lens[i];
    // Drain every message that became complete with this fragment.
    for (auto msg = buf.ReadMessage(); msg.valid(); msg = buf.ReadMessage()) {
      ASSERT_FALSE(pending.empty());
      ASSERT_EQ(pending.front(), msg);
      pending.pop_front();
    }
  }
  EXPECT_TRUE(pending.empty());
}
162
// Builds a stream of messages with pseudo-random (fixed seed) field ids and
// sizes, feeds it to the buffer in pseudo-random sized fragments and checks
// that every message is read back in order.
TEST_F(ProtoRingBufferTest, RandomSizes) {
  ProtoRingBuffer buf;
  std::minstd_rand0 rnd(0);

  last_msg_.reserve(1024 * 1024 * 64);
  std::list<ProtoRingBuffer::Message> pending;

  const uint32_t kNumMsg = 100;
  for (uint32_t n = 0; n < kNumMsg; ++n) {
    const uint32_t field_id = static_cast<uint32_t>(1 + (rnd() % 1024u));
    const uint32_t rndval = static_cast<uint32_t>(rnd());
    const uint32_t bucket = rndval % 100;
    uint32_t len = 1 + (rndval % 1024);
    if (bucket < 2) {
      len *= 10 * 1024;  // 2% of messages will get close to kMaxMsgSize.
    } else if (bucket < 20) {
      len *= 512;  // 18% will be around 500K.
    }
    // Clamp the length into [1, kMaxMsgSize].
    len = std::min(len, kMaxMsgSize);
    len = std::max(len, 1u);
    pending.push_back(MakeProtoMessage(field_id, len, /*append=*/true));
  }

  const uint32_t total = static_cast<uint32_t>(last_msg_.size());
  uint32_t consumed = 0;
  while (consumed < total) {
    // Fragments are 1..32768 bytes long, capped to what is left to send.
    const uint32_t frag_len = std::min(
        static_cast<uint32_t>(1 + (rnd() % 32768)), total - consumed);
    buf.Append(&last_msg_[consumed], frag_len);
    consumed += frag_len;
    // Drain every message that became complete with this fragment.
    for (auto msg = buf.ReadMessage(); msg.valid(); msg = buf.ReadMessage()) {
      ASSERT_FALSE(pending.empty());
      ASSERT_EQ(pending.front(), msg);
      pending.pop_front();
    }
  }
  EXPECT_TRUE(pending.empty());
}
201
// Checks that a valid message followed by invalid data is still returned, and
// that subsequent reads report a fatal framing error.
TEST_F(ProtoRingBufferTest, HandleProtoErrorsGracefully) {
  ProtoRingBuffer buf;

  // Append a valid 32 byte message minus its last byte: nothing can be read
  // yet, but this is not an error.
  const auto expected = MakeProtoMessage(1, 32);
  buf.Append(last_msg_.data(), last_msg_.size() - 1);
  auto msg = buf.ReadMessage();
  EXPECT_FALSE(msg.valid());
  EXPECT_FALSE(msg.fatal_framing_error);

  // The first byte completes the message above; the rest is invalid data.
  uint8_t invalid[] = {last_msg_.back(), 0x7f, 0x7f, 0x7f};
  buf.Append(invalid, sizeof(invalid));

  // The first message should be valid.
  msg = buf.ReadMessage();
  EXPECT_EQ(msg, expected);

  // All the rest should be a framing error, even when more data is appended.
  for (int attempt = 0; attempt < 3; ++attempt) {
    msg = buf.ReadMessage();
    EXPECT_FALSE(msg.valid());
    EXPECT_TRUE(msg.fatal_framing_error);

    buf.Append(invalid, sizeof(invalid));
  }
}
230
231 // A customised ring buffer message reader where every message has a
232 // fixed length of |message_length|.
233 class FixedLengthRingBuffer final : public RingBufferMessageReader {
234 public:
FixedLengthRingBuffer(size_t message_length)235 FixedLengthRingBuffer(size_t message_length)
236 : RingBufferMessageReader(), message_length_(message_length) {}
237
238 protected:
TryReadMessage(const uint8_t * start,const uint8_t * end)239 virtual Message TryReadMessage(const uint8_t* start,
240 const uint8_t* end) override {
241 Message msg{};
242 if (message_length_ <= static_cast<size_t>(end - start)) {
243 msg.start = start;
244 msg.len = static_cast<uint32_t>(message_length_);
245 msg.field_id = 0;
246 }
247 return msg;
248 }
249
250 private:
251 size_t message_length_;
252 };
253
// With a fixed message length of 3, no message is readable until three bytes
// have been appended; the message then contains exactly those bytes.
TEST(RingBufferTest, FixedLengthRingBuffer) {
  FixedLengthRingBuffer buf(3);
  EXPECT_FALSE(buf.ReadMessage().valid());  // Nothing appended yet.

  buf.Append("a", 1);
  EXPECT_FALSE(buf.ReadMessage().valid());  // Only 1 of 3 bytes so far.

  buf.Append("bc", 2);
  FixedLengthRingBuffer::Message msg = buf.ReadMessage();
  EXPECT_TRUE(msg.valid());
  const std::string payload(reinterpret_cast<const char*>(msg.start),
                            static_cast<size_t>(msg.len));
  EXPECT_EQ(payload, "abc");
}
266
267 } // namespace
268 } // namespace protozero
269