• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.grpclb;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.protobuf.util.Timestamps;
22 import io.grpc.ClientStreamTracer;
23 import io.grpc.Metadata;
24 import io.grpc.Status;
25 import io.grpc.internal.TimeProvider;
26 import io.grpc.lb.v1.ClientStats;
27 import io.grpc.lb.v1.ClientStatsPerToken;
28 import java.util.Collections;
29 import java.util.HashMap;
30 import java.util.Map;
31 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
32 import javax.annotation.concurrent.GuardedBy;
33 import javax.annotation.concurrent.ThreadSafe;
34 
35 /**
36  * Record and aggregate client-side load data for GRPCLB.  This records load occurred during the
37  * span of an LB stream with the remote load-balancer.
38  */
39 @ThreadSafe
40 final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory {
41 
42   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsStartedUpdater =
43       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted");
44   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFinishedUpdater =
45       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished");
46   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFailedToSendUpdater =
47       AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend");
48   private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder>
49       callsFinishedKnownReceivedUpdater =
50           AtomicLongFieldUpdater.newUpdater(
51               GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived");
52 
53   private final TimeProvider time;
54   @SuppressWarnings("unused")
55   private volatile long callsStarted;
56   @SuppressWarnings("unused")
57   private volatile long callsFinished;
58 
59   private static final class LongHolder {
60     long num;
61   }
62 
63   // Specific finish types
64   @GuardedBy("this")
65   private Map<String, LongHolder> callsDroppedPerToken = new HashMap<>(1);
66   @SuppressWarnings("unused")
67   private volatile long callsFailedToSend;
68   @SuppressWarnings("unused")
69   private volatile long callsFinishedKnownReceived;
70 
GrpclbClientLoadRecorder(TimeProvider time)71   GrpclbClientLoadRecorder(TimeProvider time) {
72     this.time = checkNotNull(time, "time provider");
73   }
74 
75   @Override
newClientStreamTracer( ClientStreamTracer.StreamInfo info, Metadata headers)76   public ClientStreamTracer newClientStreamTracer(
77       ClientStreamTracer.StreamInfo info, Metadata headers) {
78     callsStartedUpdater.getAndIncrement(this);
79     return new StreamTracer();
80   }
81 
82   /**
83    * Records that a request has been dropped as instructed by the remote balancer.
84    */
recordDroppedRequest(String token)85   void recordDroppedRequest(String token) {
86     callsStartedUpdater.getAndIncrement(this);
87     callsFinishedUpdater.getAndIncrement(this);
88 
89     synchronized (this) {
90       LongHolder holder;
91       if ((holder = callsDroppedPerToken.get(token)) == null) {
92         callsDroppedPerToken.put(token, (holder = new LongHolder()));
93       }
94       holder.num++;
95     }
96   }
97 
98   /**
99    * Generate the report with the data recorded this LB stream since the last report.
100    */
generateLoadReport()101   ClientStats generateLoadReport() {
102     ClientStats.Builder statsBuilder =
103         ClientStats.newBuilder()
104         .setTimestamp(Timestamps.fromNanos(time.currentTimeNanos()))
105         .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0))
106         .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0))
107         .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0))
108         .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0));
109 
110     Map<String, LongHolder> localCallsDroppedPerToken = Collections.emptyMap();
111     synchronized (this) {
112       if (!callsDroppedPerToken.isEmpty()) {
113         localCallsDroppedPerToken = callsDroppedPerToken;
114         callsDroppedPerToken = new HashMap<>(localCallsDroppedPerToken.size());
115       }
116     }
117     for (Map.Entry<String, LongHolder> entry : localCallsDroppedPerToken.entrySet()) {
118       statsBuilder.addCallsFinishedWithDrop(
119           ClientStatsPerToken.newBuilder()
120               .setLoadBalanceToken(entry.getKey())
121               .setNumCalls(entry.getValue().num)
122               .build());
123     }
124     return statsBuilder.build();
125   }
126 
127   private class StreamTracer extends ClientStreamTracer {
128     private volatile boolean headersSent;
129     private volatile boolean anythingReceived;
130 
131     @Override
outboundHeaders()132     public void outboundHeaders() {
133       headersSent = true;
134     }
135 
136     @Override
inboundHeaders()137     public void inboundHeaders() {
138       anythingReceived = true;
139     }
140 
141     @Override
inboundMessage(int seqNo)142     public void inboundMessage(int seqNo) {
143       anythingReceived = true;
144     }
145 
146     @Override
streamClosed(Status status)147     public void streamClosed(Status status) {
148       callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
149       if (!headersSent) {
150         callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
151       }
152       if (anythingReceived) {
153         callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this);
154       }
155     }
156   }
157 }
158