• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 #region Copyright notice and license
2 
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 
17 #endregion
18 
19 using System;
20 using System.Threading;
21 using System.Collections.Generic;
22 using Grpc.Core.Utils;
23 
24 namespace Grpc.Core.Internal
25 {
26     /// <summary>
27     /// Pool of objects that combines a shared pool and a thread local pool.
28     /// </summary>
29     internal class DefaultObjectPool<T> : IObjectPool<T>
30         where T : class, IPooledObject<T>
31     {
32         readonly object myLock = new object();
33         readonly Action<T> returnAction;
34         readonly Func<T> itemFactory;
35 
36         // Queue shared between threads, access needs to be synchronized.
37         readonly Queue<T> sharedQueue;
38         readonly int sharedCapacity;
39 
40         readonly ThreadLocal<ThreadLocalData> threadLocalData;
41         readonly int threadLocalCapacity;
42         readonly int rentLimit;
43 
44         bool disposed;
45 
46         /// <summary>
47         /// Initializes a new instance of <c>DefaultObjectPool</c> with given shared capacity and thread local capacity.
48         /// Thread local capacity should be significantly smaller than the shared capacity as we don't guarantee immediately
49         /// disposing the objects in the thread local pool after this pool is disposed (they will eventually be garbage collected
50         /// after the thread that owns them has finished).
51         /// On average, the shared pool will only be accessed approx. once for every <c>threadLocalCapacity / 2</c> rent or lease
52         /// operations.
53         /// </summary>
DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)54         public DefaultObjectPool(Func<T> itemFactory, int sharedCapacity, int threadLocalCapacity)
55         {
56             GrpcPreconditions.CheckArgument(sharedCapacity >= 0);
57             GrpcPreconditions.CheckArgument(threadLocalCapacity >= 0);
58             this.returnAction = Return;
59             this.itemFactory = GrpcPreconditions.CheckNotNull(itemFactory, nameof(itemFactory));
60             this.sharedQueue = new Queue<T>(sharedCapacity);
61             this.sharedCapacity = sharedCapacity;
62             this.threadLocalData = new ThreadLocal<ThreadLocalData>(() => new ThreadLocalData(threadLocalCapacity), false);
63             this.threadLocalCapacity = threadLocalCapacity;
64             this.rentLimit = threadLocalCapacity != 1 ? threadLocalCapacity / 2 : 1;
65         }
66 
67         /// <summary>
68         /// Leases an item from the pool or creates a new instance if the pool is empty.
69         /// Attempts to retrieve the item from the thread local pool first.
70         /// If the thread local pool is empty, the item is taken from the shared pool
71         /// along with more items that are moved to the thread local pool to avoid
72         /// prevent acquiring the lock for shared pool too often.
73         /// The methods should not be called after the pool is disposed, but it won't
74         /// results in an error to do so (after depleting the items potentially left
75         /// in the thread local pool, it will continue returning new objects created by the factory).
76         /// </summary>
Lease()77         public T Lease()
78         {
79             var item = LeaseInternal();
80             item.SetReturnToPoolAction(returnAction);
81             return item;
82         }
83 
LeaseInternal()84         private T LeaseInternal()
85         {
86             var localData = threadLocalData.Value;
87             if (localData.Queue.Count > 0)
88             {
89                 return localData.Queue.Dequeue();
90             }
91             if (localData.CreateBudget > 0)
92             {
93                 localData.CreateBudget --;
94                 return itemFactory();
95             }
96 
97             int itemsMoved = 0;
98             T leasedItem = null;
99             lock(myLock)
100             {
101                 if (sharedQueue.Count > 0)
102                 {
103                     leasedItem = sharedQueue.Dequeue();
104                 }
105                 while (sharedQueue.Count > 0 && itemsMoved < rentLimit)
106                 {
107                     localData.Queue.Enqueue(sharedQueue.Dequeue());
108                     itemsMoved ++;
109                 }
110             }
111 
112             // If the shared pool didn't contain all rentLimit items,
113             // next time we try to lease we will just create those
114             // instead of trying to grab them from the shared queue.
115             // This is to guarantee we won't be accessing the shared queue too often.
116             localData.CreateBudget = rentLimit - itemsMoved;
117 
118             return leasedItem ?? itemFactory();
119         }
120 
121         /// <summary>
122         /// Returns an item to the pool.
123         /// Attempts to add the item to the thread local pool first.
124         /// If the thread local pool is full, item is added to a shared pool,
125         /// along with half of the items for the thread local pool, which
126         /// should prevent acquiring the lock for shared pool too often.
127         /// If called after the pool is disposed, we make best effort not to
128         /// add anything to the thread local pool and we guarantee not to add
129         /// anything to the shared pool (items will be disposed instead).
130         /// </summary>
Return(T item)131         public void Return(T item)
132         {
133             GrpcPreconditions.CheckNotNull(item);
134 
135             var localData = threadLocalData.Value;
136             if (localData.Queue.Count < threadLocalCapacity && !disposed)
137             {
138                 localData.Queue.Enqueue(item);
139                 return;
140             }
141             if (localData.DisposeBudget > 0)
142             {
143                 localData.DisposeBudget --;
144                 item.Dispose();
145                 return;
146             }
147 
148             int itemsReturned = 0;
149             int returnLimit = rentLimit + 1;
150             lock (myLock)
151             {
152                 if (sharedQueue.Count < sharedCapacity && !disposed)
153                 {
154                     sharedQueue.Enqueue(item);
155                     itemsReturned ++;
156                 }
157                 while (sharedQueue.Count < sharedCapacity && itemsReturned < returnLimit && !disposed)
158                 {
159                     sharedQueue.Enqueue(localData.Queue.Dequeue());
160                     itemsReturned ++;
161                 }
162             }
163 
164             // If the shared pool could not accommodate all returnLimit items,
165             // next time we try to return we will just dispose the item
166             // instead of trying to return them to the shared queue.
167             // This is to guarantee we won't be accessing the shared queue too often.
168             localData.DisposeBudget = returnLimit - itemsReturned;
169 
170             if (itemsReturned == 0)
171             {
172                 localData.DisposeBudget --;
173                 item.Dispose();
174             }
175         }
176 
Dispose()177         public void Dispose()
178         {
179             lock (myLock)
180             {
181                 if (!disposed)
182                 {
183                     disposed = true;
184 
185                     while (sharedQueue.Count > 0)
186                     {
187                         sharedQueue.Dequeue().Dispose();
188                     }
189                 }
190             }
191         }
192 
193         class ThreadLocalData
194         {
ThreadLocalData(int capacity)195             public ThreadLocalData(int capacity)
196             {
197                 this.Queue = new Queue<T>(capacity);
198             }
199 
200             public Queue<T> Queue { get; }
201             public int CreateBudget { get; set; }
202             public int DisposeBudget { get; set; }
203         }
204     }
205 }
206