1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.binder.internal; 18 19 import io.grpc.Attributes; 20 import io.grpc.Compressor; 21 import io.grpc.Deadline; 22 import io.grpc.DecompressorRegistry; 23 import io.grpc.Status; 24 import io.grpc.StatusException; 25 import io.grpc.internal.ClientStream; 26 import io.grpc.internal.ClientStreamListener; 27 import io.grpc.internal.InsightBuilder; 28 import java.io.InputStream; 29 import javax.annotation.Nonnull; 30 import javax.annotation.Nullable; 31 32 /** 33 * The client side of a single RPC, which sends a single request message. 34 * 35 * <p>An instance of this class is effectively a go-between, receiving messages from the gRPC 36 * ClientCall instance (via calls on the ClientStream interface we implement), and sending them out 37 * on the transport, as well as receiving messages from the transport, and passing the resultant 38 * data back to the gRPC ClientCall instance (via calls on the ClientStreamListener instance we're 39 * given). 40 * 41 * <p>These two communication directions are largely independent of each other, with the {@link 42 * Outbound} handling the gRPC to transport direction, and the {@link Inbound} class handling 43 * transport to gRPC direction. 44 * 45 * <p>Since the Inbound and Outbound halves are largely independent, their state is also 46 * synchronized independently. 47 */ 48 final class SingleMessageClientStream implements ClientStream { 49 50 private final Inbound.ClientInbound inbound; 51 private final Outbound.ClientOutbound outbound; 52 private final Attributes attributes; 53 54 @Nullable private InputStream pendingSingleMessage; 55 @Nullable private Deadline pendingDeadline; 56 SingleMessageClientStream( Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes)57 SingleMessageClientStream( 58 Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes) { 59 this.inbound = inbound; 60 this.outbound = outbound; 61 this.attributes = attributes; 62 } 63 64 @Override start(ClientStreamListener listener)65 public void start(ClientStreamListener listener) { 66 synchronized (inbound) { 67 inbound.init(outbound, listener); 68 } 69 if (outbound.isReady()) { 70 listener.onReady(); 71 } 72 } 73 74 @Override isReady()75 public boolean isReady() { 76 return outbound.isReady(); 77 } 78 79 @Override request(int numMessages)80 public void request(int numMessages) { 81 synchronized (inbound) { 82 inbound.requestMessages(numMessages); 83 } 84 } 85 86 @Override writeMessage(InputStream message)87 public void writeMessage(InputStream message) { 88 if (pendingSingleMessage != null) { 89 synchronized (inbound) { 90 inbound.closeAbnormal(Status.INTERNAL.withDescription("too many messages")); 91 } 92 } else { 93 pendingSingleMessage = message; 94 } 95 } 96 97 @Override halfClose()98 public void halfClose() { 99 try { 100 synchronized (outbound) { 101 if (pendingDeadline != null) { 102 outbound.setDeadline(pendingDeadline); 103 } 104 outbound.onPrefixReady(); 105 outbound.sendSingleMessageAndHalfClose(pendingSingleMessage); 106 } 107 } catch (StatusException se) { 108 synchronized (inbound) { 109 inbound.closeAbnormal(se.getStatus()); 110 } 111 } 112 } 113 114 @Override cancel(Status status)115 public void cancel(Status status) { 116 synchronized (inbound) { 117 inbound.closeOnCancel(status); 118 } 119 } 120 121 @Override setDeadline(@onnull Deadline deadline)122 public void setDeadline(@Nonnull Deadline deadline) { 123 this.pendingDeadline = deadline; 124 } 125 126 @Override getAttributes()127 public Attributes getAttributes() { 128 return attributes; 129 } 130 131 @Override toString()132 public final String toString() { 133 return "SingleMessageClientStream[" + inbound + "/" + outbound + "]"; 134 } 135 136 // ===================== 137 // Misc stubbed & unsupported methods. 138 139 @Override flush()140 public final void flush() { 141 // Ignore. 142 } 143 144 @Override setCompressor(Compressor compressor)145 public final void setCompressor(Compressor compressor) { 146 // Ignore. 147 } 148 149 @Override setMessageCompression(boolean enable)150 public final void setMessageCompression(boolean enable) { 151 // Ignore. 152 } 153 154 @Override setAuthority(String authority)155 public void setAuthority(String authority) { 156 // Ignore. 157 } 158 159 @Override setMaxInboundMessageSize(int maxSize)160 public void setMaxInboundMessageSize(int maxSize) { 161 // Ignore. 162 } 163 164 @Override setMaxOutboundMessageSize(int maxSize)165 public void setMaxOutboundMessageSize(int maxSize) { 166 // Ignore. 167 } 168 169 @Override appendTimeoutInsight(InsightBuilder insight)170 public void appendTimeoutInsight(InsightBuilder insight) { 171 // Ignore 172 } 173 174 @Override setFullStreamDecompression(boolean fullStreamDecompression)175 public void setFullStreamDecompression(boolean fullStreamDecompression) { 176 // Ignore. 177 } 178 179 @Override setDecompressorRegistry(DecompressorRegistry decompressorRegistry)180 public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 181 // Ignore. 182 } 183 184 @Override optimizeForDirectExecutor()185 public void optimizeForDirectExecutor() { 186 // Ignore. 187 } 188 } 189