1 /* 2 * Copyright 2017 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.internal; 18 19 import static io.grpc.internal.TimeProvider.SYSTEM_TIME_PROVIDER; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Preconditions; 23 import io.grpc.InternalChannelz.TransportStats; 24 25 /** 26 * A class for gathering statistics about a transport. This is an experimental feature. 27 * Can only be called from the transport thread unless otherwise noted. 28 */ 29 public final class TransportTracer { 30 private static final Factory DEFAULT_FACTORY = new Factory(SYSTEM_TIME_PROVIDER); 31 32 private final TimeProvider timeProvider; 33 private long streamsStarted; 34 private long lastLocalStreamCreatedTimeNanos; 35 private long lastRemoteStreamCreatedTimeNanos; 36 private long streamsSucceeded; 37 private long streamsFailed; 38 private long keepAlivesSent; 39 private FlowControlReader flowControlWindowReader; 40 41 private long messagesSent; 42 private long lastMessageSentTimeNanos; 43 // deframing happens on the application thread, and there's no easy way to avoid synchronization 44 private final LongCounter messagesReceived = LongCounterFactory.create(); 45 private volatile long lastMessageReceivedTimeNanos; 46 TransportTracer()47 public TransportTracer() { 48 this.timeProvider = SYSTEM_TIME_PROVIDER; 49 } 50 TransportTracer(TimeProvider timeProvider)51 private TransportTracer(TimeProvider timeProvider) { 52 this.timeProvider = timeProvider; 53 } 54 55 /** 56 * Returns a read only set of current stats. 57 */ getStats()58 public TransportStats getStats() { 59 long localFlowControlWindow = 60 flowControlWindowReader == null ? -1 : flowControlWindowReader.read().localBytes; 61 long remoteFlowControlWindow = 62 flowControlWindowReader == null ? -1 : flowControlWindowReader.read().remoteBytes; 63 return new TransportStats( 64 streamsStarted, 65 lastLocalStreamCreatedTimeNanos, 66 lastRemoteStreamCreatedTimeNanos, 67 streamsSucceeded, 68 streamsFailed, 69 messagesSent, 70 messagesReceived.value(), 71 keepAlivesSent, 72 lastMessageSentTimeNanos, 73 lastMessageReceivedTimeNanos, 74 localFlowControlWindow, 75 remoteFlowControlWindow); 76 } 77 78 /** 79 * Called by the client to report a stream has started. 80 */ reportLocalStreamStarted()81 public void reportLocalStreamStarted() { 82 streamsStarted++; 83 lastLocalStreamCreatedTimeNanos = timeProvider.currentTimeNanos(); 84 } 85 86 /** 87 * Called by the server to report a stream has started. 88 */ reportRemoteStreamStarted()89 public void reportRemoteStreamStarted() { 90 streamsStarted++; 91 lastRemoteStreamCreatedTimeNanos = timeProvider.currentTimeNanos(); 92 } 93 94 /** 95 * Reports that a stream closed with the specified Status. 96 */ reportStreamClosed(boolean success)97 public void reportStreamClosed(boolean success) { 98 if (success) { 99 streamsSucceeded++; 100 } else { 101 streamsFailed++; 102 } 103 } 104 105 /** 106 * Reports that some messages were successfully sent. {@code numMessages} must be at least 0. 107 */ reportMessageSent(int numMessages)108 public void reportMessageSent(int numMessages) { 109 if (numMessages == 0) { 110 return; 111 } 112 messagesSent += numMessages; 113 lastMessageSentTimeNanos = timeProvider.currentTimeNanos(); 114 } 115 116 /** 117 * Reports that a message was successfully received. This method is thread safe. 118 */ reportMessageReceived()119 public void reportMessageReceived() { 120 messagesReceived.add(1); 121 lastMessageReceivedTimeNanos = timeProvider.currentTimeNanos(); 122 } 123 124 /** 125 * Reports that a keep alive message was sent. 126 */ reportKeepAliveSent()127 public void reportKeepAliveSent() { 128 keepAlivesSent++; 129 } 130 131 /** 132 * Registers a {@link FlowControlReader} that can be used to read the local and remote flow 133 * control window sizes. 134 */ setFlowControlWindowReader(FlowControlReader flowControlWindowReader)135 public void setFlowControlWindowReader(FlowControlReader flowControlWindowReader) { 136 this.flowControlWindowReader = Preconditions.checkNotNull(flowControlWindowReader); 137 } 138 139 /** 140 * A container that holds the local and remote flow control window sizes. 141 */ 142 public static final class FlowControlWindows { 143 public final long remoteBytes; 144 public final long localBytes; 145 FlowControlWindows(long localBytes, long remoteBytes)146 public FlowControlWindows(long localBytes, long remoteBytes) { 147 this.localBytes = localBytes; 148 this.remoteBytes = remoteBytes; 149 } 150 } 151 152 /** 153 * An interface for reading the local and remote flow control windows of the transport. 154 */ 155 public interface FlowControlReader { read()156 FlowControlWindows read(); 157 } 158 159 public static final class Factory { 160 private TimeProvider timeProvider; 161 162 @VisibleForTesting Factory(TimeProvider timeProvider)163 public Factory(TimeProvider timeProvider) { 164 this.timeProvider = timeProvider; 165 } 166 create()167 public TransportTracer create() { 168 return new TransportTracer(timeProvider); 169 } 170 } 171 getDefaultFactory()172 public static Factory getDefaultFactory() { 173 return DEFAULT_FACTORY; 174 } 175 } 176