• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/connectivity_state.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 
29 #include "src/core/lib/iomgr/combiner.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 
32 namespace grpc_core {
33 
34 TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
35 
ConnectivityStateName(grpc_connectivity_state state)36 const char* ConnectivityStateName(grpc_connectivity_state state) {
37   switch (state) {
38     case GRPC_CHANNEL_IDLE:
39       return "IDLE";
40     case GRPC_CHANNEL_CONNECTING:
41       return "CONNECTING";
42     case GRPC_CHANNEL_READY:
43       return "READY";
44     case GRPC_CHANNEL_TRANSIENT_FAILURE:
45       return "TRANSIENT_FAILURE";
46     case GRPC_CHANNEL_SHUTDOWN:
47       return "SHUTDOWN";
48   }
49   GPR_UNREACHABLE_CODE(return "UNKNOWN");
50 }
51 
52 //
53 // AsyncConnectivityStateWatcherInterface
54 //
55 
56 // A fire-and-forget class to asynchronously deliver a connectivity
57 // state notification to a watcher.
58 class AsyncConnectivityStateWatcherInterface::Notifier {
59  public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const absl::Status & status,const std::shared_ptr<WorkSerializer> & work_serializer)60   Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
61            grpc_connectivity_state state, const absl::Status& status,
62            const std::shared_ptr<WorkSerializer>& work_serializer)
63       : watcher_(std::move(watcher)), state_(state), status_(status) {
64     if (work_serializer != nullptr) {
65       work_serializer->Run(
66           [this]() { SendNotification(this, GRPC_ERROR_NONE); },
67           DEBUG_LOCATION);
68     } else {
69       GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
70                         grpc_schedule_on_exec_ctx);
71       ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
72     }
73   }
74 
75  private:
SendNotification(void * arg,grpc_error *)76   static void SendNotification(void* arg, grpc_error* /*ignored*/) {
77     Notifier* self = static_cast<Notifier*>(arg);
78     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
79       gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s (%s)",
80               self->watcher_.get(), ConnectivityStateName(self->state_),
81               self->status_.ToString().c_str());
82     }
83     self->watcher_->OnConnectivityStateChange(self->state_, self->status_);
84     delete self;
85   }
86 
87   RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
88   const grpc_connectivity_state state_;
89   const absl::Status status_;
90   grpc_closure closure_;
91 };
92 
Notify(grpc_connectivity_state state,const absl::Status & status)93 void AsyncConnectivityStateWatcherInterface::Notify(
94     grpc_connectivity_state state, const absl::Status& status) {
95   new Notifier(Ref(), state, status,
96                work_serializer_);  // Deletes itself when done.
97 }
98 
99 //
100 // ConnectivityStateTracker
101 //
102 
~ConnectivityStateTracker()103 ConnectivityStateTracker::~ConnectivityStateTracker() {
104   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
105   if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
106   for (const auto& p : watchers_) {
107     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
108       gpr_log(GPR_INFO,
109               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
110               name_, this, p.first, ConnectivityStateName(current_state),
111               ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
112     }
113     p.second->Notify(GRPC_CHANNEL_SHUTDOWN, absl::Status());
114   }
115 }
116 
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)117 void ConnectivityStateTracker::AddWatcher(
118     grpc_connectivity_state initial_state,
119     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
120   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
121     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
122             this, watcher.get());
123   }
124   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
125   if (initial_state != current_state) {
126     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
127       gpr_log(GPR_INFO,
128               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
129               name_, this, watcher.get(), ConnectivityStateName(initial_state),
130               ConnectivityStateName(current_state));
131     }
132     watcher->Notify(current_state, status_);
133   }
134   // If we're in state SHUTDOWN, don't add the watcher, so that it will
135   // be orphaned immediately.
136   if (current_state != GRPC_CHANNEL_SHUTDOWN) {
137     watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
138   }
139 }
140 
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)141 void ConnectivityStateTracker::RemoveWatcher(
142     ConnectivityStateWatcherInterface* watcher) {
143   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
144     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
145             name_, this, watcher);
146   }
147   watchers_.erase(watcher);
148 }
149 
SetState(grpc_connectivity_state state,const absl::Status & status,const char * reason)150 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
151                                         const absl::Status& status,
152                                         const char* reason) {
153   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
154   if (state == current_state) return;
155   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
156     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s, %s)",
157             name_, this, ConnectivityStateName(current_state),
158             ConnectivityStateName(state), reason, status.ToString().c_str());
159   }
160   state_.Store(state, MemoryOrder::RELAXED);
161   status_ = status;
162   for (const auto& p : watchers_) {
163     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
164       gpr_log(GPR_INFO,
165               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
166               name_, this, p.first, ConnectivityStateName(current_state),
167               ConnectivityStateName(state));
168     }
169     p.second->Notify(state, status);
170   }
171   // If the new state is SHUTDOWN, orphan all of the watchers.  This
172   // avoids the need for the callers to explicitly cancel them.
173   if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
174 }
175 
state() const176 grpc_connectivity_state ConnectivityStateTracker::state() const {
177   grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
178   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
179     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
180             name_, this, ConnectivityStateName(state));
181   }
182   return state;
183 }
184 
185 }  // namespace grpc_core
186