• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/connectivity_state.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 
// Trace flag registered under the name "connectivity_state". The functions in
// this file emit their gpr_log() diagnostics only when this flag's enabled()
// returns true. It is constructed with `false` (presumably the default
// enabled-state -- confirm against grpc_core::TraceFlag).
29 grpc_core::TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
30 
grpc_connectivity_state_name(grpc_connectivity_state state)31 const char* grpc_connectivity_state_name(grpc_connectivity_state state) {
32   switch (state) {
33     case GRPC_CHANNEL_IDLE:
34       return "IDLE";
35     case GRPC_CHANNEL_CONNECTING:
36       return "CONNECTING";
37     case GRPC_CHANNEL_READY:
38       return "READY";
39     case GRPC_CHANNEL_TRANSIENT_FAILURE:
40       return "TRANSIENT_FAILURE";
41     case GRPC_CHANNEL_SHUTDOWN:
42       return "SHUTDOWN";
43   }
44   GPR_UNREACHABLE_CODE(return "UNKNOWN");
45 }
46 
grpc_connectivity_state_init(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state init_state,const char * name)47 void grpc_connectivity_state_init(grpc_connectivity_state_tracker* tracker,
48                                   grpc_connectivity_state init_state,
49                                   const char* name) {
50   gpr_atm_no_barrier_store(&tracker->current_state_atm, init_state);
51   tracker->current_error = GRPC_ERROR_NONE;
52   tracker->watchers = nullptr;
53   tracker->name = gpr_strdup(name);
54 }
55 
grpc_connectivity_state_destroy(grpc_connectivity_state_tracker * tracker)56 void grpc_connectivity_state_destroy(grpc_connectivity_state_tracker* tracker) {
57   grpc_error* error;
58   grpc_connectivity_state_watcher* w;
59   while ((w = tracker->watchers)) {
60     tracker->watchers = w->next;
61 
62     if (GRPC_CHANNEL_SHUTDOWN != *w->current) {
63       *w->current = GRPC_CHANNEL_SHUTDOWN;
64       error = GRPC_ERROR_NONE;
65     } else {
66       error =
67           GRPC_ERROR_CREATE_FROM_STATIC_STRING("Shutdown connectivity owner");
68     }
69     GRPC_CLOSURE_SCHED(w->notify, error);
70     gpr_free(w);
71   }
72   GRPC_ERROR_UNREF(tracker->current_error);
73   gpr_free(tracker->name);
74 }
75 
grpc_connectivity_state_check(grpc_connectivity_state_tracker * tracker)76 grpc_connectivity_state grpc_connectivity_state_check(
77     grpc_connectivity_state_tracker* tracker) {
78   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
79       gpr_atm_no_barrier_load(&tracker->current_state_atm));
80   if (grpc_connectivity_state_trace.enabled()) {
81     gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
82             grpc_connectivity_state_name(cur));
83   }
84   return cur;
85 }
86 
grpc_connectivity_state_get(grpc_connectivity_state_tracker * tracker,grpc_error ** error)87 grpc_connectivity_state grpc_connectivity_state_get(
88     grpc_connectivity_state_tracker* tracker, grpc_error** error) {
89   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
90       gpr_atm_no_barrier_load(&tracker->current_state_atm));
91   if (grpc_connectivity_state_trace.enabled()) {
92     gpr_log(GPR_INFO, "CONWATCH: %p %s: get %s", tracker, tracker->name,
93             grpc_connectivity_state_name(cur));
94   }
95   if (error != nullptr) {
96     *error = GRPC_ERROR_REF(tracker->current_error);
97   }
98   return cur;
99 }
100 
grpc_connectivity_state_has_watchers(grpc_connectivity_state_tracker * connectivity_state)101 bool grpc_connectivity_state_has_watchers(
102     grpc_connectivity_state_tracker* connectivity_state) {
103   return connectivity_state->watchers != nullptr;
104 }
105 
grpc_connectivity_state_notify_on_state_change(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state * current,grpc_closure * notify)106 bool grpc_connectivity_state_notify_on_state_change(
107     grpc_connectivity_state_tracker* tracker, grpc_connectivity_state* current,
108     grpc_closure* notify) {
109   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
110       gpr_atm_no_barrier_load(&tracker->current_state_atm));
111   if (grpc_connectivity_state_trace.enabled()) {
112     if (current == nullptr) {
113       gpr_log(GPR_INFO, "CONWATCH: %p %s: unsubscribe notify=%p", tracker,
114               tracker->name, notify);
115     } else {
116       gpr_log(GPR_INFO, "CONWATCH: %p %s: from %s [cur=%s] notify=%p", tracker,
117               tracker->name, grpc_connectivity_state_name(*current),
118               grpc_connectivity_state_name(cur), notify);
119     }
120   }
121   if (current == nullptr) {
122     grpc_connectivity_state_watcher* w = tracker->watchers;
123     if (w != nullptr && w->notify == notify) {
124       GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
125       tracker->watchers = w->next;
126       gpr_free(w);
127       return false;
128     }
129     while (w != nullptr) {
130       grpc_connectivity_state_watcher* rm_candidate = w->next;
131       if (rm_candidate != nullptr && rm_candidate->notify == notify) {
132         GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_CANCELLED);
133         w->next = w->next->next;
134         gpr_free(rm_candidate);
135         return false;
136       }
137       w = w->next;
138     }
139     return false;
140   } else {
141     if (cur != *current) {
142       *current = cur;
143       GRPC_CLOSURE_SCHED(notify, GRPC_ERROR_REF(tracker->current_error));
144     } else {
145       grpc_connectivity_state_watcher* w =
146           static_cast<grpc_connectivity_state_watcher*>(gpr_malloc(sizeof(*w)));
147       w->current = current;
148       w->notify = notify;
149       w->next = tracker->watchers;
150       tracker->watchers = w;
151     }
152     return cur == GRPC_CHANNEL_IDLE;
153   }
154 }
155 
grpc_connectivity_state_set(grpc_connectivity_state_tracker * tracker,grpc_connectivity_state state,grpc_error * error,const char * reason)156 void grpc_connectivity_state_set(grpc_connectivity_state_tracker* tracker,
157                                  grpc_connectivity_state state,
158                                  grpc_error* error, const char* reason) {
159   grpc_connectivity_state cur = static_cast<grpc_connectivity_state>(
160       gpr_atm_no_barrier_load(&tracker->current_state_atm));
161   grpc_connectivity_state_watcher* w;
162   if (grpc_connectivity_state_trace.enabled()) {
163     const char* error_string = grpc_error_string(error);
164     gpr_log(GPR_INFO, "SET: %p %s: %s --> %s [%s] error=%p %s", tracker,
165             tracker->name, grpc_connectivity_state_name(cur),
166             grpc_connectivity_state_name(state), reason, error, error_string);
167   }
168   switch (state) {
169     case GRPC_CHANNEL_CONNECTING:
170     case GRPC_CHANNEL_IDLE:
171     case GRPC_CHANNEL_READY:
172       GPR_ASSERT(error == GRPC_ERROR_NONE);
173       break;
174     case GRPC_CHANNEL_SHUTDOWN:
175     case GRPC_CHANNEL_TRANSIENT_FAILURE:
176       GPR_ASSERT(error != GRPC_ERROR_NONE);
177       break;
178   }
179   GRPC_ERROR_UNREF(tracker->current_error);
180   tracker->current_error = error;
181   if (cur == state) {
182     return;
183   }
184   GPR_ASSERT(cur != GRPC_CHANNEL_SHUTDOWN);
185   gpr_atm_no_barrier_store(&tracker->current_state_atm, state);
186   while ((w = tracker->watchers) != nullptr) {
187     *w->current = state;
188     tracker->watchers = w->next;
189     if (grpc_connectivity_state_trace.enabled()) {
190       gpr_log(GPR_INFO, "NOTIFY: %p %s: %p", tracker, tracker->name, w->notify);
191     }
192     GRPC_CLOSURE_SCHED(w->notify, GRPC_ERROR_REF(tracker->current_error));
193     gpr_free(w);
194   }
195 }
196