/*
 * Copyright 2016 The gRPC Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16 
17 package io.grpc.internal;
18 
19 import static com.google.common.base.Preconditions.checkNotNull;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import io.grpc.CallOptions;
23 import io.grpc.ClientStreamTracer;
24 import io.grpc.Context;
25 import io.grpc.Metadata;
26 import io.grpc.ServerStreamTracer;
27 import io.grpc.ServerStreamTracer.ServerCallInfo;
28 import io.grpc.Status;
29 import io.grpc.StreamTracer;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
33 import java.util.concurrent.atomic.AtomicBoolean;
34 import javax.annotation.concurrent.ThreadSafe;
35 
36 /**
37  * The stats and tracing information for a stream.
38  */
39 @ThreadSafe
40 public final class StatsTraceContext {
41   public static final StatsTraceContext NOOP = new StatsTraceContext(new StreamTracer[0]);
42 
43   private final StreamTracer[] tracers;
44   private final AtomicBoolean closed = new AtomicBoolean(false);
45 
46   /**
47    * Factory method for the client-side.
48    */
newClientContext(CallOptions callOptions, Metadata headers)49   public static StatsTraceContext newClientContext(CallOptions callOptions, Metadata headers) {
50     List<ClientStreamTracer.Factory> factories = callOptions.getStreamTracerFactories();
51     if (factories.isEmpty()) {
52       return NOOP;
53     }
54     // This array will be iterated multiple times per RPC. Use primitive array instead of Collection
55     // so that for-each doesn't create an Iterator every time.
56     StreamTracer[] tracers = new StreamTracer[factories.size()];
57     for (int i = 0; i < tracers.length; i++) {
58       tracers[i] = factories.get(i).newClientStreamTracer(callOptions, headers);
59     }
60     return new StatsTraceContext(tracers);
61   }
62 
63   /**
64    * Factory method for the server-side.
65    */
newServerContext( List<ServerStreamTracer.Factory> factories, String fullMethodName, Metadata headers)66   public static StatsTraceContext newServerContext(
67       List<ServerStreamTracer.Factory> factories, String fullMethodName, Metadata headers) {
68     if (factories.isEmpty()) {
69       return NOOP;
70     }
71     StreamTracer[] tracers = new StreamTracer[factories.size()];
72     for (int i = 0; i < tracers.length; i++) {
73       tracers[i] = factories.get(i).newServerStreamTracer(fullMethodName, headers);
74     }
75     return new StatsTraceContext(tracers);
76   }
77 
78   @VisibleForTesting
StatsTraceContext(StreamTracer[] tracers)79   StatsTraceContext(StreamTracer[] tracers) {
80     this.tracers = tracers;
81   }
82 
83   /**
84    * Returns a copy of the tracer list.
85    */
86   @VisibleForTesting
getTracersForTest()87   public List<StreamTracer> getTracersForTest() {
88     return new ArrayList<>(Arrays.asList(tracers));
89   }
90 
91   /**
92    * See {@link ClientStreamTracer#outboundHeaders}.  For client-side only.
93    *
94    * <p>Transport-specific, thus should be called by transport implementations.
95    */
clientOutboundHeaders()96   public void clientOutboundHeaders() {
97     for (StreamTracer tracer : tracers) {
98       ((ClientStreamTracer) tracer).outboundHeaders();
99     }
100   }
101 
102   /**
103    * See {@link ClientStreamTracer#inboundHeaders}.  For client-side only.
104    *
105    * <p>Called from abstract stream implementations.
106    */
clientInboundHeaders()107   public void clientInboundHeaders() {
108     for (StreamTracer tracer : tracers) {
109       ((ClientStreamTracer) tracer).inboundHeaders();
110     }
111   }
112 
113   /**
114    * See {@link ServerStreamTracer#filterContext}.  For server-side only.
115    *
116    * <p>Called from {@link io.grpc.internal.ServerImpl}.
117    */
serverFilterContext(Context context)118   public <ReqT, RespT> Context serverFilterContext(Context context) {
119     Context ctx = checkNotNull(context, "context");
120     for (StreamTracer tracer : tracers) {
121       ctx = ((ServerStreamTracer) tracer).filterContext(ctx);
122       checkNotNull(ctx, "%s returns null context", tracer);
123     }
124     return ctx;
125   }
126 
127   /**
128    * See {@link ServerStreamTracer#serverCallStarted}.  For server-side only.
129    *
130    * <p>Called from {@link io.grpc.internal.ServerImpl}.
131    */
serverCallStarted(ServerCallInfo<?, ?> callInfo)132   public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
133     for (StreamTracer tracer : tracers) {
134       ((ServerStreamTracer) tracer).serverCallStarted(callInfo);
135     }
136   }
137 
138   /**
139    * See {@link StreamTracer#streamClosed}. This may be called multiple times, and only the first
140    * value will be taken.
141    *
142    * <p>Called from abstract stream implementations.
143    */
streamClosed(Status status)144   public void streamClosed(Status status) {
145     if (closed.compareAndSet(false, true)) {
146       for (StreamTracer tracer : tracers) {
147         tracer.streamClosed(status);
148       }
149     }
150   }
151 
152   /**
153    * See {@link StreamTracer#outboundMessage(int)}.
154    *
155    * <p>Called from {@link io.grpc.internal.Framer}.
156    */
outboundMessage(int seqNo)157   public void outboundMessage(int seqNo) {
158     for (StreamTracer tracer : tracers) {
159       tracer.outboundMessage(seqNo);
160     }
161   }
162 
163   /**
164    * See {@link StreamTracer#inboundMessage(int)}.
165    *
166    * <p>Called from {@link io.grpc.internal.MessageDeframer}.
167    */
inboundMessage(int seqNo)168   public void inboundMessage(int seqNo) {
169     for (StreamTracer tracer : tracers) {
170       tracer.inboundMessage(seqNo);
171     }
172   }
173 
174   /**
175    * See {@link StreamTracer#outboundMessageSent}.
176    *
177    * <p>Called from {@link io.grpc.internal.Framer}.
178    */
outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize)179   public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
180     for (StreamTracer tracer : tracers) {
181       tracer.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
182     }
183   }
184 
185   /**
186    * See {@link StreamTracer#inboundMessageRead}.
187    *
188    * <p>Called from {@link io.grpc.internal.MessageDeframer}.
189    */
inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize)190   public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
191     for (StreamTracer tracer : tracers) {
192       tracer.inboundMessageRead(seqNo, optionalWireSize, optionalUncompressedSize);
193     }
194   }
195 
196   /**
197    * See {@link StreamTracer#outboundUncompressedSize}.
198    *
199    * <p>Called from {@link io.grpc.internal.Framer}.
200    */
outboundUncompressedSize(long bytes)201   public void outboundUncompressedSize(long bytes) {
202     for (StreamTracer tracer : tracers) {
203       tracer.outboundUncompressedSize(bytes);
204     }
205   }
206 
207   /**
208    * See {@link StreamTracer#outboundWireSize}.
209    *
210    * <p>Called from {@link io.grpc.internal.Framer}.
211    */
outboundWireSize(long bytes)212   public void outboundWireSize(long bytes) {
213     for (StreamTracer tracer : tracers) {
214       tracer.outboundWireSize(bytes);
215     }
216   }
217 
218   /**
219    * See {@link StreamTracer#inboundUncompressedSize}.
220    *
221    * <p>Called from {@link io.grpc.internal.MessageDeframer}.
222    */
inboundUncompressedSize(long bytes)223   public void inboundUncompressedSize(long bytes) {
224     for (StreamTracer tracer : tracers) {
225       tracer.inboundUncompressedSize(bytes);
226     }
227   }
228 
229   /**
230    * See {@link StreamTracer#inboundWireSize}.
231    *
232    * <p>Called from {@link io.grpc.internal.MessageDeframer}.
233    */
inboundWireSize(long bytes)234   public void inboundWireSize(long bytes) {
235     for (StreamTracer tracer : tracers) {
236       tracer.inboundWireSize(bytes);
237     }
238   }
239 }
240