• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/trace_processor/rpc/proto_ring_buffer.h"
18 
19 #include <stdint.h>
20 #include <sys/types.h>
21 
22 #include <list>
23 #include <ostream>
24 #include <random>
25 #include <vector>
26 
27 #include "perfetto/ext/base/utils.h"
28 #include "perfetto/protozero/proto_utils.h"
29 #include "test/gtest_and_gmock.h"
30 
31 using testing::ElementsAre;
32 
33 namespace perfetto {
34 namespace trace_processor {
35 
36 // For ASSERT_EQ()
operator ==(const ProtoRingBuffer::Message & a,const ProtoRingBuffer::Message & b)37 inline bool operator==(const ProtoRingBuffer::Message& a,
38                        const ProtoRingBuffer::Message& b) {
39   if (a.field_id != b.field_id || a.len != b.len || a.valid() != b.valid())
40     return false;
41   if (!a.valid())
42     return true;
43   return memcmp(a.start, b.start, a.len) == 0;
44 }
45 
operator <<(std::ostream & stream,const ProtoRingBuffer::Message & msg)46 inline std::ostream& operator<<(std::ostream& stream,
47                                 const ProtoRingBuffer::Message& msg) {
48   stream << "Message{field_id:" << msg.field_id << ", len:" << msg.len;
49   stream << ", payload: \"";
50   static constexpr uint32_t kTruncLen = 16;
51   for (uint32_t i = 0; i < std::min(msg.len, kTruncLen); i++)
52     stream << static_cast<char>(msg.start[i]);
53   if (msg.len > kTruncLen)
54     stream << "...";
55   stream << "\"}";
56   return stream;
57 }
58 
59 namespace {
60 
61 constexpr uint32_t kMaxMsgSize = ProtoRingBuffer::kMaxMsgSize;
62 
63 class ProtoRingBufferTest : public ::testing::Test {
64  public:
MakeProtoMessage(uint32_t field_id,uint32_t len,bool append=false)65   ProtoRingBuffer::Message MakeProtoMessage(uint32_t field_id,
66                                             uint32_t len,
67                                             bool append = false) {
68     ProtoRingBuffer::Message msg{};
69     namespace proto_utils = protozero::proto_utils;
70     const uint8_t* initial_ptr = last_msg_.data();
71     if (!append)
72       last_msg_.clear();
73     size_t initial_size = last_msg_.size();
74 
75     // 20 is an over-estimation of the preamble (fixed by the 2nd resize below).
76     last_msg_.resize(initial_size + len + 20);
77     uint8_t* wptr = &last_msg_[initial_size];
78     auto tag = proto_utils::MakeTagLengthDelimited(field_id);
79     wptr = proto_utils::WriteVarInt(tag, wptr);
80     wptr = proto_utils::WriteVarInt(len, wptr);
81     msg.start = wptr;
82     msg.len = len;
83     msg.field_id = field_id;
84     for (uint32_t i = 0; i < len; i++)
85       *(wptr++) = '0' + ((len + i) % 73);  // 73 prime for more unique patterns.
86 
87     PERFETTO_CHECK(wptr <= &last_msg_.back());
88     last_msg_.resize(static_cast<size_t>(wptr - &last_msg_[0]));
89 
90     // Vector must not expand, because the returned Mesdage relies on pointer
91     // stability. The TEST_F must reserve enough capacity.
92     if (append)
93       PERFETTO_CHECK(last_msg_.data() == initial_ptr);
94     return msg;
95   }
96 
97   std::vector<uint8_t> last_msg_;
98 };
99 
100 // Test that when appending buffers that contain whole messages the ring buffer
101 // is skipped.
TEST_F(ProtoRingBufferTest,Fastpath)102 TEST_F(ProtoRingBufferTest, Fastpath) {
103   ProtoRingBuffer buf;
104   for (uint32_t i = 0; i < 10; i++) {
105     // Write a whole message that hits the fastpath.
106     auto expected = MakeProtoMessage(/*field_id=*/i + 1, /*len=*/i * 7);
107     buf.Append(last_msg_.data(), last_msg_.size());
108     // Shouln't take any space the buffer because it hits the fastpath.
109     EXPECT_EQ(buf.avail(), buf.capacity());
110     auto actual = buf.ReadMessage();
111     ASSERT_TRUE(actual.valid());
112     EXPECT_EQ(actual.start, expected.start);  // Should point to the same buf.
113     EXPECT_EQ(actual, expected);
114 
115     // Now write a message in two fragments. It won't hit the fastpath
116     expected = MakeProtoMessage(/*field_id*/ 1, /*len=*/32);
117     buf.Append(last_msg_.data(), 13);
118     EXPECT_LT(buf.avail(), buf.capacity());
119     EXPECT_FALSE(buf.ReadMessage().valid());
120 
121     // Append 2nd fragment.
122     buf.Append(last_msg_.data() + 13, last_msg_.size() - 13);
123     actual = buf.ReadMessage();
124     ASSERT_TRUE(actual.valid());
125     EXPECT_EQ(actual, expected);
126   }
127 }
128 
// Feeds a stream of back-to-back messages in fragments whose boundaries do not
// line up with message boundaries, and checks that every message is read back
// in order.
TEST_F(ProtoRingBufferTest, CoalescingStream) {
  ProtoRingBuffer ring;
  // MakeProtoMessage(append=true) requires that last_msg_ never reallocates.
  last_msg_.reserve(1024);
  std::list<ProtoRingBuffer::Message> pending;

  // Build 6 messages of 100 bytes each (100 does not include preambles).
  for (uint32_t field_id = 1; field_id <= 6; field_id++)
    pending.push_back(MakeProtoMessage(field_id, 100, /*append=*/true));

  // The fragments must add up to exactly the serialized stream.
  const uint32_t kFragLens[] = {120, 20, 471, 1};
  uint32_t frag_total = 0;
  for (uint32_t frag_len : kFragLens)
    frag_total += frag_len;
  ASSERT_EQ(frag_total, last_msg_.size());

  // Append the messages in such a way that each append either passes a portion
  // of a message (the 20 ones) or more than a message.
  uint32_t offset = 0;
  for (uint32_t frag_len : kFragLens) {
    ring.Append(last_msg_.data() + offset, frag_len);
    offset += frag_len;
    // Drain every message that became complete with this fragment.
    for (auto msg = ring.ReadMessage(); msg.valid();
         msg = ring.ReadMessage()) {
      ASSERT_FALSE(pending.empty());
      ASSERT_EQ(pending.front(), msg);
      pending.pop_front();
    }
  }
  EXPECT_TRUE(pending.empty());
}
161 
// Serializes a pseudo-random (fixed seed) sequence of messages of varying
// sizes, feeds it to the ring buffer in pseudo-random fragments and checks
// that every message is read back in order.
TEST_F(ProtoRingBufferTest, RandomSizes) {
  ProtoRingBuffer ring;
  std::minstd_rand0 rng(0);

  // MakeProtoMessage(append=true) requires that last_msg_ never reallocates.
  last_msg_.reserve(1024 * 1024 * 64);
  std::list<ProtoRingBuffer::Message> pending;

  const uint32_t kNumMsg = 100;
  for (uint32_t n = 0; n < kNumMsg; n++) {
    const uint32_t field_id = static_cast<uint32_t>(1 + (rng() % 1024u));
    const uint32_t rndval = static_cast<uint32_t>(rng());
    const uint32_t bucket = rndval % 100;
    uint32_t len = 1 + (rndval % 1024);
    if (bucket < 2) {
      len *= 10 * 1024;  // 2% of messages will get close to kMaxMsgSize
    } else if (bucket < 20) {
      len *= 512;  // 18% will be around 500K;
    }
    // Clamp into [1, kMaxMsgSize].
    len = std::max(std::min(len, kMaxMsgSize), 1u);
    pending.push_back(MakeProtoMessage(field_id, len, /*append=*/true));
  }

  // Replay the whole stream in fragments of 1..32768 bytes.
  const uint32_t total = static_cast<uint32_t>(last_msg_.size());
  uint32_t consumed = 0;
  while (consumed < total) {
    uint32_t frag_len = static_cast<uint32_t>(1 + (rng() % 32768));
    frag_len = std::min(frag_len, total - consumed);
    ring.Append(last_msg_.data() + consumed, frag_len);
    consumed += frag_len;
    // Drain every message that became complete with this fragment.
    for (auto msg = ring.ReadMessage(); msg.valid();
         msg = ring.ReadMessage()) {
      ASSERT_FALSE(pending.empty());
      ASSERT_EQ(pending.front(), msg);
      pending.pop_front();
    }
  }
  EXPECT_TRUE(pending.empty());
}
200 
// Checks that a valid message followed by invalid bytes is still delivered,
// and that every later read reports a fatal framing error.
TEST_F(ProtoRingBufferTest, HandleProtoErrorsGracefully) {
  ProtoRingBuffer ring;

  // Append a partial valid 32 byte message (all but its last byte): nothing
  // can be read yet, but that is not an error.
  const auto expected = MakeProtoMessage(1, 32);
  ring.Append(last_msg_.data(), last_msg_.size() - 1);
  auto msg = ring.ReadMessage();
  EXPECT_FALSE(msg.valid());
  EXPECT_FALSE(msg.fatal_framing_error);

  // Complete the message with its withheld last byte, followed by some invalid
  // data.
  uint8_t garbage[] = {last_msg_.back(), 0x7f, 0x7f, 0x7f};
  ring.Append(garbage, sizeof(garbage));

  // The first message should be valid.
  msg = ring.ReadMessage();
  EXPECT_EQ(msg, expected);

  // All the rest should be a framing error.
  for (int attempt = 0; attempt < 3; ++attempt) {
    msg = ring.ReadMessage();
    EXPECT_FALSE(msg.valid());
    EXPECT_TRUE(msg.fatal_framing_error);

    ring.Append(garbage, sizeof(garbage));
  }
}
229 
230 }  // namespace
231 }  // namespace trace_processor
232 }  // namespace perfetto
233