• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2017 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "src/ipc/buffered_frame_deserializer.h"
18 
19 #include <inttypes.h>
20 
21 #include <algorithm>
22 #include <type_traits>
23 #include <utility>
24 
25 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
26 #include "perfetto/base/logging.h"
27 #include "perfetto/base/utils.h"
28 
29 #include "src/ipc/wire_protocol.pb.h"
30 
31 namespace perfetto {
32 namespace ipc {
33 
34 namespace {
35 
36 // The header is just the number of bytes of the Frame protobuf message.
37 constexpr size_t kHeaderSize = sizeof(uint32_t);
38 }  // namespace
39 
BufferedFrameDeserializer(size_t max_capacity)40 BufferedFrameDeserializer::BufferedFrameDeserializer(size_t max_capacity)
41     : capacity_(max_capacity) {
42   PERFETTO_CHECK(max_capacity % base::kPageSize == 0);
43   PERFETTO_CHECK(max_capacity > base::kPageSize);
44 }
45 
46 BufferedFrameDeserializer::~BufferedFrameDeserializer() = default;
47 
48 BufferedFrameDeserializer::ReceiveBuffer
BeginReceive()49 BufferedFrameDeserializer::BeginReceive() {
50   // Upon the first recv initialize the buffer to the max message size but
51   // release the physical memory for all but the first page. The kernel will
52   // automatically give us physical pages back as soon as we page-fault on them.
53   if (!buf_) {
54     PERFETTO_DCHECK(size_ == 0);
55     buf_ = base::PageAllocator::Allocate(capacity_);
56 
57     // Surely we are going to use at least the first page. There is very little
58     // point in madvising that as well and immedately after telling the kernel
59     // that we want it back (via recv()).
60     int res = madvise(buf() + base::kPageSize, capacity_ - base::kPageSize,
61                       MADV_DONTNEED);
62     PERFETTO_DCHECK(res == 0);
63   }
64 
65   PERFETTO_CHECK(capacity_ > size_);
66   return ReceiveBuffer{buf() + size_, capacity_ - size_};
67 }
68 
EndReceive(size_t recv_size)69 bool BufferedFrameDeserializer::EndReceive(size_t recv_size) {
70   PERFETTO_CHECK(recv_size + size_ <= capacity_);
71   size_ += recv_size;
72 
73   // At this point the contents buf_ can contain:
74   // A) Only a fragment of the header (the size of the frame). E.g.,
75   //    03 00 00 (the header is 4 bytes, one is missing).
76   //
77   // B) A header and a part of the frame. E.g.,
78   //     05 00 00 00         11 22 33
79   //    [ header, size=5 ]  [ Partial frame ]
80   //
81   // C) One or more complete header+frame. E.g.,
82   //     05 00 00 00         11 22 33 44 55   03 00 00 00        AA BB CC
83   //    [ header, size=5 ]  [ Whole frame ]  [ header, size=3 ] [ Whole frame ]
84   //
85   // D) Some complete header+frame(s) and a partial header or frame (C + A/B).
86   //
87   // C Is the more likely case and the one we are optimizing for. A, B, D can
88   // happen because of the streaming nature of the socket.
89   // The invariant of this function is that, when it returns, buf_ is either
90   // empty (we drained all the complete frames) or starts with the header of the
91   // next, still incomplete, frame.
92 
93   size_t consumed_size = 0;
94   for (;;) {
95     if (size_ < consumed_size + kHeaderSize)
96       break;  // Case A, not enough data to read even the header.
97 
98     // Read the header into |payload_size|.
99     uint32_t payload_size = 0;
100     const char* rd_ptr = buf() + consumed_size;
101     memcpy(base::AssumeLittleEndian(&payload_size), rd_ptr, kHeaderSize);
102 
103     // Saturate the |payload_size| to prevent overflows. The > capacity_ check
104     // below will abort the parsing.
105     size_t next_frame_size =
106         std::min(static_cast<size_t>(payload_size), capacity_);
107     next_frame_size += kHeaderSize;
108     rd_ptr += kHeaderSize;
109 
110     if (size_ < consumed_size + next_frame_size) {
111       // Case B. We got the header but not the whole frame.
112       if (next_frame_size > capacity_) {
113         // The caller is expected to shut down the socket and give up at this
114         // point. If it doesn't do that and insists going on at some point it
115         // will hit the capacity check in BeginReceive().
116         PERFETTO_DLOG("Frame too large (size %zu)", next_frame_size);
117         return false;
118       }
119       break;
120     }
121 
122     // Case C. We got at least one header and whole frame.
123     DecodeFrame(rd_ptr, payload_size);
124     consumed_size += next_frame_size;
125   }
126 
127   PERFETTO_DCHECK(consumed_size <= size_);
128   if (consumed_size > 0) {
129     // Shift out the consumed data from the buffer. In the typical case (C)
130     // there is nothing to shift really, just setting size_ = 0 is enough.
131     // Shifting is only for the (unlikely) case D.
132     size_ -= consumed_size;
133     if (size_ > 0) {
134       // Case D. We consumed some frames but there is a leftover at the end of
135       // the buffer. Shift out the consumed bytes, so that on the next round
136       // |buf_| starts with the header of the next unconsumed frame.
137       const char* move_begin = buf() + consumed_size;
138       PERFETTO_CHECK(move_begin > buf());
139       PERFETTO_CHECK(move_begin + size_ <= buf() + capacity_);
140       memmove(buf(), move_begin, size_);
141     }
142     // If we just finished decoding a large frame that used more than one page,
143     // release the extra memory in the buffer. Large frames should be quite
144     // rare.
145     if (consumed_size > base::kPageSize) {
146       size_t size_rounded_up = (size_ / base::kPageSize + 1) * base::kPageSize;
147       if (size_rounded_up < capacity_) {
148         char* madvise_begin = buf() + size_rounded_up;
149         const size_t madvise_size = capacity_ - size_rounded_up;
150         PERFETTO_CHECK(madvise_begin > buf() + size_);
151         PERFETTO_CHECK(madvise_begin + madvise_size <= buf() + capacity_);
152         int res = madvise(madvise_begin, madvise_size, MADV_DONTNEED);
153         PERFETTO_DCHECK(res == 0);
154       }
155     }
156   }
157   // At this point |size_| == 0 for case C, > 0 for cases A, B, D.
158   return true;
159 }
160 
PopNextFrame()161 std::unique_ptr<Frame> BufferedFrameDeserializer::PopNextFrame() {
162   if (decoded_frames_.empty())
163     return nullptr;
164   std::unique_ptr<Frame> frame = std::move(decoded_frames_.front());
165   decoded_frames_.pop_front();
166   return frame;
167 }
168 
DecodeFrame(const char * data,size_t size)169 void BufferedFrameDeserializer::DecodeFrame(const char* data, size_t size) {
170   if (size == 0)
171     return;
172   std::unique_ptr<Frame> frame(new Frame);
173   const int sz = static_cast<int>(size);
174   ::google::protobuf::io::ArrayInputStream stream(data, sz);
175   if (frame->ParseFromBoundedZeroCopyStream(&stream, sz))
176     decoded_frames_.push_back(std::move(frame));
177 }
178 
179 // static
Serialize(const Frame & frame)180 std::string BufferedFrameDeserializer::Serialize(const Frame& frame) {
181   std::string buf;
182   buf.reserve(1024);  // Just an educated guess to avoid trivial expansions.
183   buf.insert(0, kHeaderSize, 0);  // Reserve the space for the header.
184   frame.AppendToString(&buf);
185   const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize);
186   PERFETTO_DCHECK(payload_size == static_cast<uint32_t>(frame.GetCachedSize()));
187   // Don't send messages larger than what the receiver can handle.
188   PERFETTO_DCHECK(kHeaderSize + payload_size <= kIPCBufferSize);
189   char header[kHeaderSize];
190   memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize);
191   buf.replace(0, kHeaderSize, header, kHeaderSize);
192   return buf;
193 }
194 
195 }  // namespace ipc
196 }  // namespace perfetto
197