1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/transport/connectivity_state.h"
22
23 #include <string.h>
24
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28
29 #include "src/core/lib/iomgr/combiner.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31
32 namespace grpc_core {
33
34 TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
35
ConnectivityStateName(grpc_connectivity_state state)36 const char* ConnectivityStateName(grpc_connectivity_state state) {
37 switch (state) {
38 case GRPC_CHANNEL_IDLE:
39 return "IDLE";
40 case GRPC_CHANNEL_CONNECTING:
41 return "CONNECTING";
42 case GRPC_CHANNEL_READY:
43 return "READY";
44 case GRPC_CHANNEL_TRANSIENT_FAILURE:
45 return "TRANSIENT_FAILURE";
46 case GRPC_CHANNEL_SHUTDOWN:
47 return "SHUTDOWN";
48 }
49 GPR_UNREACHABLE_CODE(return "UNKNOWN");
50 }
51
52 //
53 // AsyncConnectivityStateWatcherInterface
54 //
55
56 // A fire-and-forget class to asynchronously deliver a connectivity
57 // state notification to a watcher.
58 class AsyncConnectivityStateWatcherInterface::Notifier {
59 public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const absl::Status & status,const std::shared_ptr<WorkSerializer> & work_serializer)60 Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
61 grpc_connectivity_state state, const absl::Status& status,
62 const std::shared_ptr<WorkSerializer>& work_serializer)
63 : watcher_(std::move(watcher)), state_(state), status_(status) {
64 if (work_serializer != nullptr) {
65 work_serializer->Run(
66 [this]() { SendNotification(this, GRPC_ERROR_NONE); },
67 DEBUG_LOCATION);
68 } else {
69 GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
70 grpc_schedule_on_exec_ctx);
71 ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
72 }
73 }
74
75 private:
SendNotification(void * arg,grpc_error *)76 static void SendNotification(void* arg, grpc_error* /*ignored*/) {
77 Notifier* self = static_cast<Notifier*>(arg);
78 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
79 gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s (%s)",
80 self->watcher_.get(), ConnectivityStateName(self->state_),
81 self->status_.ToString().c_str());
82 }
83 self->watcher_->OnConnectivityStateChange(self->state_, self->status_);
84 delete self;
85 }
86
87 RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
88 const grpc_connectivity_state state_;
89 const absl::Status status_;
90 grpc_closure closure_;
91 };
92
Notify(grpc_connectivity_state state,const absl::Status & status)93 void AsyncConnectivityStateWatcherInterface::Notify(
94 grpc_connectivity_state state, const absl::Status& status) {
95 new Notifier(Ref(), state, status,
96 work_serializer_); // Deletes itself when done.
97 }
98
99 //
100 // ConnectivityStateTracker
101 //
102
~ConnectivityStateTracker()103 ConnectivityStateTracker::~ConnectivityStateTracker() {
104 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
105 if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
106 for (const auto& p : watchers_) {
107 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
108 gpr_log(GPR_INFO,
109 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
110 name_, this, p.first, ConnectivityStateName(current_state),
111 ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
112 }
113 p.second->Notify(GRPC_CHANNEL_SHUTDOWN, absl::Status());
114 }
115 }
116
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)117 void ConnectivityStateTracker::AddWatcher(
118 grpc_connectivity_state initial_state,
119 OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
120 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
121 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
122 this, watcher.get());
123 }
124 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
125 if (initial_state != current_state) {
126 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
127 gpr_log(GPR_INFO,
128 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
129 name_, this, watcher.get(), ConnectivityStateName(initial_state),
130 ConnectivityStateName(current_state));
131 }
132 watcher->Notify(current_state, status_);
133 }
134 // If we're in state SHUTDOWN, don't add the watcher, so that it will
135 // be orphaned immediately.
136 if (current_state != GRPC_CHANNEL_SHUTDOWN) {
137 watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
138 }
139 }
140
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)141 void ConnectivityStateTracker::RemoveWatcher(
142 ConnectivityStateWatcherInterface* watcher) {
143 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
144 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
145 name_, this, watcher);
146 }
147 watchers_.erase(watcher);
148 }
149
SetState(grpc_connectivity_state state,const absl::Status & status,const char * reason)150 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
151 const absl::Status& status,
152 const char* reason) {
153 grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
154 if (state == current_state) return;
155 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
156 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
157 name_, this, ConnectivityStateName(current_state),
158 ConnectivityStateName(state), reason, status.ToString().c_str());
159 }
160 state_.Store(state, MemoryOrder::RELAXED);
161 status_ = status;
162 for (const auto& p : watchers_) {
163 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
164 gpr_log(GPR_INFO,
165 "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
166 name_, this, p.first, ConnectivityStateName(current_state),
167 ConnectivityStateName(state));
168 }
169 p.second->Notify(state, status);
170 }
171 // If the new state is SHUTDOWN, orphan all of the watchers. This
172 // avoids the need for the callers to explicitly cancel them.
173 if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
174 }
175
state() const176 grpc_connectivity_state ConnectivityStateTracker::state() const {
177 grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
178 if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
179 gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
180 name_, this, ConnectivityStateName(state));
181 }
182 return state;
183 }
184
185 } // namespace grpc_core
186