1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_rpc; 16 17 import com.google.protobuf.ByteString; 18 import com.google.protobuf.InvalidProtocolBufferException; 19 import com.google.protobuf.MessageLite; 20 import dev.pigweed.pw_log.Logger; 21 import dev.pigweed.pw_rpc.Call.ClientStreaming; 22 import java.util.Locale; 23 import javax.annotation.Nullable; 24 25 /** 26 * Partial implementation of the Call interface. 27 * 28 * Call objects never manipulate their own state through public functions. They only manipulate 29 * state through functions called by the RpcManager class. 30 */ 31 abstract class AbstractCall<RequestT extends MessageLite, ResponseT extends MessageLite> 32 implements Call, ClientStreaming<RequestT> { 33 private static final Logger logger = Logger.forClass(StreamObserverCall.class); 34 35 private final Endpoint endpoint; 36 private final PendingRpc rpc; 37 38 @Nullable private Status status = null; 39 @Nullable private Status error = null; 40 AbstractCall(Endpoint endpoint, PendingRpc rpc)41 AbstractCall(Endpoint endpoint, PendingRpc rpc) { 42 this.endpoint = endpoint; 43 this.rpc = rpc; 44 } 45 46 @Nullable 47 @Override status()48 public final Status status() { 49 return status; 50 } 51 52 @Nullable 53 @Override error()54 public final Status error() { 55 return error; 56 } 57 58 @Override cancel()59 public final boolean cancel() throws ChannelOutputException { 60 return endpoint.cancel(this); 61 } 62 63 @Override abandon()64 public final boolean abandon() { 65 return endpoint.abandon(this); 66 } 67 68 @Override write(RequestT request)69 public final boolean write(RequestT request) throws ChannelOutputException { 70 return endpoint.clientStream(this, request); 71 } 72 73 @Override finish()74 public final boolean finish() throws ChannelOutputException { 75 return endpoint.clientStreamEnd(this); 76 } 77 getChannelId()78 final int getChannelId() { 79 return rpc.channel().id(); 80 } 81 sendPacket(byte[] packet)82 final void sendPacket(byte[] packet) throws ChannelOutputException { 83 rpc.channel().send(packet); 84 } 85 rpc()86 final PendingRpc rpc() { 87 return rpc; 88 } 89 90 // The following functions change the call's state and may ONLY be called by the RpcManager! 91 handleNext(ByteString payload)92 final void handleNext(ByteString payload) { 93 ResponseT response = parseResponse(payload); 94 if (response != null) { 95 doHandleNext(response); 96 } 97 } 98 doHandleNext(ResponseT response)99 abstract void doHandleNext(ResponseT response); 100 handleStreamCompleted(Status status)101 final void handleStreamCompleted(Status status) { 102 this.status = status; 103 doHandleCompleted(); 104 } 105 handleUnaryCompleted(ByteString payload, Status status)106 final void handleUnaryCompleted(ByteString payload, Status status) { 107 this.status = status; 108 handleNext(payload); 109 doHandleCompleted(); 110 } 111 doHandleCompleted()112 abstract void doHandleCompleted(); 113 handleError(Status error)114 final void handleError(Status error) { 115 this.error = error; 116 doHandleError(); 117 } 118 doHandleError()119 abstract void doHandleError(); 120 handleExceptionOnInitialPacket(ChannelOutputException e)121 void handleExceptionOnInitialPacket(ChannelOutputException e) throws ChannelOutputException { 122 throw e; 123 } 124 125 @SuppressWarnings("unchecked") 126 @Nullable parseResponse(ByteString payload)127 private ResponseT parseResponse(ByteString payload) { 128 try { 129 return (ResponseT) rpc.method().decodeResponsePayload(payload); 130 } catch (InvalidProtocolBufferException e) { 131 logger.atWarning().withCause(e).log( 132 "Failed to decode response for method %s; skipping packet", rpc.method().name()); 133 return null; 134 } 135 } 136 137 @Override toString()138 public String toString() { 139 return String.format(Locale.ENGLISH, 140 "RpcCall[%s|channel=%d|%s]", 141 rpc.method(), 142 rpc.channel().id(), 143 active() ? "active" : "inactive"); 144 } 145 } 146