1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_rpc; 16 17 import com.google.common.util.concurrent.ListenableFuture; 18 import com.google.common.util.concurrent.SettableFuture; 19 import com.google.protobuf.MessageLite; 20 import dev.pigweed.pw_log.Logger; 21 import java.util.concurrent.ExecutionException; 22 import java.util.concurrent.Executor; 23 import java.util.concurrent.TimeUnit; 24 import java.util.concurrent.TimeoutException; 25 import java.util.function.BiFunction; 26 import java.util.function.Consumer; 27 import javax.annotation.Nullable; 28 29 /** 30 * Call implementation that represents the call as a ListenableFuture. 31 * 32 * This class suppresses ShouldNotSubclass warnings from ListenableFuture. It implements 33 * ListenableFuture only because it cannot extend AbstractFuture since multiple inheritance is not 34 * supported. No Future funtionality is duplicated; FutureCall uses SettableFuture internally. 35 */ 36 @SuppressWarnings("ShouldNotSubclass") 37 abstract class FutureCall<RequestT extends MessageLite, ResponseT extends MessageLite, ResultT> 38 extends AbstractCall<RequestT, ResponseT> implements ListenableFuture<ResultT> { 39 private static final Logger logger = Logger.forClass(FutureCall.class); 40 41 private final SettableFuture<ResultT> future = SettableFuture.create(); 42 FutureCall(Endpoint endpoint, PendingRpc rpc)43 private FutureCall(Endpoint endpoint, PendingRpc rpc) { 44 super(endpoint, rpc); 45 } 46 47 // Implement the ListenableFuture interface by forwarding the internal SettableFuture. 48 49 @Override addListener(Runnable runnable, Executor executor)50 public final void addListener(Runnable runnable, Executor executor) { 51 future.addListener(runnable, executor); 52 } 53 54 /** Cancellation means that a cancel() or cancel(boolean) call succeeded. */ 55 @Override isCancelled()56 public final boolean isCancelled() { 57 return error() == Status.CANCELLED; 58 } 59 60 @Override isDone()61 public final boolean isDone() { 62 return future.isDone(); 63 } 64 65 @Override get()66 public final ResultT get() throws InterruptedException, ExecutionException { 67 return future.get(); 68 } 69 70 @Override get(long timeout, TimeUnit unit)71 public final ResultT get(long timeout, TimeUnit unit) 72 throws InterruptedException, ExecutionException, TimeoutException { 73 return future.get(timeout, unit); 74 } 75 76 @Override cancel(boolean mayInterruptIfRunning)77 public final boolean cancel(boolean mayInterruptIfRunning) { 78 try { 79 return this.cancel(); 80 } catch (ChannelOutputException e) { 81 logger.atWarning().withCause(e).log("Failed to send cancellation packet for %s", rpc()); 82 return true; // handleError() was already called, so the future was cancelled 83 } 84 } 85 86 /** Used by derived classes to access the future instance. */ future()87 final SettableFuture<ResultT> future() { 88 return future; 89 } 90 91 @Override handleExceptionOnInitialPacket(ChannelOutputException e)92 void handleExceptionOnInitialPacket(ChannelOutputException e) { 93 // Stash the exception in the future and abort the call. 94 future.setException(e); 95 96 // Set the status to mark the call completed. doHandleError() will have no effect since the 97 // exception was already set. 98 handleError(Status.ABORTED); 99 } 100 101 @Override doHandleError()102 public void doHandleError() { 103 future.setException(new RpcError(rpc(), error())); 104 } 105 106 /** Future-based Call class for unary and client streaming RPCs. */ 107 static class UnaryResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite> 108 extends FutureCall<RequestT, ResponseT, UnaryResult<ResponseT>> 109 implements ClientStreamingFuture<RequestT, ResponseT> { 110 @Nullable ResponseT response = null; 111 UnaryResponseFuture(Endpoint endpoint, PendingRpc rpc)112 UnaryResponseFuture(Endpoint endpoint, PendingRpc rpc) { 113 super(endpoint, rpc); 114 } 115 116 @Override doHandleNext(ResponseT value)117 public void doHandleNext(ResponseT value) { 118 if (response == null) { 119 response = value; 120 } else { 121 future().setException(new IllegalStateException("Unary RPC received multiple responses.")); 122 } 123 } 124 125 @Override doHandleCompleted()126 public void doHandleCompleted() { 127 if (response == null) { 128 future().setException( 129 new IllegalStateException("Unary RPC completed without a response payload")); 130 } else { 131 future().set(UnaryResult.create(response, status())); 132 } 133 } 134 } 135 136 /** Future-based Call class for server and bidirectional streaming RPCs. */ 137 static class StreamResponseFuture<RequestT extends MessageLite, ResponseT extends MessageLite> 138 extends FutureCall<RequestT, ResponseT, Status> 139 implements BidirectionalStreamingFuture<RequestT> { 140 private final Consumer<ResponseT> onNext; 141 142 static <RequestT extends MessageLite, ResponseT extends MessageLite> getFactory( Consumer<ResponseT> onNext)143 BiFunction<Endpoint, PendingRpc, StreamResponseFuture<RequestT, ResponseT>> getFactory( 144 Consumer<ResponseT> onNext) { 145 return (rpcManager, pendingRpc) -> new StreamResponseFuture<>(rpcManager, pendingRpc, onNext); 146 } 147 StreamResponseFuture(Endpoint endpoint, PendingRpc rpc, Consumer<ResponseT> onNext)148 private StreamResponseFuture(Endpoint endpoint, PendingRpc rpc, Consumer<ResponseT> onNext) { 149 super(endpoint, rpc); 150 this.onNext = onNext; 151 } 152 153 @Override doHandleNext(ResponseT value)154 public void doHandleNext(ResponseT value) { 155 onNext.accept(value); 156 } 157 158 @Override doHandleCompleted()159 public void doHandleCompleted() { 160 future().set(status()); 161 } 162 } 163 } 164