1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/ext/filters/client_channel/subchannel_index.h"
22
23 #include <stdbool.h>
24 #include <string.h>
25
26 #include <grpc/support/alloc.h>
27 #include <grpc/support/string_util.h>
28
29 #include "src/core/lib/avl/avl.h"
30 #include "src/core/lib/channel/channel_args.h"
31 #include "src/core/lib/gpr/tls.h"
32
33 // a map of subchannel_key --> subchannel, used for detecting connections
34 // to the same destination in order to share them
35 static grpc_avl g_subchannel_index;
36
37 static gpr_mu g_mu;
38
39 static gpr_refcount g_refcount;
40
41 struct grpc_subchannel_key {
42 grpc_subchannel_args args;
43 };
44
45 static bool g_force_creation = false;
46
create_key(const grpc_subchannel_args * args,grpc_channel_args * (* copy_channel_args)(const grpc_channel_args * args))47 static grpc_subchannel_key* create_key(
48 const grpc_subchannel_args* args,
49 grpc_channel_args* (*copy_channel_args)(const grpc_channel_args* args)) {
50 grpc_subchannel_key* k =
51 static_cast<grpc_subchannel_key*>(gpr_malloc(sizeof(*k)));
52 k->args.filter_count = args->filter_count;
53 if (k->args.filter_count > 0) {
54 k->args.filters = static_cast<const grpc_channel_filter**>(
55 gpr_malloc(sizeof(*k->args.filters) * k->args.filter_count));
56 memcpy(reinterpret_cast<grpc_channel_filter*>(k->args.filters),
57 args->filters, sizeof(*k->args.filters) * k->args.filter_count);
58 } else {
59 k->args.filters = nullptr;
60 }
61 k->args.args = copy_channel_args(args->args);
62 return k;
63 }
64
grpc_subchannel_key_create(const grpc_subchannel_args * args)65 grpc_subchannel_key* grpc_subchannel_key_create(
66 const grpc_subchannel_args* args) {
67 return create_key(args, grpc_channel_args_normalize);
68 }
69
subchannel_key_copy(grpc_subchannel_key * k)70 static grpc_subchannel_key* subchannel_key_copy(grpc_subchannel_key* k) {
71 return create_key(&k->args, grpc_channel_args_copy);
72 }
73
grpc_subchannel_key_compare(const grpc_subchannel_key * a,const grpc_subchannel_key * b)74 int grpc_subchannel_key_compare(const grpc_subchannel_key* a,
75 const grpc_subchannel_key* b) {
76 // To pretend the keys are different, return a non-zero value.
77 if (GPR_UNLIKELY(g_force_creation)) return 1;
78 int c = GPR_ICMP(a->args.filter_count, b->args.filter_count);
79 if (c != 0) return c;
80 if (a->args.filter_count > 0) {
81 c = memcmp(a->args.filters, b->args.filters,
82 a->args.filter_count * sizeof(*a->args.filters));
83 if (c != 0) return c;
84 }
85 return grpc_channel_args_compare(a->args.args, b->args.args);
86 }
87
grpc_subchannel_key_destroy(grpc_subchannel_key * k)88 void grpc_subchannel_key_destroy(grpc_subchannel_key* k) {
89 gpr_free(reinterpret_cast<grpc_channel_args*>(k->args.filters));
90 grpc_channel_args_destroy(const_cast<grpc_channel_args*>(k->args.args));
91 gpr_free(k);
92 }
93
sck_avl_destroy(void * p,void * user_data)94 static void sck_avl_destroy(void* p, void* user_data) {
95 grpc_subchannel_key_destroy(static_cast<grpc_subchannel_key*>(p));
96 }
97
sck_avl_copy(void * p,void * unused)98 static void* sck_avl_copy(void* p, void* unused) {
99 return subchannel_key_copy(static_cast<grpc_subchannel_key*>(p));
100 }
101
sck_avl_compare(void * a,void * b,void * unused)102 static long sck_avl_compare(void* a, void* b, void* unused) {
103 return grpc_subchannel_key_compare(static_cast<grpc_subchannel_key*>(a),
104 static_cast<grpc_subchannel_key*>(b));
105 }
106
scv_avl_destroy(void * p,void * user_data)107 static void scv_avl_destroy(void* p, void* user_data) {
108 GRPC_SUBCHANNEL_WEAK_UNREF((grpc_subchannel*)p, "subchannel_index");
109 }
110
scv_avl_copy(void * p,void * unused)111 static void* scv_avl_copy(void* p, void* unused) {
112 GRPC_SUBCHANNEL_WEAK_REF((grpc_subchannel*)p, "subchannel_index");
113 return p;
114 }
115
116 static const grpc_avl_vtable subchannel_avl_vtable = {
117 sck_avl_destroy, // destroy_key
118 sck_avl_copy, // copy_key
119 sck_avl_compare, // compare_keys
120 scv_avl_destroy, // destroy_value
121 scv_avl_copy // copy_value
122 };
123
grpc_subchannel_index_init(void)124 void grpc_subchannel_index_init(void) {
125 g_subchannel_index = grpc_avl_create(&subchannel_avl_vtable);
126 gpr_mu_init(&g_mu);
127 gpr_ref_init(&g_refcount, 1);
128 }
129
grpc_subchannel_index_shutdown(void)130 void grpc_subchannel_index_shutdown(void) {
131 // TODO(juanlishen): This refcounting mechanism may lead to memory leackage.
132 // To solve that, we should force polling to flush any pending callbacks, then
133 // shutdown safely.
134 grpc_subchannel_index_unref();
135 }
136
grpc_subchannel_index_unref(void)137 void grpc_subchannel_index_unref(void) {
138 if (gpr_unref(&g_refcount)) {
139 gpr_mu_destroy(&g_mu);
140 grpc_avl_unref(g_subchannel_index, grpc_core::ExecCtx::Get());
141 }
142 }
143
grpc_subchannel_index_ref(void)144 void grpc_subchannel_index_ref(void) { gpr_ref_non_zero(&g_refcount); }
145
grpc_subchannel_index_find(grpc_subchannel_key * key)146 grpc_subchannel* grpc_subchannel_index_find(grpc_subchannel_key* key) {
147 // Lock, and take a reference to the subchannel index.
148 // We don't need to do the search under a lock as avl's are immutable.
149 gpr_mu_lock(&g_mu);
150 grpc_avl index = grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
151 gpr_mu_unlock(&g_mu);
152
153 grpc_subchannel* c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(
154 (grpc_subchannel*)grpc_avl_get(index, key, grpc_core::ExecCtx::Get()),
155 "index_find");
156 grpc_avl_unref(index, grpc_core::ExecCtx::Get());
157
158 return c;
159 }
160
grpc_subchannel_index_register(grpc_subchannel_key * key,grpc_subchannel * constructed)161 grpc_subchannel* grpc_subchannel_index_register(grpc_subchannel_key* key,
162 grpc_subchannel* constructed) {
163 grpc_subchannel* c = nullptr;
164 bool need_to_unref_constructed = false;
165
166 while (c == nullptr) {
167 need_to_unref_constructed = false;
168
169 // Compare and swap loop:
170 // - take a reference to the current index
171 gpr_mu_lock(&g_mu);
172 grpc_avl index =
173 grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
174 gpr_mu_unlock(&g_mu);
175
176 // - Check to see if a subchannel already exists
177 c = static_cast<grpc_subchannel*>(
178 grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
179 if (c != nullptr) {
180 c = GRPC_SUBCHANNEL_REF_FROM_WEAK_REF(c, "index_register");
181 }
182 if (c != nullptr) {
183 // yes -> we're done
184 need_to_unref_constructed = true;
185 } else {
186 // no -> update the avl and compare/swap
187 grpc_avl updated =
188 grpc_avl_add(grpc_avl_ref(index, grpc_core::ExecCtx::Get()),
189 subchannel_key_copy(key),
190 GRPC_SUBCHANNEL_WEAK_REF(constructed, "index_register"),
191 grpc_core::ExecCtx::Get());
192
193 // it may happen (but it's expected to be unlikely)
194 // that some other thread has changed the index:
195 // compare/swap here to check that, and retry as necessary
196 gpr_mu_lock(&g_mu);
197 if (index.root == g_subchannel_index.root) {
198 GPR_SWAP(grpc_avl, updated, g_subchannel_index);
199 c = constructed;
200 }
201 gpr_mu_unlock(&g_mu);
202
203 grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
204 }
205 grpc_avl_unref(index, grpc_core::ExecCtx::Get());
206 }
207
208 if (need_to_unref_constructed) {
209 GRPC_SUBCHANNEL_UNREF(constructed, "index_register");
210 }
211
212 return c;
213 }
214
grpc_subchannel_index_unregister(grpc_subchannel_key * key,grpc_subchannel * constructed)215 void grpc_subchannel_index_unregister(grpc_subchannel_key* key,
216 grpc_subchannel* constructed) {
217 bool done = false;
218 while (!done) {
219 // Compare and swap loop:
220 // - take a reference to the current index
221 gpr_mu_lock(&g_mu);
222 grpc_avl index =
223 grpc_avl_ref(g_subchannel_index, grpc_core::ExecCtx::Get());
224 gpr_mu_unlock(&g_mu);
225
226 // Check to see if this key still refers to the previously
227 // registered subchannel
228 grpc_subchannel* c = static_cast<grpc_subchannel*>(
229 grpc_avl_get(index, key, grpc_core::ExecCtx::Get()));
230 if (c != constructed) {
231 grpc_avl_unref(index, grpc_core::ExecCtx::Get());
232 break;
233 }
234
235 // compare and swap the update (some other thread may have
236 // mutated the index behind us)
237 grpc_avl updated =
238 grpc_avl_remove(grpc_avl_ref(index, grpc_core::ExecCtx::Get()), key,
239 grpc_core::ExecCtx::Get());
240
241 gpr_mu_lock(&g_mu);
242 if (index.root == g_subchannel_index.root) {
243 GPR_SWAP(grpc_avl, updated, g_subchannel_index);
244 done = true;
245 }
246 gpr_mu_unlock(&g_mu);
247
248 grpc_avl_unref(updated, grpc_core::ExecCtx::Get());
249 grpc_avl_unref(index, grpc_core::ExecCtx::Get());
250 }
251 }
252
grpc_subchannel_index_test_only_set_force_creation(bool force_creation)253 void grpc_subchannel_index_test_only_set_force_creation(bool force_creation) {
254 g_force_creation = force_creation;
255 }
256