1 /* 2 * Copyright 2020 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.binder.internal; 18 19 import io.grpc.Attributes; 20 import io.grpc.Compressor; 21 import io.grpc.Deadline; 22 import io.grpc.DecompressorRegistry; 23 import io.grpc.Status; 24 import io.grpc.StatusException; 25 import io.grpc.internal.ClientStream; 26 import io.grpc.internal.ClientStreamListener; 27 import io.grpc.internal.InsightBuilder; 28 import java.io.InputStream; 29 import javax.annotation.Nonnull; 30 31 /** 32 * The client side of a single RPC, which sends a stream of request messages. 33 * 34 * <p>An instance of this class is effectively a go-between, receiving messages from the gRPC 35 * ClientCall instance (via calls on the ClientStream interface we implement), and sending them out 36 * on the transport, as well as receiving messages from the transport, and passing the resultant 37 * data back to the gRPC ClientCall instance (via calls on the ClientStreamListener instance we're 38 * given). 39 * 40 * <p>These two communication directions are largely independent of each other, with the {@link 41 * Outbound} handling the gRPC to transport direction, and the {@link Inbound} class handling 42 * transport to gRPC direction. 43 * 44 * <p>Since the Inbound and Outbound halves are largely independent, their state is also 45 * synchronized independently. 46 */ 47 final class MultiMessageClientStream implements ClientStream { 48 49 private final Inbound.ClientInbound inbound; 50 private final Outbound.ClientOutbound outbound; 51 private final Attributes attributes; 52 MultiMessageClientStream( Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes)53 MultiMessageClientStream( 54 Inbound.ClientInbound inbound, Outbound.ClientOutbound outbound, Attributes attributes) { 55 this.inbound = inbound; 56 this.outbound = outbound; 57 this.attributes = attributes; 58 } 59 60 @Override start(ClientStreamListener listener)61 public void start(ClientStreamListener listener) { 62 synchronized (inbound) { 63 inbound.init(outbound, listener); 64 } 65 if (outbound.isReady()) { 66 listener.onReady(); 67 } 68 try { 69 synchronized (outbound) { 70 // The ClientStream contract promises no more header changes after start(). 71 outbound.onPrefixReady(); 72 outbound.send(); 73 } 74 } catch (StatusException se) { 75 synchronized (inbound) { 76 inbound.closeAbnormal(se.getStatus()); 77 } 78 } 79 } 80 81 @Override request(int numMessages)82 public void request(int numMessages) { 83 synchronized (inbound) { 84 inbound.requestMessages(numMessages); 85 } 86 } 87 88 @Override isReady()89 public boolean isReady() { 90 return outbound.isReady(); 91 } 92 93 @Override writeMessage(InputStream message)94 public void writeMessage(InputStream message) { 95 try { 96 synchronized (outbound) { 97 outbound.addMessage(message); 98 outbound.send(); 99 } 100 } catch (StatusException se) { 101 synchronized (inbound) { 102 inbound.closeAbnormal(se.getStatus()); 103 } 104 } 105 } 106 107 @Override halfClose()108 public void halfClose() { 109 try { 110 synchronized (outbound) { 111 outbound.sendHalfClose(); 112 } 113 } catch (StatusException se) { 114 synchronized (inbound) { 115 inbound.closeAbnormal(se.getStatus()); 116 } 117 } 118 } 119 120 @Override cancel(Status status)121 public void cancel(Status status) { 122 synchronized (inbound) { 123 inbound.closeOnCancel(status); 124 } 125 } 126 127 @Override setDeadline(@onnull Deadline deadline)128 public void setDeadline(@Nonnull Deadline deadline) { 129 synchronized (outbound) { 130 outbound.setDeadline(deadline); 131 } 132 } 133 134 @Override getAttributes()135 public Attributes getAttributes() { 136 return attributes; 137 } 138 139 @Override toString()140 public final String toString() { 141 return "MultiMessageClientStream[" + inbound + "/" + outbound + "]"; 142 } 143 144 // ===================== 145 // Misc stubbed & unsupported methods. 146 147 @Override flush()148 public final void flush() { 149 // Ignore. 150 } 151 152 @Override setCompressor(Compressor compressor)153 public final void setCompressor(Compressor compressor) { 154 // Ignore. 155 } 156 157 @Override setMessageCompression(boolean enable)158 public final void setMessageCompression(boolean enable) { 159 // Ignore. 160 } 161 162 @Override setAuthority(String authority)163 public void setAuthority(String authority) { 164 // Ignore. 165 } 166 167 @Override setMaxInboundMessageSize(int maxSize)168 public void setMaxInboundMessageSize(int maxSize) { 169 // Ignore. 170 } 171 172 @Override setMaxOutboundMessageSize(int maxSize)173 public void setMaxOutboundMessageSize(int maxSize) { 174 // Ignore. 175 } 176 177 @Override appendTimeoutInsight(InsightBuilder insight)178 public void appendTimeoutInsight(InsightBuilder insight) { 179 // Ignore 180 } 181 182 @Override setFullStreamDecompression(boolean fullStreamDecompression)183 public void setFullStreamDecompression(boolean fullStreamDecompression) { 184 // Ignore. 185 } 186 187 @Override setDecompressorRegistry(DecompressorRegistry decompressorRegistry)188 public void setDecompressorRegistry(DecompressorRegistry decompressorRegistry) { 189 // Ignore. 190 } 191 192 @Override optimizeForDirectExecutor()193 public void optimizeForDirectExecutor() { 194 // Ignore. 195 } 196 } 197