1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.grpclb; 18 19 import static com.google.common.base.Preconditions.checkNotNull; 20 21 import com.google.protobuf.util.Timestamps; 22 import io.grpc.ClientStreamTracer; 23 import io.grpc.Metadata; 24 import io.grpc.Status; 25 import io.grpc.internal.TimeProvider; 26 import io.grpc.lb.v1.ClientStats; 27 import io.grpc.lb.v1.ClientStatsPerToken; 28 import java.util.Collections; 29 import java.util.HashMap; 30 import java.util.Map; 31 import java.util.concurrent.atomic.AtomicLongFieldUpdater; 32 import javax.annotation.concurrent.GuardedBy; 33 import javax.annotation.concurrent.ThreadSafe; 34 35 /** 36 * Record and aggregate client-side load data for GRPCLB. This records load occurred during the 37 * span of an LB stream with the remote load-balancer. 38 */ 39 @ThreadSafe 40 final class GrpclbClientLoadRecorder extends ClientStreamTracer.Factory { 41 42 private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsStartedUpdater = 43 AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsStarted"); 44 private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFinishedUpdater = 45 AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFinished"); 46 private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> callsFailedToSendUpdater = 47 AtomicLongFieldUpdater.newUpdater(GrpclbClientLoadRecorder.class, "callsFailedToSend"); 48 private static final AtomicLongFieldUpdater<GrpclbClientLoadRecorder> 49 callsFinishedKnownReceivedUpdater = 50 AtomicLongFieldUpdater.newUpdater( 51 GrpclbClientLoadRecorder.class, "callsFinishedKnownReceived"); 52 53 private final TimeProvider time; 54 @SuppressWarnings("unused") 55 private volatile long callsStarted; 56 @SuppressWarnings("unused") 57 private volatile long callsFinished; 58 59 private static final class LongHolder { 60 long num; 61 } 62 63 // Specific finish types 64 @GuardedBy("this") 65 private Map<String, LongHolder> callsDroppedPerToken = new HashMap<>(1); 66 @SuppressWarnings("unused") 67 private volatile long callsFailedToSend; 68 @SuppressWarnings("unused") 69 private volatile long callsFinishedKnownReceived; 70 GrpclbClientLoadRecorder(TimeProvider time)71 GrpclbClientLoadRecorder(TimeProvider time) { 72 this.time = checkNotNull(time, "time provider"); 73 } 74 75 @Override newClientStreamTracer( ClientStreamTracer.StreamInfo info, Metadata headers)76 public ClientStreamTracer newClientStreamTracer( 77 ClientStreamTracer.StreamInfo info, Metadata headers) { 78 callsStartedUpdater.getAndIncrement(this); 79 return new StreamTracer(); 80 } 81 82 /** 83 * Records that a request has been dropped as instructed by the remote balancer. 84 */ recordDroppedRequest(String token)85 void recordDroppedRequest(String token) { 86 callsStartedUpdater.getAndIncrement(this); 87 callsFinishedUpdater.getAndIncrement(this); 88 89 synchronized (this) { 90 LongHolder holder; 91 if ((holder = callsDroppedPerToken.get(token)) == null) { 92 callsDroppedPerToken.put(token, (holder = new LongHolder())); 93 } 94 holder.num++; 95 } 96 } 97 98 /** 99 * Generate the report with the data recorded this LB stream since the last report. 100 */ generateLoadReport()101 ClientStats generateLoadReport() { 102 ClientStats.Builder statsBuilder = 103 ClientStats.newBuilder() 104 .setTimestamp(Timestamps.fromNanos(time.currentTimeNanos())) 105 .setNumCallsStarted(callsStartedUpdater.getAndSet(this, 0)) 106 .setNumCallsFinished(callsFinishedUpdater.getAndSet(this, 0)) 107 .setNumCallsFinishedWithClientFailedToSend(callsFailedToSendUpdater.getAndSet(this, 0)) 108 .setNumCallsFinishedKnownReceived(callsFinishedKnownReceivedUpdater.getAndSet(this, 0)); 109 110 Map<String, LongHolder> localCallsDroppedPerToken = Collections.emptyMap(); 111 synchronized (this) { 112 if (!callsDroppedPerToken.isEmpty()) { 113 localCallsDroppedPerToken = callsDroppedPerToken; 114 callsDroppedPerToken = new HashMap<>(localCallsDroppedPerToken.size()); 115 } 116 } 117 for (Map.Entry<String, LongHolder> entry : localCallsDroppedPerToken.entrySet()) { 118 statsBuilder.addCallsFinishedWithDrop( 119 ClientStatsPerToken.newBuilder() 120 .setLoadBalanceToken(entry.getKey()) 121 .setNumCalls(entry.getValue().num) 122 .build()); 123 } 124 return statsBuilder.build(); 125 } 126 127 private class StreamTracer extends ClientStreamTracer { 128 private volatile boolean headersSent; 129 private volatile boolean anythingReceived; 130 131 @Override outboundHeaders()132 public void outboundHeaders() { 133 headersSent = true; 134 } 135 136 @Override inboundHeaders()137 public void inboundHeaders() { 138 anythingReceived = true; 139 } 140 141 @Override inboundMessage(int seqNo)142 public void inboundMessage(int seqNo) { 143 anythingReceived = true; 144 } 145 146 @Override streamClosed(Status status)147 public void streamClosed(Status status) { 148 callsFinishedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); 149 if (!headersSent) { 150 callsFailedToSendUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); 151 } 152 if (anythingReceived) { 153 callsFinishedKnownReceivedUpdater.getAndIncrement(GrpclbClientLoadRecorder.this); 154 } 155 } 156 } 157 } 158