• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright (C) 2021 The Android Open Source Project
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15import {assertTrue} from '../base/logging';
16
17// This class is the TypeScript equivalent of the identically-named C++ class in
18// //protozero/proto_ring_buffer.h. See comments in that header for a detailed
19// description. The architecture is identical.
20
// 128 KiB. Size of the initial ProtoRingBuffer allocation and the step by
// which the buffer is enlarged when an append() does not fit.
const kGrowBytes = 2 ** 17;
// 1 GiB. A message whose declared payload length is greater than or equal to
// this value is rejected as a framing error.
const kMaxMsgSize = 2 ** 30;
23
/**
 * Reassembles length-delimited proto messages from a byte stream that is
 * delivered in arbitrary chunks.
 *
 * Every message in the stream is framed as: a one-byte proto tag whose wire
 * type is 2 (length-delimited), a varint payload length, then the payload.
 * A chunk passed to append() may contain a fraction of a message, exactly one
 * message, or several messages; readMessage() hands the payloads (without the
 * tag/length header) back one at a time.
 */
export class ProtoRingBuffer {
  // Backing storage. The bytes in [rd, wr) have been received but not yet
  // consumed by readMessage().
  private buf = new Uint8Array(kGrowBytes);
  // Set by append() when the chunk contained exactly one complete message and
  // nothing was pending: it is a view into the caller's array that the next
  // readMessage() returns without any copy.
  private fastpath?: Uint8Array;
  // Read cursor: offset in |buf| of the first unconsumed byte.
  private rd = 0;
  // Write cursor: offset in |buf| where the next appended byte goes.
  private wr = 0;

  /**
   * Feeds a new chunk of the stream into the buffer.
   *
   * The caller must drain the buffer with readMessage() after each append()
   * call. |data| is either copied into the internal buffer or, on the fast
   * path, returned as-is (as a subarray() that skips the header) by the next
   * readMessage() call.
   *
   * @param data The bytes received from the stream. Empty chunks are ignored.
   * @throws Error if the fast path detects a framing error in |data| (see
   *     tryReadMessage()).
   */
  append(data: Uint8Array) {
    assertTrue(this.wr <= this.buf.length);
    assertTrue(this.rd <= this.wr);

    // If the last call to readMessage() consumed all the data in the buffer
    // and there are no incomplete messages pending, restart from the beginning
    // rather than keep advancing. This is the most common case.
    if (this.rd === this.wr) {
      this.rd = this.wr = 0;
    }

    const dataLen = data.length;
    if (dataLen === 0) return;
    // A fastpath message still pending here means the caller skipped the
    // readMessage() that must follow each append().
    assertTrue(this.fastpath === undefined);
    if (this.rd === this.wr) {
      const msg = ProtoRingBuffer.tryReadMessage(data, 0, dataLen);
      if (
        msg !== undefined &&
        msg.byteOffset + msg.length === data.byteOffset + dataLen
      ) {
        // Fastpath: in many cases, the underlying stream will effectively
        // preserve the atomicity of messages for most small messages.
        // In this case we can avoid the extra buffer roundtrip and return the
        // original array (actually a subarray that skips the proto header).
        // The next call to readMessage() will return this.
        this.fastpath = msg;
        return;
      }
    }

    let avail = this.buf.length - this.wr;
    if (dataLen > avail) {
      // This whole section should be hit extremely rarely.

      // Try first just recompacting the buffer by moving everything to the
      // left. This can happen if we received "a message and a bit" on each
      // append() call.
      this.buf.copyWithin(0, this.rd, this.wr);
      avail += this.rd;
      this.wr -= this.rd;
      this.rd = 0;
      if (dataLen > avail) {
        // Still not enough, expand the buffer in kGrowBytes steps.
        let newSize = this.buf.length;
        while (dataLen > newSize - this.wr) {
          newSize += kGrowBytes;
        }
        assertTrue(newSize <= kMaxMsgSize * 2);
        const newBuf = new Uint8Array(newSize);
        newBuf.set(this.buf);
        this.buf = newBuf;
        // No need to touch rd / wr.
      }
    }

    // Append the received data after the bytes that are still pending.
    this.buf.set(data, this.wr);
    this.wr += dataLen;
  }

