• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2021 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
18 #define SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
19 
20 #include <stdint.h>
21 
22 #include "perfetto/ext/base/paged_memory.h"
23 
24 namespace perfetto {
25 namespace trace_processor {
26 
27 // This class buffers and tokenizes proto messages used for the TraceProcessor
28 // RPC interface (See comments in trace_processor.proto).
29 // From a logical level, the RPC is a sequence of protos like this.
30 // [ header 1 ] [ payload 1   ]
31 // [ header 2 ] [ payload 2  ]
32 // [ header 3 ] [ payload 3     ]
33 // Where [ header ] is a variable-length sequence of:
34 // [ Field ID = 1, type = length-delimited] [ length (varint) ].
35 // The RPC pipe is byte-oriented, not message-oriented (like a TCP stream).
36 // The pipe is not required to respect the boundaries of each message, it only
37 // guarantees that data is not lost or duplicated. The following sequence of
38 // inbound events is possible:
39 // 1. [ hdr 1 (incomplete) ... ]
40 // 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ]
41 // 3. [ ...load 3 ]
42 //
43 // This class maintains inbound requests in a ring buffer.
44 // The expected usage is:
45 // ring_buf.Append(data, len);
46 // for (;;) {
47 //   auto msg = ring_buf.ReadMessage();
48 //   if (!msg.valid())
49 //     break;
50 //   Decode(msg);
51 // }
52 //
53 // After each call to Append, the caller is expected to call ReadMessage() until
54 // it returns an invalid message (signalling no more messages could be decoded).
55 // Note that a single Append can "unblock" > 1 messages, which is why the caller
56 // needs to keep calling ReadMessage in a loop.
57 //
58 // Internal architecture
59 // ---------------------
60 // Internally this is similar to a ring-buffer, with the caveat that it never
61 // wraps, it only expands. Expansions are rare. The deal is that in most cases
62 // the read cursor follows very closely the write cursor. For instance, if the
63 // uderlying behaves as a dgram socket, after each Append, the read cursor will
64 // chase completely the write cursor. Even if the underyling stream is not
65 // always atomic, the expectation is that the read cursor will eventually reach
66 // the write one within few messages.
67 // A visual example, imagine we have four messages: 2it 4will 2be 4fine
68 // Visually:
69 //
70 // Append("2it4wi"): A message and a bit:
71 // [ 2it 4wi                     ]
72 // ^R       ^W
73 //
74 // After the ReadMessage(), the 1st message will be read, but not the 2nd.
75 // [ 2it 4wi                     ]
76 //      ^R ^W
77 //
78 // Append("ll2be4f")
79 // [ 2it 4will 2be 4f            ]
80 //      ^R           ^W
81 //
82 // After the ReadMessage() loop:
83 // [ 2it 4will 2be 4f            ]
84 //                ^R ^W
85 // Append("ine")
86 // [ 2it 4will 2be 4fine         ]
87 //                ^R    ^W
88 //
89 // In the next ReadMessage() the R cursor will chase the W cursor. When this
90 // happens (very frequent) we can just reset both cursors to 0 and restart.
91 // If we are unlucky and get to the end of the buffer, two things happen:
92 // 1. We try first to recompact the buffer, moving everything left by R.
93 // 2. If still there isn't enough space, we expand the buffer.
94 // Given that each message is expected to be at most kMaxMsgSize (64 MB), the
95 // expansion is bound at 2 * kMaxMsgSize.
96 class ProtoRingBuffer {
97  public:
98   static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024;
99   struct Message {
100     const uint8_t* start = nullptr;
101     uint32_t len = 0;
102     uint32_t field_id = 0;
103     bool fatal_framing_error = false;
endMessage104     const uint8_t* end() const { return start + len; }
validMessage105     inline bool valid() const { return !!start; }
106   };
107 
108   ProtoRingBuffer();
109   ~ProtoRingBuffer();
110   ProtoRingBuffer(const ProtoRingBuffer&) = delete;
111   ProtoRingBuffer& operator=(const ProtoRingBuffer&) = delete;
112 
113   // Appends data into the ring buffer, recompacting or resizing it if needed.
114   // Will invaildate the pointers previously handed out.
115   void Append(const void* data, size_t len);
116 
117   // If a protobuf message can be read, it returns the boundaries of the message
118   // (without including the preamble) and advances the read cursor.
119   // If no message is avaiable, returns a null range.
120   // The returned pointer is only valid until the next call to Append(), as
121   // that can recompact or resize the underlying buffer.
122   Message ReadMessage();
123 
124   // Exposed for testing.
capacity()125   size_t capacity() const { return buf_.size(); }
avail()126   size_t avail() const { return buf_.size() - (wr_ - rd_); }
127 
128  private:
129   base::PagedMemory buf_;
130   Message fastpath_{};
131   bool failed_ = false;  // Set in case of an unrecoverable framing faiulre.
132   size_t rd_ = 0;        // Offset of the read cursor in |buf_|.
133   size_t wr_ = 0;        // Offset of the write cursor in |buf_|.
134 };
135 
136 }  // namespace trace_processor
137 }  // namespace perfetto
138 
139 #endif  // SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_
140