1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/connectivity_state.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28
29 grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
30
grpc_connectivity_state_name(grpc_connectivity_state state)31 const char* grpc_connectivity_state_name(grpc_connectivity_state state) {
32 switch (state) {
33 case GRPC_CHANNEL_IDLE:
34 return "IDLE";
35 case GRPC_CHANNEL_CONNECTING:
36 return "CONNECTING";
37 case GRPC_CHANNEL_READY:
38 return "READY";
39 case GRPC_CHANNEL_TRANSIENT_FAILURE:
40 return "TRANSIENT_FAILURE";
41 case GRPC_CHANNEL_SHUTDOWN:
42 return "SHUTDOWN";
43 }
44 GPR_UNREACHABLE_CODE(return "UNKNOWN");
45 }
46
grpc_connectivity_state_init(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state init_state,const char * name)47 void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
48 grpc_connectivity_state init_state,
49 const char* name) {
50 gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
51 tracker->current_error = GRPC_ERROR_NONE;
52 tracker->watchers = nullptr;
53 tracker->name = gpr_strdup(name);
54 }
55
grpc_connectivity_state_destroy(grpc_connectivity_state_tracker * tracker)56 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
57 grpc_error* error;
58 grpc_connectivity_state_watcher* w;
59 while ((w = tracker->watchers)) {
60 tracker->watchers = w->next;
61
62 if (GRPC_CHANNEL_SHUTDOWN != *w->current) {
63 *w->current = GRPC_CHANNEL_SHUTDOWN;
64 error = GRPC_ERROR_NONE;
65 } else {
66 error =
67 GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
68 }
69 GRPC_CLOSURE_SCHED(w->notify, error);
70 gpr_free(w);
71 }
72 GRPC_ERROR_UNREF(tracker->current_error);
73 gpr_free(tracker->name);
74 }
75
grpc_connectivity_state_check(grpc_connectivity_state_tracker * tracker)76 grpc_connectivity_state grpc_connectivity_state_check(
77 grpc_connectivity_state_tracker* tracker) {
78 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
79 gpr_atm_no_barrier_load(&tracker->current_state_atm));
80 if (grpc_connectivity_state_trace.enabled()) {
81 gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
82 grpc_connectivity_state_name(cur));
83 }
84 return cur;
85 }
86
grpc_connectivity_state_get(grpc_connectivity_state_tracker * tracker,grpc_error ** error)87 grpc_connectivity_state grpc_connectivity_state_get(
88 grpc_connectivity_state_tracker* tracker, grpc_error** error) {
89 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
90 gpr_atm_no_barrier_load(&tracker->current_state_atm));
91 if (grpc_connectivity_state_trace.enabled()) {
92 gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
93 grpc_connectivity_state_name(cur));
94 }
95 if (error != nullptr) {
96 *error = GRPC_ERROR_REF(tracker->current_error);
97 }
98 return cur;
99 }
100
grpc_connectivity_state_has_watchers(grpc_connectivity_state_tracker * connectivity_state)101 bool grpc_connectivity_state_has_watchers(
102 grpc_connectivity_state_tracker* connectivity_state) {
103 return connectivity_state->watchers != nullptr;
104 }
105
grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state * current,grpc_closure * notify)106 bool grpc_connectivity_state_notify_on_state_change(
107 grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
108 grpc_closure* notify) {
109 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
110 gpr_atm_no_barrier_load(&tracker->current_state_atm));
111 if (grpc_connectivity_state_trace.enabled()) {
112 if (current == nullptr) {
113 gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
114 tracker->name, notify);
115 } else {
116 gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
117 tracker->name, grpc_connectivity_state_name(*current),
118 grpc_connectivity_state_name(cur), notify);
119 }
120 }
121 if (current == nullptr) {
122 grpc_connectivity_state_watcher* w = tracker->watchers;
123 if (w != nullptr && w->notify == notify) {
124 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
125 tracker->watchers = w->next;
126 gpr_free(w);
127 return false;
128 }
129 while (w != nullptr) {
130 grpc_connectivity_state_watcher* rm_candidate = w->next;
131 if (rm_candidate != nullptr && rm_candidate->notify == notify) {
132 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
133 w->next = w->next->next;
134 gpr_free(rm_candidate);
135 return false;
136 }
137 w = w->next;
138 }
139 return false;
140 } else {
141 if (cur != *current) {
142 *current = cur;
143 GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error));
144 } else {
145 grpc_connectivity_state_watcher* w =
146 static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
147 w->current = current;
148 w->notify = notify;
149 w->next = tracker->watchers;
150 tracker->watchers = w;
151 }
152 return cur == GRPC_CHANNEL_IDLE;
153 }
154 }
155
grpc_connectivity_state_set(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state state,grpc_error * error,const char * reason)156 void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
157 grpc_connectivity_state state,
158 grpc_error* error, const char* reason) {
159 grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
160 gpr_atm_no_barrier_load(&tracker->current_state_atm));
161 grpc_connectivity_state_watcher* w;
162 if (grpc_connectivity_state_trace.enabled()) {
163 const char* error_string = grpc_error_string(error);
164 gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
165 tracker->name, grpc_connectivity_state_name(cur),
166 grpc_connectivity_state_name(state), reason, error, error_string);
167 }
168 switch (state) {
169 case GRPC_CHANNEL_CONNECTING:
170 case GRPC_CHANNEL_IDLE:
171 case GRPC_CHANNEL_READY:
172 GPR_ASSERT(error == GRPC_ERROR_NONE);
173 break;
174 case GRPC_CHANNEL_SHUTDOWN:
175 case GRPC_CHANNEL_TRANSIENT_FAILURE:
176 GPR_ASSERT(error != GRPC_ERROR_NONE);
177 break;
178 }
179 GRPC_ERROR_UNREF(tracker->current_error);
180 tracker->current_error = error;
181 if (cur == state) {
182 return;
183 }
184 GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
185 gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
186 while ((w = tracker->watchers) != nullptr) {
187 *w->current = state;
188 tracker->watchers = w->next;
189 if (grpc_connectivity_state_trace.enabled()) {
190 gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
191 }
192 GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error));
193 gpr_free(w);
194 }
195 }
196