1 //
2 //
3 // Copyright 2018 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/global_subchannel_pool.h"
22
23 #include "src/core/ext/filters/client_channel/subchannel.h"
24
25 namespace grpc_core {
26
// Number of failed reuse attempts in RegisterSubchannel() after which each
// further retry is preceded by a short sleep.
#define GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_AFTER_ATTEMPTS 100
// Length, in microseconds, of that short sleep.
#define GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_MICROS 10
29
GlobalSubchannelPool()30 GlobalSubchannelPool::GlobalSubchannelPool() {
31 subchannel_map_ = grpc_avl_create(&subchannel_avl_vtable_);
32 gpr_mu_init(&mu_);
33 }
34
~GlobalSubchannelPool()35 GlobalSubchannelPool::~GlobalSubchannelPool() {
36 gpr_mu_destroy(&mu_);
37 grpc_avl_unref(subchannel_map_, nullptr);
38 }
39
Init()40 void GlobalSubchannelPool::Init() {
41 instance_ = new RefCountedPtr<GlobalSubchannelPool>(
42 MakeRefCounted<GlobalSubchannelPool>());
43 }
44
Shutdown()45 void GlobalSubchannelPool::Shutdown() {
46 // To ensure Init() was called before.
47 GPR_ASSERT(instance_ != nullptr);
48 // To ensure Shutdown() was not called before.
49 GPR_ASSERT(*instance_ != nullptr);
50 instance_->reset();
51 delete instance_;
52 }
53
instance()54 RefCountedPtr<GlobalSubchannelPool> GlobalSubchannelPool::instance() {
55 GPR_ASSERT(instance_ != nullptr);
56 GPR_ASSERT(*instance_ != nullptr);
57 return *instance_;
58 }
59
RegisterSubchannel(SubchannelKey * key,Subchannel * constructed)60 Subchannel* GlobalSubchannelPool::RegisterSubchannel(SubchannelKey* key,
61 Subchannel* constructed) {
62 Subchannel* c = nullptr;
63 // Compare and swap (CAS) loop:
64 for (int attempt_count = 0; c == nullptr; attempt_count++) {
65 // Ref the shared map to have a local copy.
66 gpr_mu_lock(&mu_);
67 grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr);
68 gpr_mu_unlock(&mu_);
69 // Check to see if a subchannel already exists.
70 c = static_cast<Subchannel*>(grpc_avl_get(old_map, key, nullptr));
71 if (c != nullptr) {
72 // The subchannel already exists. Try to reuse it.
73 c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "subchannel_register+reuse");
74 if (c != nullptr) {
75 GRPC_SUBCHANNEL_UNREF(constructed,
76 "subchannel_register+found_existing");
77 // Exit the CAS loop without modifying the shared map.
78 } else {
79 // Reuse of the subchannel failed, so retry CAS loop
80 if (attempt_count >=
81 GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_AFTER_ATTEMPTS) {
82 // GRPC_SUBCHANNEL_REF_FROM_WEAK_REF returning nullptr means that the
83 // subchannel we got is no longer valid and it's going to be removed
84 // from the AVL tree soon. Spinning here excesively here can actually
85 // prevent another thread from removing the subchannel, basically
86 // resulting in a live lock. See b/157516542 for more details.
87 // TODO(jtattermusch): the entire ref-counting mechanism for
88 // subchannels should be overhaulded, but the current workaround
89 // is fine for short-term.
90 // TODO(jtattermusch): gpr does not support thread yield operation,
91 // so a very short wait is the best we can do.
92 gpr_sleep_until(gpr_time_add(
93 gpr_now(GPR_CLOCK_REALTIME),
94 gpr_time_from_micros(GRPC_REGISTER_SUBCHANNEL_CALM_DOWN_MICROS,
95 GPR_TIMESPAN)));
96 }
97 }
98 } else {
99 // There hasn't been such subchannel. Add one.
100 // Note that we should ref the old map first because grpc_avl_add() will
101 // unref it while we still need to access it later.
102 grpc_avl new_map = grpc_avl_add(
103 grpc_avl_ref(old_map, nullptr), new SubchannelKey(*key),
104 GRPC_SUBCHANNEL_WEAK_REF(constructed, "subchannel_register+new"),
105 nullptr);
106 // Try to publish the change to the shared map. It may happen (but
107 // unlikely) that some other thread has changed the shared map, so compare
108 // to make sure it's unchanged before swapping. Retry if it's changed.
109 gpr_mu_lock(&mu_);
110 if (old_map.root == subchannel_map_.root) {
111 GPR_SWAP(grpc_avl, new_map, subchannel_map_);
112 c = constructed;
113 }
114 gpr_mu_unlock(&mu_);
115 grpc_avl_unref(new_map, nullptr);
116 }
117 grpc_avl_unref(old_map, nullptr);
118 }
119 return c;
120 }
121
UnregisterSubchannel(SubchannelKey * key)122 void GlobalSubchannelPool::UnregisterSubchannel(SubchannelKey* key) {
123 bool done = false;
124 // Compare and swap (CAS) loop:
125 while (!done) {
126 // Ref the shared map to have a local copy.
127 gpr_mu_lock(&mu_);
128 grpc_avl old_map = grpc_avl_ref(subchannel_map_, nullptr);
129 gpr_mu_unlock(&mu_);
130 // Remove the subchannel.
131 // Note that we should ref the old map first because grpc_avl_remove() will
132 // unref it while we still need to access it later.
133 grpc_avl new_map =
134 grpc_avl_remove(grpc_avl_ref(old_map, nullptr), key, nullptr);
135 // Try to publish the change to the shared map. It may happen (but
136 // unlikely) that some other thread has changed the shared map, so compare
137 // to make sure it's unchanged before swapping. Retry if it's changed.
138 gpr_mu_lock(&mu_);
139 if (old_map.root == subchannel_map_.root) {
140 GPR_SWAP(grpc_avl, new_map, subchannel_map_);
141 done = true;
142 }
143 gpr_mu_unlock(&mu_);
144 grpc_avl_unref(new_map, nullptr);
145 grpc_avl_unref(old_map, nullptr);
146 }
147 }
148
FindSubchannel(SubchannelKey * key)149 Subchannel* GlobalSubchannelPool::FindSubchannel(SubchannelKey* key) {
150 // Lock, and take a reference to the subchannel map.
151 // We don't need to do the search under a lock as AVL's are immutable.
152 gpr_mu_lock(&mu_);
153 grpc_avl index = grpc_avl_ref(subchannel_map_, nullptr);
154 gpr_mu_unlock(&mu_);
155 Subchannel* c = static_cast<Subchannel*>(grpc_avl_get(index, key, nullptr));
156 if (c != nullptr) c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "found_from_pool");
157 grpc_avl_unref(index, nullptr);
158 return c;
159 }
160
161 RefCountedPtr<GlobalSubchannelPool>* GlobalSubchannelPool::instance_ = nullptr;
162
163 namespace {
164
sck_avl_destroy(void * p,void *)165 void sck_avl_destroy(void* p, void* /*user_data*/) {
166 SubchannelKey* key = static_cast<SubchannelKey*>(p);
167 delete key;
168 }
169
sck_avl_copy(void * p,void *)170 void* sck_avl_copy(void* p, void* /*unused*/) {
171 const SubchannelKey* key = static_cast<const SubchannelKey*>(p);
172 auto* new_key = new SubchannelKey(*key);
173 return static_cast<void*>(new_key);
174 }
175
sck_avl_compare(void * a,void * b,void *)176 long sck_avl_compare(void* a, void* b, void* /*unused*/) {
177 const SubchannelKey* key_a = static_cast<const SubchannelKey*>(a);
178 const SubchannelKey* key_b = static_cast<const SubchannelKey*>(b);
179 return key_a->Cmp(*key_b);
180 }
181
scv_avl_destroy(void * p,void *)182 void scv_avl_destroy(void* p, void* /*user_data*/) {
183 GRPC_SUBCHANNEL_WEAK_UNREF((Subchannel*)p, "global_subchannel_pool");
184 }
185
scv_avl_copy(void * p,void *)186 void* scv_avl_copy(void* p, void* /*unused*/) {
187 GRPC_SUBCHANNEL_WEAK_REF((Subchannel*)p, "global_subchannel_pool");
188 return p;
189 }
190
191 } // namespace
192
193 const grpc_avl_vtable GlobalSubchannelPool::subchannel_avl_vtable_ = {
194 sck_avl_destroy, // destroy_key
195 sck_avl_copy, // copy_key
196 sck_avl_compare, // compare_keys
197 scv_avl_destroy, // destroy_value
198 scv_avl_copy // copy_value
199 };
200
201 } // namespace grpc_core
202