• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import com.google.common.base.Preconditions;
20 import java.util.IdentityHashMap;
21 import java.util.concurrent.Executors;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ScheduledFuture;
24 import java.util.concurrent.TimeUnit;
25 import javax.annotation.concurrent.ThreadSafe;
26 
27 /**
28  * A holder for shared resource singletons.
29  *
30  * <p>Components like client channels and servers need certain resources, e.g. a thread pool, to
31  * run. If the user has not provided such resources, these components will use a default one, which
32  * is shared as a static resource. This class holds these default resources and manages their
33  * life-cycles.
34  *
35  * <p>A resource is identified by the reference of a {@link Resource} object, which is typically a
36  * singleton, provided to the get() and release() methods. Each Resource object (not its class) maps
37  * to an object cached in the holder.
38  *
39  * <p>Resources are ref-counted and shut down after a delay when the ref-count reaches zero.
40  */
41 @ThreadSafe
42 public final class SharedResourceHolder {
43   static final long DESTROY_DELAY_SECONDS = 1;
44 
45   // The sole holder instance.
46   private static final SharedResourceHolder holder = new SharedResourceHolder(
47       new ScheduledExecutorFactory() {
48         @Override
49         public ScheduledExecutorService createScheduledExecutor() {
50           return Executors.newSingleThreadScheduledExecutor(
51               GrpcUtil.getThreadFactory("grpc-shared-destroyer-%d", true));
52         }
53       });
54 
55   private final IdentityHashMap<Resource<?>, Instance> instances =
56       new IdentityHashMap<Resource<?>, Instance>();
57 
58   private final ScheduledExecutorFactory destroyerFactory;
59 
60   private ScheduledExecutorService destroyer;
61 
62   // Visible to tests that would need to create instances of the holder.
SharedResourceHolder(ScheduledExecutorFactory destroyerFactory)63   SharedResourceHolder(ScheduledExecutorFactory destroyerFactory) {
64     this.destroyerFactory = destroyerFactory;
65   }
66 
67   /**
68    * Try to get an existing instance of the given resource. If an instance does not exist, create a
69    * new one with the given factory.
70    *
71    * @param resource the singleton object that identifies the requested static resource
72    */
get(Resource<T> resource)73   public static <T> T get(Resource<T> resource) {
74     return holder.getInternal(resource);
75   }
76 
77   /**
78    * Releases an instance of the given resource.
79    *
80    * <p>The instance must have been obtained from {@link #get(Resource)}. Otherwise will throw
81    * IllegalArgumentException.
82    *
83    * <p>Caller must not release a reference more than once. It's advisory that you clear the
84    * reference to the instance with the null returned by this method.
85    *
86    * @param resource the singleton Resource object that identifies the released static resource
87    * @param instance the released static resource
88    *
89    * @return a null which the caller can use to clear the reference to that instance.
90    */
release(final Resource<T> resource, final T instance)91   public static <T> T release(final Resource<T> resource, final T instance) {
92     return holder.releaseInternal(resource, instance);
93   }
94 
95   /**
96    * Visible to unit tests.
97    *
98    * @see #get(Resource)
99    */
100   @SuppressWarnings("unchecked")
getInternal(Resource<T> resource)101   synchronized <T> T getInternal(Resource<T> resource) {
102     Instance instance = instances.get(resource);
103     if (instance == null) {
104       instance = new Instance(resource.create());
105       instances.put(resource, instance);
106     }
107     if (instance.destroyTask != null) {
108       instance.destroyTask.cancel(false);
109       instance.destroyTask = null;
110     }
111     instance.refcount++;
112     return (T) instance.payload;
113   }
114 
115   /**
116    * Visible to unit tests.
117    */
releaseInternal(final Resource<T> resource, final T instance)118   synchronized <T> T releaseInternal(final Resource<T> resource, final T instance) {
119     final Instance cached = instances.get(resource);
120     if (cached == null) {
121       throw new IllegalArgumentException("No cached instance found for " + resource);
122     }
123     Preconditions.checkArgument(instance == cached.payload, "Releasing the wrong instance");
124     Preconditions.checkState(cached.refcount > 0, "Refcount has already reached zero");
125     cached.refcount--;
126     if (cached.refcount == 0) {
127       if (GrpcUtil.IS_RESTRICTED_APPENGINE) {
128         // AppEngine must immediately release shared resources, particularly executors
129         // which could retain request-scoped threads which become zombies after the request
130         // completes.
131         resource.close(instance);
132         instances.remove(resource);
133       } else {
134         Preconditions.checkState(cached.destroyTask == null, "Destroy task already scheduled");
135         // Schedule a delayed task to destroy the resource.
136         if (destroyer == null) {
137           destroyer = destroyerFactory.createScheduledExecutor();
138         }
139         cached.destroyTask = destroyer.schedule(new LogExceptionRunnable(new Runnable() {
140           @Override
141           public void run() {
142             synchronized (SharedResourceHolder.this) {
143               // Refcount may have gone up since the task was scheduled. Re-check it.
144               if (cached.refcount == 0) {
145                 resource.close(instance);
146                 instances.remove(resource);
147                 if (instances.isEmpty()) {
148                   destroyer.shutdown();
149                   destroyer = null;
150                 }
151               }
152             }
153           }
154         }), DESTROY_DELAY_SECONDS, TimeUnit.SECONDS);
155       }
156     }
157     // Always returning null
158     return null;
159   }
160 
161   /**
162    * Defines a resource, and the way to create and destroy instances of it.
163    */
164   public interface Resource<T> {
165     /**
166      * Create a new instance of the resource.
167      */
create()168     T create();
169 
170     /**
171      * Destroy the given instance.
172      */
close(T instance)173     void close(T instance);
174   }
175 
176   interface ScheduledExecutorFactory {
createScheduledExecutor()177     ScheduledExecutorService createScheduledExecutor();
178   }
179 
180   private static class Instance {
181     final Object payload;
182     int refcount;
183     ScheduledFuture<?> destroyTask;
184 
Instance(Object payload)185     Instance(Object payload) {
186       this.payload = payload;
187     }
188   }
189 }
190