/*
 * Copyright 2020 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.handler.codec.ByteToMessageDecoder.Cumulator;

26 class NettyAdaptiveCumulator implements Cumulator {
27   private final int composeMinSize;
28 
29   /**
30    * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
31    * compose strategies.
32    *
33    * @param composeMinSize Determines the minimal size of the buffer that should be composed (added
34    *                       as a new component of the {@link CompositeByteBuf}). If the total size
35    *                       of the last component (tail) and the incoming buffer is below this value,
36    *                       the incoming buffer is appended to the tail, and the new component is not
37    *                       added.
38    */
NettyAdaptiveCumulator(int composeMinSize)39   NettyAdaptiveCumulator(int composeMinSize) {
40     Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative");
41     this.composeMinSize = composeMinSize;
42   }
43 
44   /**
45    * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and
46    * compose strategies.
47    *
48    * <p>This cumulator applies a heuristic to make a decision whether to track a reference to the
49    * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into
50    * the last component (the tail) by performing a memory copy.
51    *
52    * <p>It is necessary as a protection from a potential attack on the {@link
53    * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case
54    * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator
55    * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra
56    * compute to read the cumulation.
57    *
58    * <p>Implemented heuristic establishes a minimal threshold for the total size of the tail and
59    * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is
60    * used to avoid a case where attacker alternates the size of data packets to trick the cumulator
61    * into always selecting compose strategy.
62    *
63    * <p>Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands
64    * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when
65    * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new
66    * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure
67    * runtime {@code O(n^2)} is amortized to {@code O(n)}.
68    */
69   @Override
70   @SuppressWarnings("ReferenceEquality")
cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)71   public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
72     if (!cumulation.isReadable()) {
73       cumulation.release();
74       return in;
75     }
76     CompositeByteBuf composite = null;
77     try {
78       if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
79         composite = (CompositeByteBuf) cumulation;
80         // Writer index must equal capacity if we are going to "write"
81         // new components to the end
82         if (composite.writerIndex() != composite.capacity()) {
83           composite.capacity(composite.writerIndex());
84         }
85       } else {
86         composite = alloc.compositeBuffer(Integer.MAX_VALUE)
87             .addFlattenedComponents(true, cumulation);
88       }
89       addInput(alloc, composite, in);
90       in = null;
91       return composite;
92     } finally {
93       if (in != null) {
94         // We must release if the ownership was not transferred as otherwise it may produce a leak
95         in.release();
96         // Also release any new buffer allocated if we're not returning it
97         if (composite != null && composite != cumulation) {
98           composite.release();
99         }
100       }
101     }
102   }
103 
104   @VisibleForTesting
addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in)105   void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
106     if (shouldCompose(composite, in, composeMinSize)) {
107       composite.addFlattenedComponents(true, in);
108     } else {
109       // The total size of the new data and the last component are below the threshold. Merge them.
110       mergeWithCompositeTail(alloc, composite, in);
111     }
112   }
113 
114   @VisibleForTesting
shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize)115   static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) {
116     int componentCount = composite.numComponents();
117     if (composite.numComponents() == 0) {
118       return true;
119     }
120     int inputSize = in.readableBytes();
121     int tailStart = composite.toByteIndex(componentCount - 1);
122     int tailSize = composite.writerIndex() - tailStart;
123     return tailSize + inputSize >= composeMinSize;
124   }
125 
126   /**
127    * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by
128    * expanding or replacing the tail component of the {@link CompositeByteBuf}.
129    *
130    * <p>The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying
131    * the tail component into a new buffer, for each incoming single-byte buffer. We append the new
132    * bytes to the tail, when a write (or a fast write) is possible.
133    *
134    * <p>Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to
135    * achieve runtime amortization.
136    *
137    * <p>We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)},
138    * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)},
139    * which doubles buffer capacity by normalizing it to the closest power of two. This assumption
140    * is verified in unit tests for this method.
141    */
142   @VisibleForTesting
mergeWithCompositeTail( ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in)143   static void mergeWithCompositeTail(
144       ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) {
145     int inputSize = in.readableBytes();
146     int tailComponentIndex = composite.numComponents() - 1;
147     int tailStart = composite.toByteIndex(tailComponentIndex);
148     int tailSize = composite.writerIndex() - tailStart;
149     int newTailSize = inputSize + tailSize;
150     ByteBuf tail = composite.component(tailComponentIndex);
151     ByteBuf newTail = null;
152     try {
153       if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) {
154         // Ideal case: the tail isn't shared, and can be expanded to the required capacity.
155         // Take ownership of the tail.
156         newTail = tail.retain();
157 
158         // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with
159         //   the issue fixed.
160         // In certain cases, removing the CompositeByteBuf component, and then adding it back
161         // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844.
162         // This happens because the buffer returned by composite.component() has out-of-sync
163         // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying
164         // buffer, but doesn't set the indexes.
165         //
166         // To get the right indexes we use the fact that composite.internalComponent() returns
167         // the slice() into the readable portion of the underlying buffer.
168         // We use this implementation detail (internalComponent() returning a *SlicedByteBuf),
169         // and combine it with the fact that SlicedByteBuf duplicates have their indexes
170         // adjusted so they correspond to the to the readable portion of the slice.
171         //
172         // Hence composite.internalComponent().duplicate() returns a buffer with the
173         // indexes that should've been on the composite.component() in the first place.
174         // Until the issue is fixed, we manually adjust the indexes of the removed component.
175         ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate();
176         newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex());
177 
178         /*
179          * The tail is a readable non-composite buffer, so writeBytes() handles everything for us.
180          *
181          * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply
182          *   updates its boundary to the end of consecutive memory run assigned to this buffer)
183          * - when the required size doesn't fit into writableBytes(), a new buffer is
184          *   allocated, and the capacity calculated with alloc.calculateNewCapacity()
185          * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf
186          *   is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer.
187          *   Unwrapping buffers is unsafe, and potential benefit of fast writes may not be
188          *   as pronounced because the capacity is doubled with each reallocation.
189          */
190         newTail.writeBytes(in);
191       } else {
192         // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity.
193         newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE));
194         newTail.setBytes(0, composite, tailStart, tailSize)
195             .setBytes(tailSize, in, in.readerIndex(), inputSize)
196             .writerIndex(newTailSize);
197         in.readerIndex(in.writerIndex());
198       }
199       // Store readerIndex to avoid out of bounds writerIndex during component replacement.
200       int prevReader = composite.readerIndex();
201       // Remove the old tail, reset writer index.
202       composite.removeComponent(tailComponentIndex).setIndex(0, tailStart);
203       // Add back the new tail.
204       composite.addFlattenedComponents(true, newTail);
205       // New tail's ownership transferred to the composite buf.
206       newTail = null;
207       in.release();
208       in = null;
209       // Restore the reader. In case it fails we restore the reader after releasing/forgetting
210       // the input and the new tail so that finally block can handles them properly.
211       composite.readerIndex(prevReader);
212     } finally {
213       // Input buffer was merged with the tail.
214       if (in != null) {
215         in.release();
216       }
217       // If new tail's ownership isn't transferred to the composite buf.
218       // Release it to prevent a leak.
219       if (newTail != null) {
220         newTail.release();
221       }
222     }
223   }
224 }