• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2020 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.binder.internal;
18 
19 import io.grpc.Attributes;
20 import io.grpc.Compressor;
21 import io.grpc.Deadline;
22 import io.grpc.DecompressorRegistry;
23 import io.grpc.Status;
24 import io.grpc.StatusException;
25 import io.grpc.internal.ClientStream;
26 import io.grpc.internal.ClientStreamListener;
27 import io.grpc.internal.InsightBuilder;
28 import java.io.InputStream;
29 import javax.annotation.Nonnull;
30 import javax.annotation.Nullable;
31 
32 /**
33  * The client side of a single RPC, which sends a single request message.
34  *
35  * <p>An instance of this class is effectively a go-between, receiving messages from the gRPC
36  * ClientCall instance (via calls on the ClientStream interface we implement), and sending them out
37  * on the transport, as well as receiving messages from the transport, and passing the resultant
38  * data back to the gRPC ClientCall instance (via calls on the ClientStreamListener instance we're
39  * given).
40  *
41  * <p>These two communication directions are largely independent of each other, with the {@link
42  * Outbound} handling the gRPC to transport direction, and the {@link Inbound} class handling
43  * transport to gRPC direction.
44  *
45  * <p>Since the Inbound and Outbound halves are largely independent, their state is also
46  * synchronized independently.
47  */
48 final class SingleMessageClientStream implements ClientStream {
49 
50   private final Inbound.ClientInbound inbound;
51   private final Outbound.ClientOutbound outbound;
52   private final Attributes attributes;
53 
54   @Nullable private InputStream pendingSingleMessage;
55   @Nullable private Deadline pendingDeadline;
56 
SingleMessageClientStream( Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes)57   SingleMessageClientStream(
58       Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes) {
59     this.inbound = inbound;
60     this.outbound = outbound;
61     this.attributes = attributes;
62   }
63 
64   @Override
start(ClientStreamListener listener)65   public void start(ClientStreamListener listener) {
66     synchronized (inbound) {
67       inbound.init(outbound, listener);
68     }
69     if (outbound.isReady()) {
70       listener.onReady();
71     }
72   }
73 
74   @Override
isReady()75   public boolean isReady() {
76     return outbound.isReady();
77   }
78 
79   @Override
request(int numMessages)80   public void request(int numMessages) {
81     synchronized (inbound) {
82       inbound.requestMessages(numMessages);
83     }
84   }
85 
86   @Override
writeMessage(InputStream message)87   public void writeMessage(InputStream message) {
88     if (pendingSingleMessage != null) {
89       synchronized (inbound) {
90         inbound.closeAbnormal(Status.INTERNAL.withDescription("too many messages"));
91       }
92     } else {
93       pendingSingleMessage = message;
94     }
95   }
96 
97   @Override
halfClose()98   public void halfClose() {
99     try {
100       synchronized (outbound) {
101         if (pendingDeadline != null) {
102           outbound.setDeadline(pendingDeadline);
103         }
104         outbound.onPrefixReady();
105         outbound.sendSingleMessageAndHalfClose(pendingSingleMessage);
106       }
107     } catch (StatusException se) {
108       synchronized (inbound) {
109         inbound.closeAbnormal(se.getStatus());
110       }
111     }
112   }
113 
114   @Override
cancel(Status status)115   public void cancel(Status status) {
116     synchronized (inbound) {
117       inbound.closeOnCancel(status);
118     }
119   }
120 
121   @Override
setDeadline(@onnull Deadline deadline)122   public void setDeadline(@Nonnull Deadline deadline) {
123     this.pendingDeadline = deadline;
124   }
125 
126   @Override
getAttributes()127   public Attributes getAttributes() {
128     return attributes;
129   }
130 
131   @Override
toString()132   public final String toString() {
133     return "SingleMessageClientStream[" + inbound + "/" + outbound + "]";
134   }
135 
136   // =====================
137   // Misc stubbed & unsupported methods.
138 
139   @Override
flush()140   public final void flush() {
141     // Ignore.
142   }
143 
144   @Override
setCompressor(Compressor compressor)145   public final void setCompressor(Compressor compressor) {
146     // Ignore.
147   }
148 
149   @Override
setMessageCompression(boolean enable)150   public final void setMessageCompression(boolean enable) {
151     // Ignore.
152   }
153 
154   @Override
setAuthority(String authority)155   public void setAuthority(String authority) {
156     // Ignore.
157   }
158 
159   @Override
setMaxInboundMessageSize(int maxSize)160   public void setMaxInboundMessageSize(int maxSize) {
161     // Ignore.
162   }
163 
164   @Override
setMaxOutboundMessageSize(int maxSize)165   public void setMaxOutboundMessageSize(int maxSize) {
166     // Ignore.
167   }
168 
169   @Override
appendTimeoutInsight(InsightBuilder insight)170   public void appendTimeoutInsight(InsightBuilder insight) {
171     // Ignore
172   }
173 
174   @Override
setFullStreamDecompression(boolean fullStreamDecompression)175   public void setFullStreamDecompression(boolean fullStreamDecompression) {
176     // Ignore.
177   }
178 
179   @Override
setDecompressorRegistry(DecompressorRegistry decompressorRegistry)180   public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) {
181     // Ignore.
182   }
183 
184   @Override
optimizeForDirectExecutor()185   public void optimizeForDirectExecutor() {
186     // Ignore.
187   }
188 }
189