// Copyright (C) 2021 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {assertTrue} from '../base/logging';

// This class is the TypeScript equivalent of the identically-named C++ class in
// //protozero/proto_ring_buffer.h. See comments in that header for a detailed
// description. The architecture is identical.

// Size of the initial buffer and granularity of each subsequent expansion.
const kGrowBytes = 128 * 1024;

// Messages whose declared length is >= this value are treated as a framing
// error.
const kMaxMsgSize = 1024 * 1024 * 1024;

// A protobuf varint encodes at most 64 bits, 7 bits per byte, hence it can
// never be longer than 10 bytes. Anything longer is a framing error.
const kMaxVarIntBytes = 10;

export class ProtoRingBuffer {
  // Backing storage. Valid (unread) bytes live in the range [rd, wr).
  private buf = new Uint8Array(kGrowBytes);

  // When set, holds a message that append() managed to extract directly from
  // the caller's array without copying. It is returned (and cleared) by the
  // next readMessage() call.
  private fastpath?: Uint8Array;

  private rd = 0; // Offset in |buf| of the next byte to read.
  private wr = 0; // Offset in |buf| where the next append() will write.

  // Pushes |data| into the buffer.
  // The caller must call readMessage() after each append() call.
  // The |data| might be either copied in the internal ring buffer or returned
  // (modulo a subarray()) to the next readMessage() call.
  // Throws an Error if |data| is inspected on the fastpath and starts with a
  // malformed message header (see tryReadMessage()).
  append(data: Uint8Array): void {
    assertTrue(this.wr <= this.buf.length);
    assertTrue(this.rd <= this.wr);

    // If the last call to readMessage() consumed all the data in the buffer and
    // there are no incomplete messages pending, restart from the beginning
    // rather than keep ringing. This is the most common case.
    if (this.rd === this.wr) {
      this.rd = this.wr = 0;
    }

    // The caller is expected to issue a readMessage() after each append().
    const dataLen = data.length;
    if (dataLen === 0) return;
    assertTrue(this.fastpath === undefined);
    if (this.rd === this.wr) {
      const msg = ProtoRingBuffer.tryReadMessage(data, 0, dataLen);
      if (
        msg !== undefined &&
        msg.byteOffset + msg.length === data.byteOffset + dataLen
      ) {
        // Fastpath: in many cases, the underlying stream will effectively
        // preserve the atomicity of messages for most small messages.
        // In this case we can avoid the extra buffer roundtrip and return the
        // original array (actually a subarray that skips the proto header).
        // The next call to readMessage() will return this.
        this.fastpath = msg;
        return;
      }
    }

    let avail = this.buf.length - this.wr;
    if (dataLen > avail) {
      // This whole section should be hit extremely rarely.

      // Try first just recompacting the buffer by moving everything to the
      // left. This can happen if we received "a message and a bit" on each
      // append() call.
      this.buf.copyWithin(0, this.rd, this.wr);
      avail += this.rd;
      this.wr -= this.rd;
      this.rd = 0;
      if (dataLen > avail) {
        // Still not enough, expand the buffer.
        let newSize = this.buf.length;
        while (dataLen > newSize - this.wr) {
          newSize += kGrowBytes;
        }
        assertTrue(newSize <= kMaxMsgSize * 2);
        const newBuf = new Uint8Array(newSize);
        newBuf.set(this.buf);
        this.buf = newBuf;
        // No need to touch rd / wr.
      }
    }

    // Append the received data at the end of the ring buffer.
    this.buf.set(data, this.wr);
    this.wr += dataLen;
  }

  // Tries to extract a message from the ring buffer. If there is no message,
  // or if the current message is still incomplete, returns undefined.
  // The caller is expected to call this in a loop until it returns undefined.
  // Note that a single write to append() can yield more than one message
  // (see ProtoRingBufferTest.CoalescingStream in the unittest).
  // Throws an Error if the buffered data starts with a malformed message
  // header (see tryReadMessage()).
  readMessage(): Uint8Array | undefined {
    if (this.fastpath !== undefined) {
      assertTrue(this.rd === this.wr);
      const msg = this.fastpath;
      this.fastpath = undefined;
      return msg;
    }
    assertTrue(this.rd <= this.wr);
    if (this.rd >= this.wr) {
      return undefined; // Completely empty.
    }
    const msg = ProtoRingBuffer.tryReadMessage(this.buf, this.rd, this.wr);
    if (msg === undefined) return undefined;
    assertTrue(msg.buffer === this.buf.buffer);
    assertTrue(this.buf.byteOffset === 0);
    // |buf| starts at offset 0 of its ArrayBuffer (asserted above), so the
    // view's byteOffset can be used directly as an index into |buf|.
    this.rd = msg.byteOffset + msg.length;

    // Deliberately returning a copy of the data with slice(). In various cases
    // (streaming query response) the caller will hold onto the returned buffer.
    // If we get to this point, |msg| is a view of the circular buffer that we
    // will overwrite on the next calls to append().
    return msg.slice();
  }

  // Parses one length-delimited message (one-byte tag, varint length, payload)
  // from |data| in the range [dataStart, dataEnd).
  // Returns a view (not a copy) of the payload, or undefined if the range does
  // not yet contain a complete message.
  // Throws an Error on framing errors: unexpected tag, length varint longer
  // than kMaxVarIntBytes, or declared length >= kMaxMsgSize.
  private static tryReadMessage(
    data: Uint8Array,
    dataStart: number,
    dataEnd: number,
  ): Uint8Array | undefined {
    assertTrue(dataEnd <= data.length);
    let pos = dataStart;
    if (pos >= dataEnd) return undefined;
    const tag = data[pos++]; // Assume one-byte tag.
    if (tag >= 0x80 || (tag & 0x07) !== 2 /* len delimited */) {
      throw new Error(
        `RPC framing error, unexpected tag ${tag} @ offset ${pos - 1}`,
      );
    }

    // Decode the length varint. This deliberately uses multiplication rather
    // than << and |: bitwise operators work on 32-bit integers and take the
    // shift count modulo 32, so a long or large varint would silently wrap
    // into a small or negative length instead of being rejected below.
    const lenStart = pos;
    let len = 0;
    let scale = 1;
    for (let numBytes = 1; ; numBytes++) {
      if (pos >= dataEnd) {
        return undefined; // Not enough data to read varint.
      }
      const val = data[pos++];
      len += (val & 0x7f) * scale;
      if (val < 0x80) break;
      if (numBytes >= kMaxVarIntBytes) {
        // Without this bound an unterminated varint would keep the parser
        // waiting for more data forever.
        throw new Error(
          `RPC framing error, malformed length varint @ offset ${lenStart}`,
        );
      }
      scale *= 128;
    }

    if (len >= kMaxMsgSize) {
      throw new Error(
        `RPC framing error, message too large (${len} >= ${kMaxMsgSize})`,
      );
    }
    const end = pos + len;
    if (end > dataEnd) return undefined;

    // This is a subarray() and not a slice() because in the |fastpath| case
    // we want to just return the original buffer pushed by append().
    // In the slow-path (ring-buffer) case, the readMessage() above will create
    // a copy via slice() before returning it.
    return data.subarray(pos, end);
  }
}