1 #region Copyright notice and license 2 3 // Copyright 2017 gRPC authors. 4 // 5 // Licensed under the Apache License, Version 2.0 (the "License"); 6 // you may not use this file except in compliance with the License. 7 // You may obtain a copy of the License at 8 // 9 // http://www.apache.org/licenses/LICENSE-2.0 10 // 11 // Unless required by applicable law or agreed to in writing, software 12 // distributed under the License is distributed on an "AS IS" BASIS, 13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 // See the License for the specific language governing permissions and 15 // limitations under the License. 16 17 #endregion 18 19 using System; 20 using System.Threading; 21 using System.Collections.Generic; 22 using Grpc.Core.Utils; 23 24 namespace Grpc.Core.Internal 25 { 26 /// <summary> 27 /// Pool of objects that combines a shared pool and a thread local pool. 28 /// </summary> 29 internal class DefaultObjectPool<T> : IObjectPool<T> 30 where T : class, IPooledObject<T> 31 { 32 readonly object myLock = new object(); 33 readonly Action<T> returnAction; 34 readonly Func<T> itemFactory; 35 36 // Queue shared between threads, access needs to be synchronized. 37 readonly Queue<T> sharedQueue; 38 readonly int sharedCapacity; 39 40 readonly ThreadLocal<ThreadLocalData> threadLocalData; 41 readonly int threadLocalCapacity; 42 readonly int rentLimit; 43 44 bool disposed; 45 46 /// <summary> 47 /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity. 48 /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately 49 /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected 50 /// after the thread that owns them has finished). 51 /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease 52 /// operations. 53 /// </summary> DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)54 public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity) 55 { 56 GrpcPreconditions.CheckArgument(sharedCapacity >= 0); 57 GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0); 58 this.returnAction = Return; 59 this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory)); 60 this.sharedQueue = new Queue<T>(sharedCapacity); 61 this.sharedCapacity = sharedCapacity; 62 this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false); 63 this.threadLocalCapacity = threadLocalCapacity; 64 this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1; 65 } 66 67 /// <summary> 68 /// Leases an item from the pool or creates a new instance if the pool is empty. 69 /// Attempts to retrieve the item from the thread local pool first. 70 /// If the thread local pool is empty, the item is taken from the shared pool 71 /// along with more items that are moved to the thread local pool to avoid 72 /// prevent acquiring the lock for shared pool too often. 73 /// The methods should not be called after the pool is disposed, but it won't 74 /// results in an error to do so (after depleting the items potentially left 75 /// in the thread local pool, it will continue returning new objects created by the factory). 76 /// </summary> Lease()77 public T Lease() 78 { 79 var item = LeaseInternal(); 80 item.SetReturnToPoolAction(returnAction); 81 return item; 82 } 83 LeaseInternal()84 private T LeaseInternal() 85 { 86 var localData = threadLocalData.Value; 87 if (localData.Queue.Count > 0) 88 { 89 return localData.Queue.Dequeue(); 90 } 91 if (localData.CreateBudget > 0) 92 { 93 localData.CreateBudget --; 94 return itemFactory(); 95 } 96 97 int itemsMoved = 0; 98 T leasedItem = null; 99 lock(myLock) 100 { 101 if (sharedQueue.Count > 0) 102 { 103 leasedItem = sharedQueue.Dequeue(); 104 } 105 while (sharedQueue.Count > 0 && itemsMoved < rentLimit) 106 { 107 localData.Queue.Enqueue(sharedQueue.Dequeue()); 108 itemsMoved ++; 109 } 110 } 111 112 // If the shared pool didn't contain all rentLimit items, 113 // next time we try to lease we will just create those 114 // instead of trying to grab them from the shared queue. 115 // This is to guarantee we won't be accessing the shared queue too often. 116 localData.CreateBudget = rentLimit - itemsMoved; 117 118 return leasedItem ?? itemFactory(); 119 } 120 121 /// <summary> 122 /// Returns an item to the pool. 123 /// Attempts to add the item to the thread local pool first. 124 /// If the thread local pool is full, item is added to a shared pool, 125 /// along with half of the items for the thread local pool, which 126 /// should prevent acquiring the lock for shared pool too often. 127 /// If called after the pool is disposed, we make best effort not to 128 /// add anything to the thread local pool and we guarantee not to add 129 /// anything to the shared pool (items will be disposed instead). 130 /// </summary> Return(T item)131 public void Return(T item) 132 { 133 GrpcPreconditions.CheckNotNull(item); 134 135 var localData = threadLocalData.Value; 136 if (localData.Queue.Count < threadLocalCapacity && !disposed) 137 { 138 localData.Queue.Enqueue(item); 139 return; 140 } 141 if (localData.DisposeBudget > 0) 142 { 143 localData.DisposeBudget --; 144 item.Dispose(); 145 return; 146 } 147 148 int itemsReturned = 0; 149 int returnLimit = rentLimit + 1; 150 lock (myLock) 151 { 152 if (sharedQueue.Count < sharedCapacity && !disposed) 153 { 154 sharedQueue.Enqueue(item); 155 itemsReturned ++; 156 } 157 while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed) 158 { 159 sharedQueue.Enqueue(localData.Queue.Dequeue()); 160 itemsReturned ++; 161 } 162 } 163 164 // If the shared pool could not accommodate all returnLimit items, 165 // next time we try to return we will just dispose the item 166 // instead of trying to return them to the shared queue. 167 // This is to guarantee we won't be accessing the shared queue too often. 168 localData.DisposeBudget = returnLimit - itemsReturned; 169 170 if (itemsReturned == 0) 171 { 172 localData.DisposeBudget --; 173 item.Dispose(); 174 } 175 } 176 Dispose()177 public void Dispose() 178 { 179 lock (myLock) 180 { 181 if (!disposed) 182 { 183 disposed = true; 184 185 while (sharedQueue.Count > 0) 186 { 187 sharedQueue.Dequeue().Dispose(); 188 } 189 } 190 } 191 } 192 193 class ThreadLocalData 194 { ThreadLocalData(int capacity)195 public ThreadLocalData(int capacity) 196 { 197 this.Queue = new Queue<T>(capacity); 198 } 199 200 public Queue<T> Queue { get; } 201 public int CreateBudget { get; set; } 202 public int DisposeBudget { get; set; } 203 } 204 } 205 } 206