  /**
   * Tries to extract the next message from the buffer.
   *
   * The caller is expected to call this in a loop until it returns undefined:
   * a single append() can yield more than one message.
   *
   * @returns The payload of the next message, or undefined if there is no
   *     message or the current message is still incomplete. On the fast path
   *     the result is a view into the array passed to append(); otherwise it
   *     is a copy that the caller can hold onto.
   * @throws Error on a framing error (see tryReadMessage()).
   */
  readMessage(): Uint8Array | undefined {
    if (this.fastpath !== undefined) {
      assertTrue(this.rd === this.wr);
      const msg = this.fastpath;
      this.fastpath = undefined;
      return msg;
    }
    assertTrue(this.rd <= this.wr);
    if (this.rd >= this.wr) {
      return undefined; // Completely empty.
    }
    const msg = ProtoRingBuffer.tryReadMessage(this.buf, this.rd, this.wr);
    if (msg === undefined) return undefined;
    // |msg| is a view of |buf|, and |buf| starts at offset 0 of its
    // ArrayBuffer, so the view's byteOffset is also an index into |buf|.
    assertTrue(msg.buffer === this.buf.buffer);
    assertTrue(this.buf.byteOffset === 0);
    this.rd = msg.byteOffset + msg.length;

    // Deliberately returning a copy of the data with slice(). In various cases
    // (streaming query response) the caller will hold onto the returned buffer.
    // If we get to this point, |msg| is a view of the internal buffer that we
    // will overwrite on the next calls to append().
    return msg.slice();
  }

  /**
   * Parses one framed message out of data[dataStart, dataEnd).
   *
   * @param data The array holding the framed bytes.
   * @param dataStart Index of the tag byte of the message.
   * @param dataEnd Index one past the last valid byte.
   * @returns A subarray() view of the payload (header skipped), or undefined
   *     if the range does not yet contain a complete header and payload.
   * @throws Error if the tag is not a one-byte length-delimited tag, if the
   *     length varint is longer than 10 bytes, or if the declared length is
   *     greater than or equal to kMaxMsgSize.
   */
  private static tryReadMessage(
    data: Uint8Array,
    dataStart: number,
    dataEnd: number,
  ): Uint8Array | undefined {
    assertTrue(dataEnd <= data.length);
    let pos = dataStart;
    if (pos >= dataEnd) return undefined;
    const tag = data[pos++]; // Assume one-byte tag.
    if (tag >= 0x80 || (tag & 0x07) !== 2 /* len delimited */) {
      throw new Error(
        `RPC framing error, unexpected tag ${tag} @ offset ${pos - 1}`,
      );
    }

    // Decode the payload length varint. The value is accumulated with
    // multiplication rather than `<<` / `|`: bitwise operators work on signed
    // 32-bit integers and take the shift count modulo 32, so a corrupted
    // length could wrap to a small or negative number and slip past the size
    // check below.
    const maxVarintBytes = 10; // Longest encoding of a 64-bit varint.
    let len = 0;
    let scale = 1; // 2^(7 * number of varint bytes consumed so far).
    for (let numBytes = 0; ; numBytes++) {
      if (numBytes >= maxVarintBytes) {
        throw new Error(
          `RPC framing error, malformed length varint @ offset ${dataStart + 1}`,
        );
      }
      if (pos >= dataEnd) {
        return undefined; // Not enough data to read varint.
      }
      const val = data[pos++];
      len += (val & 0x7f) * scale;
      scale *= 0x80;
      if (val < 0x80) break;
    }

    if (len >= kMaxMsgSize) {
      throw new Error(
        `RPC framing error, message too large (${len} >= ${kMaxMsgSize})`,
      );
    }
    const end = pos + len;
    if (end > dataEnd) return undefined;

    // This is a subarray() and not a slice() because in the |fastpath| case
    // we want to just return the original buffer pushed by append().
    // In the slow-path (internal buffer) case, readMessage() above will create
    // a copy via slice() before returning it.
    return data.subarray(pos, end);
  }
}
163