1 /* 2 * Copyright (C) 2021 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef SRC_PROTOZERO_PROTO_RING_BUFFER_H_ 18 #define SRC_PROTOZERO_PROTO_RING_BUFFER_H_ 19 20 #include <stdint.h> 21 22 #include "perfetto/ext/base/paged_memory.h" 23 24 namespace protozero { 25 26 // This class buffers and tokenizes proto messages. 27 // 28 // From a logical level, it works with a sequence of protos like this. 29 // [ header 1 ] [ payload 1 ] 30 // [ header 2 ] [ payload 2 ] 31 // [ header 3 ] [ payload 3 ] 32 // Where [ header ] is a variable-length sequence of: 33 // [ Field ID = 1, type = length-delimited] [ length (varint) ]. 34 // 35 // The input to this class is byte-oriented, not message-oriented (like a TCP 36 // stream or a pipe). The caller is not required to respect the boundaries of 37 // each message; only guarantee that data is not lost or duplicated. The 38 // following sequence of inbound events is possible: 39 // 1. [ hdr 1 (incomplete) ... ] 40 // 2. [ ... hdr 1 ] [ payload 1 ] [ hdr 2 ] [ payoad 2 ] [ hdr 3 ] [ pay... ] 41 // 3. [ ...load 3 ] 42 // 43 // This class maintains inbound requests in a ring buffer. 44 // The expected usage is: 45 // ring_buf.Append(data, len); 46 // for (;;) { 47 // auto msg = ring_buf.ReadMessage(); 48 // if (!msg.valid()) 49 // break; 50 // Decode(msg); 51 // } 52 // 53 // After each call to Append, the caller is expected to call ReadMessage() until 54 // it returns an invalid message (signalling no more messages could be decoded). 55 // Note that a single Append can "unblock" > 1 messages, which is why the caller 56 // needs to keep calling ReadMessage in a loop. 57 // 58 // Internal architecture 59 // --------------------- 60 // Internally this is similar to a ring-buffer, with the caveat that it never 61 // wraps, it only expands. Expansions are rare. The deal is that in most cases 62 // the read cursor follows very closely the write cursor. For instance, if the 63 // underlying transport behaves as a dgram socket, after each Append, the read 64 // cursor will chase completely the write cursor. Even if the underlying stream 65 // is not always atomic, the expectation is that the read cursor will eventually 66 // reach the write one within few messages. 67 // A visual example, imagine we have four messages: 2it 4will 2be 4fine 68 // Visually: 69 // 70 // Append("2it4wi"): A message and a bit: 71 // [ 2it 4wi ] 72 // ^R ^W 73 // 74 // After the ReadMessage(), the 1st message will be read, but not the 2nd. 75 // [ 2it 4wi ] 76 // ^R ^W 77 // 78 // Append("ll2be4f") 79 // [ 2it 4will 2be 4f ] 80 // ^R ^W 81 // 82 // After the ReadMessage() loop: 83 // [ 2it 4will 2be 4f ] 84 // ^R ^W 85 // Append("ine") 86 // [ 2it 4will 2be 4fine ] 87 // ^R ^W 88 // 89 // In the next ReadMessage() the R cursor will chase the W cursor. When this 90 // happens (very frequent) we can just reset both cursors to 0 and restart. 91 // If we are unlucky and get to the end of the buffer, two things happen: 92 // 1. We try first to recompact the buffer, moving everything left by R. 93 // 2. If still there isn't enough space, we expand the buffer. 94 // Given that each message is expected to be at most kMaxMsgSize (64 MB), the 95 // expansion is bound at 2 * kMaxMsgSize. 96 97 class RingBufferMessageReader { 98 public: 99 static constexpr size_t kMaxMsgSize = 64 * 1024 * 1024; 100 struct Message { 101 const uint8_t* start = nullptr; 102 uint32_t len = 0; 103 uint32_t field_id = 0; 104 bool fatal_framing_error = false; endMessage105 const uint8_t* end() const { return start + len; } validMessage106 inline bool valid() const { return !!start; } 107 }; 108 109 RingBufferMessageReader(); 110 virtual ~RingBufferMessageReader(); 111 RingBufferMessageReader(const RingBufferMessageReader&) = delete; 112 RingBufferMessageReader& operator=(const RingBufferMessageReader&) = delete; 113 114 // Appends data into the ring buffer, recompacting or resizing it if needed. 115 // Will invaildate the pointers previously handed out. 116 void Append(const void* data, size_t len); 117 118 // If a message can be read, it returns the boundaries of the message 119 // (without including the preamble) and advances the read cursor. 120 // If no message is available, returns a null range. 121 // The returned pointer is only valid until the next call to Append(), as 122 // that can recompact or resize the underlying buffer. 123 Message ReadMessage(); 124 125 // Exposed for testing. capacity()126 size_t capacity() const { return buf_.size(); } avail()127 size_t avail() const { return buf_.size() - (wr_ - rd_); } 128 129 protected: 130 // Subclasses must implement the header parsing. 131 virtual Message TryReadMessage(const uint8_t* start, const uint8_t* end) = 0; 132 133 private: 134 perfetto::base::PagedMemory buf_; 135 Message fastpath_{}; 136 bool failed_ = false; // Set in case of an unrecoverable framing faiulre. 137 size_t rd_ = 0; // Offset of the read cursor in |buf_|. 138 size_t wr_ = 0; // Offset of the write cursor in |buf_|. 139 }; 140 141 class ProtoRingBuffer final : public RingBufferMessageReader { 142 public: 143 ProtoRingBuffer(); 144 ~ProtoRingBuffer() override final; 145 146 protected: 147 Message TryReadMessage(const uint8_t* start, 148 const uint8_t* end) override final; 149 }; 150 151 } // namespace protozero 152 153 #endif // SRC_PROTOZERO_PROTO_RING_BUFFER_H_ 154