• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2015 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/transport/connectivity_state.h"
22 
23 #include <string.h>
24 
25 #include <grpc/support/alloc.h>
26 #include <grpc/support/log.h>
27 #include <grpc/support/string_util.h>
28 
29 #include "src/core/lib/iomgr/combiner.h"
30 #include "src/core/lib/iomgr/exec_ctx.h"
31 
32 namespace grpc_core {
33 
34 TraceFlag grpc_connectivity_state_trace(false, "connectivity_state");
35 
ConnectivityStateName(grpc_connectivity_state state)36 const char* ConnectivityStateName(grpc_connectivity_state state) {
37   switch (state) {
38     case GRPC_CHANNEL_IDLE:
39       return "IDLE";
40     case GRPC_CHANNEL_CONNECTING:
41       return "CONNECTING";
42     case GRPC_CHANNEL_READY:
43       return "READY";
44     case GRPC_CHANNEL_TRANSIENT_FAILURE:
45       return "TRANSIENT_FAILURE";
46     case GRPC_CHANNEL_SHUTDOWN:
47       return "SHUTDOWN";
48   }
49   GPR_UNREACHABLE_CODE(return "UNKNOWN");
50 }
51 
52 //
53 // AsyncConnectivityStateWatcherInterface
54 //
55 
56 // A fire-and-forget class to asynchronously deliver a connectivity
57 // state notification to a watcher.
58 class AsyncConnectivityStateWatcherInterface::Notifier {
59  public:
Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,grpc_connectivity_state state,const std::shared_ptr<WorkSerializer> & work_serializer)60   Notifier(RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher,
61            grpc_connectivity_state state,
62            const std::shared_ptr<WorkSerializer>& work_serializer)
63       : watcher_(std::move(watcher)), state_(state) {
64     if (work_serializer != nullptr) {
65       work_serializer->Run(
66           [this]() { SendNotification(this, GRPC_ERROR_NONE); },
67           DEBUG_LOCATION);
68     } else {
69       GRPC_CLOSURE_INIT(&closure_, SendNotification, this,
70                         grpc_schedule_on_exec_ctx);
71       ExecCtx::Run(DEBUG_LOCATION, &closure_, GRPC_ERROR_NONE);
72     }
73   }
74 
75  private:
SendNotification(void * arg,grpc_error *)76   static void SendNotification(void* arg, grpc_error* /*ignored*/) {
77     Notifier* self = static_cast<Notifier*>(arg);
78     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
79       gpr_log(GPR_INFO, "watcher %p: delivering async notification for %s",
80               self->watcher_.get(), ConnectivityStateName(self->state_));
81     }
82     self->watcher_->OnConnectivityStateChange(self->state_);
83     delete self;
84   }
85 
86   RefCountedPtr<AsyncConnectivityStateWatcherInterface> watcher_;
87   const grpc_connectivity_state state_;
88   grpc_closure closure_;
89 };
90 
Notify(grpc_connectivity_state state)91 void AsyncConnectivityStateWatcherInterface::Notify(
92     grpc_connectivity_state state) {
93   new Notifier(Ref(), state, work_serializer_);  // Deletes itself when done.
94 }
95 
96 //
97 // ConnectivityStateTracker
98 //
99 
~ConnectivityStateTracker()100 ConnectivityStateTracker::~ConnectivityStateTracker() {
101   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
102   if (current_state == GRPC_CHANNEL_SHUTDOWN) return;
103   for (const auto& p : watchers_) {
104     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
105       gpr_log(GPR_INFO,
106               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
107               name_, this, p.first, ConnectivityStateName(current_state),
108               ConnectivityStateName(GRPC_CHANNEL_SHUTDOWN));
109     }
110     p.second->Notify(GRPC_CHANNEL_SHUTDOWN);
111   }
112 }
113 
AddWatcher(grpc_connectivity_state initial_state,OrphanablePtr<ConnectivityStateWatcherInterface> watcher)114 void ConnectivityStateTracker::AddWatcher(
115     grpc_connectivity_state initial_state,
116     OrphanablePtr<ConnectivityStateWatcherInterface> watcher) {
117   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
118     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: add watcher %p", name_,
119             this, watcher.get());
120   }
121   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
122   if (initial_state != current_state) {
123     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
124       gpr_log(GPR_INFO,
125               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
126               name_, this, watcher.get(), ConnectivityStateName(initial_state),
127               ConnectivityStateName(current_state));
128     }
129     watcher->Notify(current_state);
130   }
131   // If we're in state SHUTDOWN, don't add the watcher, so that it will
132   // be orphaned immediately.
133   if (current_state != GRPC_CHANNEL_SHUTDOWN) {
134     watchers_.insert(std::make_pair(watcher.get(), std::move(watcher)));
135   }
136 }
137 
RemoveWatcher(ConnectivityStateWatcherInterface * watcher)138 void ConnectivityStateTracker::RemoveWatcher(
139     ConnectivityStateWatcherInterface* watcher) {
140   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
141     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: remove watcher %p",
142             name_, this, watcher);
143   }
144   watchers_.erase(watcher);
145 }
146 
SetState(grpc_connectivity_state state,const char * reason)147 void ConnectivityStateTracker::SetState(grpc_connectivity_state state,
148                                         const char* reason) {
149   grpc_connectivity_state current_state = state_.Load(MemoryOrder::RELAXED);
150   if (state == current_state) return;
151   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
152     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: %s -> %s (%s)", name_,
153             this, ConnectivityStateName(current_state),
154             ConnectivityStateName(state), reason);
155   }
156   state_.Store(state, MemoryOrder::RELAXED);
157   for (const auto& p : watchers_) {
158     if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
159       gpr_log(GPR_INFO,
160               "ConnectivityStateTracker %s[%p]: notifying watcher %p: %s -> %s",
161               name_, this, p.first, ConnectivityStateName(current_state),
162               ConnectivityStateName(state));
163     }
164     p.second->Notify(state);
165   }
166   // If the new state is SHUTDOWN, orphan all of the watchers.  This
167   // avoids the need for the callers to explicitly cancel them.
168   if (state == GRPC_CHANNEL_SHUTDOWN) watchers_.clear();
169 }
170 
state() const171 grpc_connectivity_state ConnectivityStateTracker::state() const {
172   grpc_connectivity_state state = state_.Load(MemoryOrder::RELAXED);
173   if (GRPC_TRACE_FLAG_ENABLED(grpc_connectivity_state_trace)) {
174     gpr_log(GPR_INFO, "ConnectivityStateTracker %s[%p]: get current state: %s",
175             name_, this, ConnectivityStateName(state));
176   }
177   return state;
178 }
179 
180 }  // namespace grpc_core
181