• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_rpc;
16 
17 import com.google.protobuf.ByteString;
18 import com.google.protobuf.InvalidProtocolBufferException;
19 import com.google.protobuf.MessageLite;
20 import dev.pigweed.pw_log.Logger;
21 import dev.pigweed.pw_rpc.Call.ClientStreaming;
22 import java.util.Locale;
23 import javax.annotation.Nullable;
24 
25 /**
26  * Partial implementation of the Call interface.
27  *
28  * Call objects never manipulate their own state through public functions. They only manipulate
29  * state through functions called by the RpcManager class.
30  */
31 abstract class AbstractCall<RequestT extends MessageLite, ResponseT extends MessageLite>
32     implements Call, ClientStreaming<RequestT> {
33   private static final Logger logger = Logger.forClass(StreamObserverCall.class);
34 
35   private final Endpoint endpoint;
36   private final PendingRpc rpc;
37 
38   @Nullable private Status status = null;
39   @Nullable private Status error = null;
40 
AbstractCall(Endpoint endpoint, PendingRpc rpc)41   AbstractCall(Endpoint endpoint, PendingRpc rpc) {
42     this.endpoint = endpoint;
43     this.rpc = rpc;
44   }
45 
46   @Nullable
47   @Override
status()48   public final Status status() {
49     return status;
50   }
51 
52   @Nullable
53   @Override
error()54   public final Status error() {
55     return error;
56   }
57 
58   @Override
cancel()59   public final boolean cancel() throws ChannelOutputException {
60     return endpoint.cancel(this);
61   }
62 
63   @Override
abandon()64   public final boolean abandon() {
65     return endpoint.abandon(this);
66   }
67 
68   @Override
write(RequestT request)69   public final boolean write(RequestT request) throws ChannelOutputException {
70     return endpoint.clientStream(this, request);
71   }
72 
73   @Override
finish()74   public final boolean finish() throws ChannelOutputException {
75     return endpoint.clientStreamEnd(this);
76   }
77 
getChannelId()78   final int getChannelId() {
79     return rpc.channel().id();
80   }
81 
sendPacket(byte[] packet)82   final void sendPacket(byte[] packet) throws ChannelOutputException {
83     rpc.channel().send(packet);
84   }
85 
rpc()86   final PendingRpc rpc() {
87     return rpc;
88   }
89 
90   // The following functions change the call's state and may ONLY be called by the RpcManager!
91 
handleNext(ByteString payload)92   final void handleNext(ByteString payload) {
93     ResponseT response = parseResponse(payload);
94     if (response != null) {
95       doHandleNext(response);
96     }
97   }
98 
doHandleNext(ResponseT response)99   abstract void doHandleNext(ResponseT response);
100 
handleStreamCompleted(Status status)101   final void handleStreamCompleted(Status status) {
102     this.status = status;
103     doHandleCompleted();
104   }
105 
handleUnaryCompleted(ByteString payload, Status status)106   final void handleUnaryCompleted(ByteString payload, Status status) {
107     this.status = status;
108     handleNext(payload);
109     doHandleCompleted();
110   }
111 
doHandleCompleted()112   abstract void doHandleCompleted();
113 
handleError(Status error)114   final void handleError(Status error) {
115     this.error = error;
116     doHandleError();
117   }
118 
doHandleError()119   abstract void doHandleError();
120 
handleExceptionOnInitialPacket(ChannelOutputException e)121   void handleExceptionOnInitialPacket(ChannelOutputException e) throws ChannelOutputException {
122     throw e;
123   }
124 
125   @SuppressWarnings("unchecked")
126   @Nullable
parseResponse(ByteString payload)127   private ResponseT parseResponse(ByteString payload) {
128     try {
129       return (ResponseT) rpc.method().decodeResponsePayload(payload);
130     } catch (InvalidProtocolBufferException e) {
131       logger.atWarning().withCause(e).log(
132           "Failed to decode response for method %s; skipping packet", rpc.method().name());
133       return null;
134     }
135   }
136 
137   @Override
toString()138   public String toString() {
139     return String.format(Locale.ENGLISH,
140         "RpcCall[%s|channel=%d|%s]",
141         rpc.method(),
142         rpc.channel().id(),
143         active() ? "active" : "inactive");
144   }
145 }
146