• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
22 
23 #include "src/core/ext/filters/client_channel/subchannel.h"
24 
25 namespace grpc_core {
26 
// Number of loop iterations in RegisterSubchannel() after which a failed
// attempt to reuse an existing subchannel is followed by a short sleep
// instead of an immediate retry.
27 #define GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_AFTER_ATTEMPTS 100
// Length of that sleep, in microseconds (passed to gpr_time_from_micros()).
28 #define GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_MICROS 10
29 
GlobalSubchannelPool()30 GlobalSubchannelPool::GlobalSubchannelPool() {
31   subchannel_map_ = grpc_avl_create(&subchannel_avl_vtable_);
32   gpr_mu_init(&mu_);
33 }
34 
~GlobalSubchannelPool()35 GlobalSubchannelPool::~GlobalSubchannelPool() {
36   gpr_mu_destroy(&mu_);
37   grpc_avl_unref(subchannel_map_, nullptr);
38 }
39 
Init()40 void GlobalSubchannelPool::Init() {
41   instance_ = new RefCountedPtr<GlobalSubchannelPool>(
42       MakeRefCounted<GlobalSubchannelPool>());
43 }
44 
Shutdown()45 void GlobalSubchannelPool::Shutdown() {
46   // To ensure Init() was called before.
47   GPR_ASSERT(instance_ != nullptr);
48   // To ensure Shutdown() was not called before.
49   GPR_ASSERT(*instance_ != nullptr);
50   instance_->reset();
51   delete instance_;
52 }
53 
instance()54 RefCountedPtr<GlobalSubchannelPool> GlobalSubchannelPool::instance() {
55   GPR_ASSERT(instance_ != nullptr);
56   GPR_ASSERT(*instance_ != nullptr);
57   return *instance_;
58 }
59 
RegisterSubchannel(SubchannelKey * key,Subchannel * constructed)60 Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key,
61                                                      Subchannel* constructed) {
62   Subchannel* c = nullptr;
63   // Compare and swap (CAS) loop:
64   for (int attempt_count = 0; c == nullptr; attempt_count++) {
65     // Ref the shared map to have a local copy.
66     gpr_mu_lock(&mu_);
67     grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr);
68     gpr_mu_unlock(&mu_);
69     // Check to see if a subchannel already exists.
70     c = static_cast<Subchannel*>(grpc_avl_get(old_map, key, nullptr));
71     if (c != nullptr) {
72       // The subchannel already exists. Try to reuse it.
73       c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "subchannel_register+reuse");
74       if (c != nullptr) {
75         GRPC_SUBCHANNEL_UNREF(constructed,
76                               "subchannel_register+found_existing");
77         // Exit the CAS loop without modifying the shared map.
78       } else {
79         // Reuse of the subchannel failed, so retry CAS loop
80         if (attempt_count >=
81             GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_AFTER_ATTEMPTS) {
82           // GRPC_SUBCHANNEL_REF_FROM_WEAK_REF returning nullptr means that the
83           // subchannel we got is no longer valid and it's going to be removed
84           // from the AVL tree soon. Spinning here excesively here can actually
85           // prevent another thread from removing the subchannel, basically
86           // resulting in a live lock. See b/157516542 for more details.
87           // TODO(jtattermusch): the entire ref-counting mechanism for
88           // subchannels should be overhaulded, but the current workaround
89           // is fine for short-term.
90           // TODO(jtattermusch): gpr does not support thread yield operation,
91           // so a very short wait is the best we can do.
92           gpr_sleep_until(gpr_time_add(
93               gpr_now(GPR_CLOCK_REALTIME),
94               gpr_time_from_micros(GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_MICROS,
95                                    GPR_TIMESPAN)));
96         }
97       }
98     } else {
99       // There hasn't been such subchannel. Add one.
100       // Note that we should ref the old map first because grpc_avl_add() will
101       // unref it while we still need to access it later.
102       grpc_avl new_map = grpc_avl_add(
103           grpc_avl_ref(old_map, nullptr), new SubchannelKey(*key),
104           GRPC_SUBCHANNEL_WEAK_REF(constructed, "subchannel_register+new"),
105           nullptr);
106       // Try to publish the change to the shared map. It may happen (but
107       // unlikely) that some other thread has changed the shared map, so compare
108       // to make sure it's unchanged before swapping. Retry if it's changed.
109       gpr_mu_lock(&mu_);
110       if (old_map.root == subchannel_map_.root) {
111         GPR_SWAP(grpc_avl, new_map, subchannel_map_);
112         c = constructed;
113       }
114       gpr_mu_unlock(&mu_);
115       grpc_avl_unref(new_map, nullptr);
116     }
117     grpc_avl_unref(old_map, nullptr);
118   }
119   return c;
120 }
121 
UnregisterSubchannel(SubchannelKey * key)122 void GlobalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) {
123   bool done = false;
124   // Compare and swap (CAS) loop:
125   while (!done) {
126     // Ref the shared map to have a local copy.
127     gpr_mu_lock(&mu_);
128     grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr);
129     gpr_mu_unlock(&mu_);
130     // Remove the subchannel.
131     // Note that we should ref the old map first because grpc_avl_remove() will
132     // unref it while we still need to access it later.
133     grpc_avl new_map =
134         grpc_avl_remove(grpc_avl_ref(old_map, nullptr), key, nullptr);
135     // Try to publish the change to the shared map. It may happen (but
136     // unlikely) that some other thread has changed the shared map, so compare
137     // to make sure it's unchanged before swapping. Retry if it's changed.
138     gpr_mu_lock(&mu_);
139     if (old_map.root == subchannel_map_.root) {
140       GPR_SWAP(grpc_avl, new_map, subchannel_map_);
141       done = true;
142     }
143     gpr_mu_unlock(&mu_);
144     grpc_avl_unref(new_map, nullptr);
145     grpc_avl_unref(old_map, nullptr);
146   }
147 }
148 
FindSubchannel(SubchannelKey * key)149 Subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) {
150   // Lock, and take a reference to the subchannel map.
151   // We don't need to do the search under a lock as AVL's are immutable.
152   gpr_mu_lock(&mu_);
153   grpc_avl index = grpc_avl_ref(subchannel_map_, nullptr);
154   gpr_mu_unlock(&mu_);
155   Subchannel* c = static_cast<Subchannel*>(grpc_avl_get(index, key, nullptr));
156   if (c != nullptr) c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "found_from_pool");
157   grpc_avl_unref(index, nullptr);
158   return c;
159 }
160 
// Heap-allocated holder for the global pool reference. Allocated in Init(),
// deleted in Shutdown(); nullptr until Init() has been called.
161 RefCountedPtr<GlobalSubchannelPool>* GlobalSubchannelPool::instance_ = nullptr;
162 
163 namespace {
164 
sck_avl_destroy(void * p,void *)165 void sck_avl_destroy(void* p, void* /*user_data*/) {
166   SubchannelKey* key = static_cast<SubchannelKey*>(p);
167   delete key;
168 }
169 
sck_avl_copy(void * p,void *)170 void* sck_avl_copy(void* p, void* /*unused*/) {
171   const SubchannelKey* key = static_cast<const SubchannelKey*>(p);
172   auto* new_key = new SubchannelKey(*key);
173   return static_cast<void*>(new_key);
174 }
175 
sck_avl_compare(void * a,void * b,void *)176 long sck_avl_compare(void* a, void* b, void* /*unused*/) {
177   const SubchannelKey* key_a = static_cast<const SubchannelKey*>(a);
178   const SubchannelKey* key_b = static_cast<const SubchannelKey*>(b);
179   return key_a->Cmp(*key_b);
180 }
181 
scv_avl_destroy(void * p,void *)182 void scv_avl_destroy(void* p, void* /*user_data*/) {
183   GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool");
184 }
185 
scv_avl_copy(void * p,void *)186 void* scv_avl_copy(void* p, void* /*unused*/) {
187   GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool");
188   return p;
189 }
190 
191 }  // namespace
192 
// Callback table handed to grpc_avl_create() for the subchannel map.
// Keys are SubchannelKey objects that the map copies and deletes itself;
// values are Subchannel pointers on which the map takes and drops weak refs.
193 const grpc_avl_vtable GlobalSubchannelPool::subchannel_avl_vtable_ = {
194     sck_avl_destroy,  // destroy_key
195     sck_avl_copy,     // copy_key
196     sck_avl_compare,  // compare_keys
197     scv_avl_destroy,  // destroy_value
198     scv_avl_copy      // copy_value
199 };
200 
201 }  // namespace grpc_core
202