• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_rpc;
16 
17 import com.google.common.util.concurrent.ListenableFuture;
18 import com.google.common.util.concurrent.SettableFuture;
19 import com.google.protobuf.MessageLite;
20 import dev.pigweed.pw_log.Logger;
21 import java.util.concurrent.ExecutionException;
22 import java.util.concurrent.Executor;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.TimeoutException;
25 import java.util.function.BiFunction;
26 import java.util.function.Consumer;
27 import javax.annotation.Nullable;
28 
29 /**
30  * Call implementation that represents the call as a ListenableFuture.
31  *
32  * This class suppresses ShouldNotSubclass warnings from ListenableFuture. It implements
33  * ListenableFuture only because it cannot extend AbstractFuture since multiple inheritance is not
34  * supported. No Future funtionality is duplicated; FutureCall uses SettableFuture internally.
35  */
36 @SuppressWarnings("ShouldNotSubclass")
37 abstract class FutureCall<RequestT extends MessageLite, ResponseT extends MessageLite, ResultT>
38     extends AbstractCall<RequestT, ResponseT> implements ListenableFuture<ResultT> {
39   private static final Logger logger = Logger.forClass(FutureCall.class);
40 
41   private final SettableFuture<ResultT> future = SettableFuture.create();
42 
FutureCall(Endpoint endpoint, PendingRpc rpc)43   private FutureCall(Endpoint endpoint, PendingRpc rpc) {
44     super(endpoint, rpc);
45   }
46 
47   // Implement the ListenableFuture interface by forwarding the internal SettableFuture.
48 
49   @Override
addListener(Runnable runnable, Executor executor)50   public final void addListener(Runnable runnable, Executor executor) {
51     future.addListener(runnable, executor);
52   }
53 
54   /** Cancellation means that a cancel() or cancel(boolean) call succeeded. */
55   @Override
isCancelled()56   public final boolean isCancelled() {
57     return error() == Status.CANCELLED;
58   }
59 
60   @Override
isDone()61   public final boolean isDone() {
62     return future.isDone();
63   }
64 
65   @Override
get()66   public final ResultT get() throws InterruptedException, ExecutionException {
67     return future.get();
68   }
69 
70   @Override
get(long timeout, TimeUnit unit)71   public final ResultT get(long timeout, TimeUnit unit)
72       throws InterruptedException, ExecutionException, TimeoutException {
73     return future.get(timeout, unit);
74   }
75 
76   @Override
cancel(boolean mayInterruptIfRunning)77   public final boolean cancel(boolean mayInterruptIfRunning) {
78     try {
79       return this.cancel();
80     } catch (ChannelOutputException e) {
81       logger.atWarning().withCause(e).log("Failed to send cancellation packet for %s", rpc());
82       return true; // handleError() was already called, so the future was cancelled
83     }
84   }
85 
86   /** Used by derived classes to access the future instance. */
future()87   final SettableFuture<ResultT> future() {
88     return future;
89   }
90 
91   @Override
handleExceptionOnInitialPacket(ChannelOutputException e)92   void handleExceptionOnInitialPacket(ChannelOutputException e) {
93     // Stash the exception in the future and abort the call.
94     future.setException(e);
95 
96     // Set the status to mark the call completed. doHandleError() will have no effect since the
97     // exception was already set.
98     handleError(Status.ABORTED);
99   }
100 
101   @Override
doHandleError()102   public void doHandleError() {
103     future.setException(new RpcError(rpc(), error()));
104   }
105 
106   /** Future-based Call class for unary and client streaming RPCs. */
107   static class UnaryResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite>
108       extends FutureCall<RequestT, ResponseT, UnaryResult<ResponseT>>
109       implements ClientStreamingFuture<RequestT, ResponseT> {
110     @Nullable ResponseT response = null;
111 
UnaryResponseFuture(Endpoint endpoint, PendingRpc rpc)112     UnaryResponseFuture(Endpoint endpoint, PendingRpc rpc) {
113       super(endpoint, rpc);
114     }
115 
116     @Override
doHandleNext(ResponseT value)117     public void doHandleNext(ResponseT value) {
118       if (response == null) {
119         response = value;
120       } else {
121         future().setException(new IllegalStateException("Unary RPC received multiple responses."));
122       }
123     }
124 
125     @Override
doHandleCompleted()126     public void doHandleCompleted() {
127       if (response == null) {
128         future().setException(
129             new IllegalStateException("Unary RPC completed without a response payload"));
130       } else {
131         future().set(UnaryResult.create(response, status()));
132       }
133     }
134   }
135 
136   /** Future-based Call class for server and bidirectional streaming RPCs. */
137   static class StreamResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite>
138       extends FutureCall<RequestT, ResponseT, Status>
139       implements BidirectionalStreamingFuture<RequestT> {
140     private final Consumer<ResponseT> onNext;
141 
142     static <RequestT extends MessageLite, ResponseT extends MessageLite>
getFactory( Consumer<ResponseT> onNext)143         BiFunction<Endpoint, PendingRpc, StreamResponseFuture<RequestT, ResponseT>> getFactory(
144             Consumer<ResponseT> onNext) {
145       return (rpcManager, pendingRpc) -> new StreamResponseFuture<>(rpcManager, pendingRpc, onNext);
146     }
147 
StreamResponseFuture(Endpoint endpoint, PendingRpc rpc, Consumer<ResponseT> onNext)148     private StreamResponseFuture(Endpoint endpoint, PendingRpc rpc, Consumer<ResponseT> onNext) {
149       super(endpoint, rpc);
150       this.onNext = onNext;
151     }
152 
153     @Override
doHandleNext(ResponseT value)154     public void doHandleNext(ResponseT value) {
155       onNext.accept(value);
156     }
157 
158     @Override
doHandleCompleted()159     public void doHandleCompleted() {
160       future().set(status());
161     }
162   }
163 }
164