1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.netty; 18 19 import com.google.common.annotations.VisibleForTesting; 20 import com.google.common.base.Preconditions; 21 import io.netty.buffer.ByteBuf; 22 import io.netty.buffer.ByteBufAllocator; 23 import io.netty.buffer.CompositeByteBuf; 24 import io.netty.handler.codec.ByteToMessageDecoder.Cumulator; 25 26 class NettyAdaptiveCumulator implements Cumulator { 27 private final int composeMinSize; 28 29 /** 30 * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and 31 * compose strategies. 32 * 33 * @param composeMinSize Determines the minimal size of the buffer that should be composed (added 34 * as a new component of the {@link CompositeByteBuf}). If the total size 35 * of the last component (tail) and the incoming buffer is below this value, 36 * the incoming buffer is appended to the tail, and the new component is not 37 * added. 38 */ NettyAdaptiveCumulator(int composeMinSize)39 NettyAdaptiveCumulator(int composeMinSize) { 40 Preconditions.checkArgument(composeMinSize >= 0, "composeMinSize must be non-negative"); 41 this.composeMinSize = composeMinSize; 42 } 43 44 /** 45 * "Adaptive" cumulator: cumulate {@link ByteBuf}s by dynamically switching between merge and 46 * compose strategies. 47 * 48 * <p>This cumulator applies a heuristic to make a decision whether to track a reference to the 49 * buffer with bytes received from the network stack in an array ("zero-copy"), or to merge into 50 * the last component (the tail) by performing a memory copy. 51 * 52 * <p>It is necessary as a protection from a potential attack on the {@link 53 * io.netty.handler.codec.ByteToMessageDecoder#COMPOSITE_CUMULATOR}. Consider a pathological case 54 * when an attacker sends TCP packages containing a single byte of data, and forcing the cumulator 55 * to track each one in a separate buffer. The cost is memory overhead for each buffer, and extra 56 * compute to read the cumulation. 57 * 58 * <p>Implemented heuristic establishes a minimal threshold for the total size of the tail and 59 * incoming buffer, below which they are merged. The sum of the tail and the incoming buffer is 60 * used to avoid a case where attacker alternates the size of data packets to trick the cumulator 61 * into always selecting compose strategy. 62 * 63 * <p>Merging strategy attempts to minimize unnecessary memory writes. When possible, it expands 64 * the tail capacity and only copies the incoming buffer into available memory. Otherwise, when 65 * both tail and the buffer must be copied, the tail is reallocated (or fully replaced) with a new 66 * buffer of exponentially increasing capacity (bounded to {@link #composeMinSize}) to ensure 67 * runtime {@code O(n^2)} is amortized to {@code O(n)}. 68 */ 69 @Override 70 @SuppressWarnings("ReferenceEquality") cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in)71 public final ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { 72 if (!cumulation.isReadable()) { 73 cumulation.release(); 74 return in; 75 } 76 CompositeByteBuf composite = null; 77 try { 78 if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) { 79 composite = (CompositeByteBuf) cumulation; 80 // Writer index must equal capacity if we are going to "write" 81 // new components to the end 82 if (composite.writerIndex() != composite.capacity()) { 83 composite.capacity(composite.writerIndex()); 84 } 85 } else { 86 composite = alloc.compositeBuffer(Integer.MAX_VALUE) 87 .addFlattenedComponents(true, cumulation); 88 } 89 addInput(alloc, composite, in); 90 in = null; 91 return composite; 92 } finally { 93 if (in != null) { 94 // We must release if the ownership was not transferred as otherwise it may produce a leak 95 in.release(); 96 // Also release any new buffer allocated if we're not returning it 97 if (composite != null && composite != cumulation) { 98 composite.release(); 99 } 100 } 101 } 102 } 103 104 @VisibleForTesting addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in)105 void addInput(ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { 106 if (shouldCompose(composite, in, composeMinSize)) { 107 composite.addFlattenedComponents(true, in); 108 } else { 109 // The total size of the new data and the last component are below the threshold. Merge them. 110 mergeWithCompositeTail(alloc, composite, in); 111 } 112 } 113 114 @VisibleForTesting shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize)115 static boolean shouldCompose(CompositeByteBuf composite, ByteBuf in, int composeMinSize) { 116 int componentCount = composite.numComponents(); 117 if (composite.numComponents() == 0) { 118 return true; 119 } 120 int inputSize = in.readableBytes(); 121 int tailStart = composite.toByteIndex(componentCount - 1); 122 int tailSize = composite.writerIndex() - tailStart; 123 return tailSize + inputSize >= composeMinSize; 124 } 125 126 /** 127 * Append the given {@link ByteBuf} {@code in} to {@link CompositeByteBuf} {@code composite} by 128 * expanding or replacing the tail component of the {@link CompositeByteBuf}. 129 * 130 * <p>The goal is to prevent {@code O(n^2)} runtime in a pathological case, that forces copying 131 * the tail component into a new buffer, for each incoming single-byte buffer. We append the new 132 * bytes to the tail, when a write (or a fast write) is possible. 133 * 134 * <p>Otherwise, the tail is replaced with a new buffer, with the capacity increased enough to 135 * achieve runtime amortization. 136 * 137 * <p>We assume that implementations of {@link ByteBufAllocator#calculateNewCapacity(int, int)}, 138 * are similar to {@link io.netty.buffer.AbstractByteBufAllocator#calculateNewCapacity(int, int)}, 139 * which doubles buffer capacity by normalizing it to the closest power of two. This assumption 140 * is verified in unit tests for this method. 141 */ 142 @VisibleForTesting mergeWithCompositeTail( ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in)143 static void mergeWithCompositeTail( 144 ByteBufAllocator alloc, CompositeByteBuf composite, ByteBuf in) { 145 int inputSize = in.readableBytes(); 146 int tailComponentIndex = composite.numComponents() - 1; 147 int tailStart = composite.toByteIndex(tailComponentIndex); 148 int tailSize = composite.writerIndex() - tailStart; 149 int newTailSize = inputSize + tailSize; 150 ByteBuf tail = composite.component(tailComponentIndex); 151 ByteBuf newTail = null; 152 try { 153 if (tail.refCnt() == 1 && !tail.isReadOnly() && newTailSize <= tail.maxCapacity()) { 154 // Ideal case: the tail isn't shared, and can be expanded to the required capacity. 155 // Take ownership of the tail. 156 newTail = tail.retain(); 157 158 // TODO(https://github.com/netty/netty/issues/12844): remove when we use Netty with 159 // the issue fixed. 160 // In certain cases, removing the CompositeByteBuf component, and then adding it back 161 // isn't idempotent. An example is provided in https://github.com/netty/netty/issues/12844. 162 // This happens because the buffer returned by composite.component() has out-of-sync 163 // indexes. Under the hood the CompositeByteBuf returns a duplicate() of the underlying 164 // buffer, but doesn't set the indexes. 165 // 166 // To get the right indexes we use the fact that composite.internalComponent() returns 167 // the slice() into the readable portion of the underlying buffer. 168 // We use this implementation detail (internalComponent() returning a *SlicedByteBuf), 169 // and combine it with the fact that SlicedByteBuf duplicates have their indexes 170 // adjusted so they correspond to the to the readable portion of the slice. 171 // 172 // Hence composite.internalComponent().duplicate() returns a buffer with the 173 // indexes that should've been on the composite.component() in the first place. 174 // Until the issue is fixed, we manually adjust the indexes of the removed component. 175 ByteBuf sliceDuplicate = composite.internalComponent(tailComponentIndex).duplicate(); 176 newTail.setIndex(sliceDuplicate.readerIndex(), sliceDuplicate.writerIndex()); 177 178 /* 179 * The tail is a readable non-composite buffer, so writeBytes() handles everything for us. 180 * 181 * - ensureWritable() performs a fast resize when possible (f.e. PooledByteBuf simply 182 * updates its boundary to the end of consecutive memory run assigned to this buffer) 183 * - when the required size doesn't fit into writableBytes(), a new buffer is 184 * allocated, and the capacity calculated with alloc.calculateNewCapacity() 185 * - note that maxFastWritableBytes() would normally allow a fast expansion of PooledByteBuf 186 * is not called because CompositeByteBuf.component() returns a duplicate, wrapped buffer. 187 * Unwrapping buffers is unsafe, and potential benefit of fast writes may not be 188 * as pronounced because the capacity is doubled with each reallocation. 189 */ 190 newTail.writeBytes(in); 191 } else { 192 // The tail is shared, or not expandable. Replace it with a new buffer of desired capacity. 193 newTail = alloc.buffer(alloc.calculateNewCapacity(newTailSize, Integer.MAX_VALUE)); 194 newTail.setBytes(0, composite, tailStart, tailSize) 195 .setBytes(tailSize, in, in.readerIndex(), inputSize) 196 .writerIndex(newTailSize); 197 in.readerIndex(in.writerIndex()); 198 } 199 // Store readerIndex to avoid out of bounds writerIndex during component replacement. 200 int prevReader = composite.readerIndex(); 201 // Remove the old tail, reset writer index. 202 composite.removeComponent(tailComponentIndex).setIndex(0, tailStart); 203 // Add back the new tail. 204 composite.addFlattenedComponents(true, newTail); 205 // New tail's ownership transferred to the composite buf. 206 newTail = null; 207 in.release(); 208 in = null; 209 // Restore the reader. In case it fails we restore the reader after releasing/forgetting 210 // the input and the new tail so that finally block can handles them properly. 211 composite.readerIndex(prevReader); 212 } finally { 213 // Input buffer was merged with the tail. 214 if (in != null) { 215 in.release(); 216 } 217 // If new tail's ownership isn't transferred to the composite buf. 218 // Release it to prevent a leak. 219 if (newTail != null) { 220 newTail.release(); 221 } 222 } 223 } 224 } 225