1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/connectivity_state.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28
29 #include "src/core/lib/iomgr/combiner.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31
32 namespace grpc_core {
33
34 TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
35
ConnectivityStateName(grpc_connectivity_state state)36 const char* ConnectivityStateName(grpc_connectivity_state state) {
37 switch (state) {
38 case GRPC_CHANNEL_IDLE:
39 return "IDLE";
40 case GRPC_CHANNEL_CONNECTING:
41 return "CONNECTING";
42 case GRPC_CHANNEL_READY:
43 return "READY";
44 case GRPC_CHANNEL_TRANSIENT_FAILURE:
45 return "TRANSIENT_FAILURE";
46 case GRPC_CHANNEL_SHUTDOWN:
47 return "SHUTDOWN";
48 }
49 GPR_UNREACHABLE_CODE(return "UNKNOWN");
50 }
51
52 //
53 // AsyncConnectivityStateWatcherInterface
54 //
55
56 // A fire-and-forget class to asynchronously deliver a connectivity
57 // state notification to a watcher.
58 class AsyncConnectivityStateWatcherInterface::Notifier {
59 public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const std::shared_ptr<WorkSerializer> & work_serializer)60 Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
61 grpc_connectivity_state state,
62 const std::shared_ptr<WorkSerializer>& work_serializer)
63 : watcher_(std::move(watcher)), state_(state) {
64 if (work_serializer != nullptr) {
65 work_serializer->Run(
66 [this]() { SendNotification(this, GRPC_ERROR_NONE); },
67 DEBUG_LOCATION);
68 } else {
69 GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
70 grpc_schedule_on_exec_ctx);
71 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
72 }
73 }
74
75 private:
SendNotification(void * arg,grpc_error *)76 static void SendNotification(void* arg, grpc_error* /*ignored*/) {
77 Notifier* self = static_cast<Notifier*>(arg);
78 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
79 gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s",
80 self->watcher_.get(), ConnectivityStateName(self->state_));
81 }
82 self->watcher_->OnConnectivityStateChange(self->state_);
83 delete self;
84 }
85
86 RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
87 const grpc_connectivity_state state_;
88 grpc_closure closure_;
89 };
90
Notify(grpc_connectivity_state state)91 void AsyncConnectivityStateWatcherInterface::Notify(
92 grpc_connectivity_state state) {
93 new Notifier(Ref(), state, work_serializer_); // Deletes itself when done.
94 }
95
96 //
97 // ConnectivityStateTracker
98 //
99
~ConnectivityStateTracker()100 ConnectivityStateTracker::~ConnectivityStateTracker() {
101 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
102 if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
103 for (const auto& p : watchers_) {
104 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
105 gpr_log(GPR_INFO,
106 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
107 name_, this, p.first, ConnectivityStateName(current_state),
108 ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
109 }
110 p.second->Notify(GRPC_CHANNEL_SHUTDOWN);
111 }
112 }
113
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)114 void ConnectivityStateTracker::AddWatcher(
115 grpc_connectivity_state initial_state,
116 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
117 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
118 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
119 this, watcher.get());
120 }
121 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
122 if (initial_state != current_state) {
123 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
124 gpr_log(GPR_INFO,
125 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
126 name_, this, watcher.get(), ConnectivityStateName(initial_state),
127 ConnectivityStateName(current_state));
128 }
129 watcher->Notify(current_state);
130 }
131 // If we're in state SHUTDOWN, don't add the watcher, so that it will
132 // be orphaned immediately.
133 if (current_state != GRPC_CHANNEL_SHUTDOWN) {
134 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
135 }
136 }
137
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)138 void ConnectivityStateTracker::RemoveWatcher(
139 ConnectivityStateWatcherInterface* watcher) {
140 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
141 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
142 name_, this, watcher);
143 }
144 watchers_.erase(watcher);
145 }
146
SetState(grpc_connectivity_state state,const char * reason)147 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
148 const char* reason) {
149 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
150 if (state == current_state) return;
151 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
152 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s)", name_,
153 this, ConnectivityStateName(current_state),
154 ConnectivityStateName(state), reason);
155 }
156 state_.Store(state, MemoryOrder::RELAXED);
157 for (const auto& p : watchers_) {
158 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
159 gpr_log(GPR_INFO,
160 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
161 name_, this, p.first, ConnectivityStateName(current_state),
162 ConnectivityStateName(state));
163 }
164 p.second->Notify(state);
165 }
166 // If the new state is SHUTDOWN, orphan all of the watchers. This
167 // avoids the need for the callers to explicitly cancel them.
168 if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
169 }
170
state() const171 grpc_connectivity_state ConnectivityStateTracker::state() const {
172 grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
173 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
174 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
175 name_, this, ConnectivityStateName(state));
176 }
177 return state;
178 }
179
180 } // namespace grpc_core
181