• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import io.grpc.ConnectivityState;
22 import io.grpc.ManagedChannel;
23 import java.util.ArrayList;
24 import java.util.concurrent.Executor;
25 import javax.annotation.Nonnull;
26 import javax.annotation.concurrent.NotThreadSafe;
27 
28 /**
29  * Manages connectivity states of the channel. Used for {@link ManagedChannel#getState} to read the
30  * current state of the channel, for {@link ManagedChannel#notifyWhenStateChanged} to add
31  * listeners to state change events, and for {@link io.grpc.LoadBalancer.Helper#updateBalancingState
32  * LoadBalancer.Helper#updateBalancingState} to update the state and run the {@link #gotoState}s.
33  */
34 @NotThreadSafe
35 final class ConnectivityStateManager {
36   private ArrayList<Listener> listeners = new ArrayList<>();
37 
38   private volatile ConnectivityState state = ConnectivityState.IDLE;
39 
40   /**
41    * Adds a listener for state change event.
42    *
43    * <p>The {@code executor} must be one that can run RPC call listeners.
44    */
notifyWhenStateChanged(Runnable callback, Executor executor, ConnectivityState source)45   void notifyWhenStateChanged(Runnable callback, Executor executor, ConnectivityState source) {
46     checkNotNull(callback, "callback");
47     checkNotNull(executor, "executor");
48     checkNotNull(source, "source");
49 
50     Listener stateChangeListener = new Listener(callback, executor);
51     if (state != source) {
52       stateChangeListener.runInExecutor();
53     } else {
54       listeners.add(stateChangeListener);
55     }
56   }
57 
58   /**
59    * Connectivity state is changed to the specified value. Will trigger some notifications that have
60    * been registered earlier by {@link ManagedChannel#notifyWhenStateChanged}.
61    */
gotoState(@onnull ConnectivityState newState)62   void gotoState(@Nonnull ConnectivityState newState) {
63     checkNotNull(newState, "newState");
64     if (state != newState && state != ConnectivityState.SHUTDOWN) {
65       state = newState;
66       if (listeners.isEmpty()) {
67         return;
68       }
69       // Swap out callback list before calling them, because a callback may register new callbacks,
70       // if run in direct executor, can cause ConcurrentModificationException.
71       ArrayList<Listener> savedListeners = listeners;
72       listeners = new ArrayList<>();
73       for (Listener listener : savedListeners) {
74         listener.runInExecutor();
75       }
76     }
77   }
78 
79   /**
80    * Gets the current connectivity state of the channel. This method is threadsafe.
81    */
getState()82   ConnectivityState getState() {
83     ConnectivityState stateCopy = state;
84     if (stateCopy == null) {
85       throw new UnsupportedOperationException("Channel state API is not implemented");
86     }
87     return stateCopy;
88   }
89 
90   private static final class Listener {
91     final Runnable callback;
92     final Executor executor;
93 
Listener(Runnable callback, Executor executor)94     Listener(Runnable callback, Executor executor) {
95       this.callback = callback;
96       this.executor = executor;
97     }
98 
runInExecutor()99     void runInExecutor() {
100       executor.execute(callback);
101     }
102   }
103 }
104