1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_ 18 #define SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_ 19 20 #include <stdint.h> 21 22 #include "perfetto/ext/base/paged_memory.h" 23 24 namespace perfetto { 25 namespace trace_processor { 26 27 // This class buffers and tokenizes proto messages used for the TraceProcessor 28 // RPC interface (See comments in trace_processor.proto). 29 // From a logical level, the RPC is a sequence of protos like this. 30 // [ header 1 ] [ payload 1 ] 31 // [ header 2 ] [ payload 2 ] 32 // [ header 3 ] [ payload 3 ] 33 // Where [ header ] is a variable-length sequence of: 34 // [ Field ID = 1, type = length-delimited] [ length (varint) ]. 35 // The RPC pipe is byte-oriented, not message-oriented (like a TCP stream). 36 // The pipe is not required to respect the boundaries of each message, it only 37 // guarantees that data is not lost or duplicated. The following sequence of 38 // inbound events is possible: 39 // 1. [ hdr 1 (incomplete) ... ] 40 // 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ] 41 // 3. [ ...load 3 ] 42 // 43 // This class maintains inbound requests in a ring buffer. 44 // The expected usage is: 45 // ring_buf.Append(data, len); 46 // for (;;) { 47 // auto msg = ring_buf.ReadMessage(); 48 // if (!msg.valid()) 49 // break; 50 // Decode(msg); 51 // } 52 // 53 // After each call to Append, the caller is expected to call ReadMessage() until 54 // it returns an invalid message (signalling no more messages could be decoded). 55 // Note that a single Append can "unblock" > 1 messages, which is why the caller 56 // needs to keep calling ReadMessage in a loop. 57 // 58 // Internal architecture 59 // --------------------- 60 // Internally this is similar to a ring-buffer, with the caveat that it never 61 // wraps, it only expands. Expansions are rare. The deal is that in most cases 62 // the read cursor follows very closely the write cursor. For instance, if the 63 // uderlying behaves as a dgram socket, after each Append, the read cursor will 64 // chase completely the write cursor. Even if the underyling stream is not 65 // always atomic, the expectation is that the read cursor will eventually reach 66 // the write one within few messages. 67 // A visual example, imagine we have four messages: 2it 4will 2be 4fine 68 // Visually: 69 // 70 // Append("2it4wi"): A message and a bit: 71 // [ 2it 4wi ] 72 // ^R ^W 73 // 74 // After the ReadMessage(), the 1st message will be read, but not the 2nd. 75 // [ 2it 4wi ] 76 // ^R ^W 77 // 78 // Append("ll2be4f") 79 // [ 2it 4will 2be 4f ] 80 // ^R ^W 81 // 82 // After the ReadMessage() loop: 83 // [ 2it 4will 2be 4f ] 84 // ^R ^W 85 // Append("ine") 86 // [ 2it 4will 2be 4fine ] 87 // ^R ^W 88 // 89 // In the next ReadMessage() the R cursor will chase the W cursor. When this 90 // happens (very frequent) we can just reset both cursors to 0 and restart. 91 // If we are unlucky and get to the end of the buffer, two things happen: 92 // 1. We try first to recompact the buffer, moving everything left by R. 93 // 2. If still there isn't enough space, we expand the buffer. 94 // Given that each message is expected to be at most kMaxMsgSize (64 MB), the 95 // expansion is bound at 2 * kMaxMsgSize. 96 class ProtoRingBuffer { 97 public: 98 static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024; 99 struct Message { 100 const uint8_t* start = nullptr; 101 uint32_t len = 0; 102 uint32_t field_id = 0; 103 bool fatal_framing_error = false; endMessage104 const uint8_t* end() const { return start + len; } validMessage105 inline bool valid() const { return !!start; } 106 }; 107 108 ProtoRingBuffer(); 109 ~ProtoRingBuffer(); 110 ProtoRingBuffer(const ProtoRingBuffer&) = delete; 111 ProtoRingBuffer& operator=(const ProtoRingBuffer&) = delete; 112 113 // Appends data into the ring buffer, recompacting or resizing it if needed. 114 // Will invaildate the pointers previously handed out. 115 void Append(const void* data, size_t len); 116 117 // If a protobuf message can be read, it returns the boundaries of the message 118 // (without including the preamble) and advances the read cursor. 119 // If no message is avaiable, returns a null range. 120 // The returned pointer is only valid until the next call to Append(), as 121 // that can recompact or resize the underlying buffer. 122 Message ReadMessage(); 123 124 // Exposed for testing. capacity()125 size_t capacity() const { return buf_.size(); } avail()126 size_t avail() const { return buf_.size() - (wr_ - rd_); } 127 128 private: 129 base::PagedMemory buf_; 130 Message fastpath_{}; 131 bool failed_ = false; // Set in case of an unrecoverable framing faiulre. 132 size_t rd_ = 0; // Offset of the read cursor in |buf_|. 133 size_t wr_ = 0; // Offset of the write cursor in |buf_|. 134 }; 135 136 } // namespace trace_processor 137 } // namespace perfetto 138 139 #endif // SRC_TRACE_PROCESSOR_RPC_PROTO_RING_BUFFER_H_ 140