1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_rpc; 16 17 import com.google.protobuf.MessageLite; 18 import java.util.function.BiFunction; 19 20 /** 21 * Represents an ongoing RPC call. 22 * 23 * <p>This call class implements all features of unary, server streaming, client streaming, and 24 * bidirectional streaming RPCs. It provides static methods for creating call objects for each RPC 25 * type. 26 * 27 * @param <RequestT> request type of the RPC; used for client or bidirectional streaming RPCs 28 * @param <ResponseT> response type of the RPC; used for all types of RPCs 29 */ 30 final class StreamObserverCall<RequestT extends MessageLite, ResponseT extends MessageLite> 31 extends AbstractCall<RequestT, ResponseT> { 32 private final StreamObserver<ResponseT> observer; 33 34 static <RequestT extends MessageLite, ResponseT extends MessageLite> getFactory( StreamObserver<ResponseT> observer)35 BiFunction<Endpoint, PendingRpc, StreamObserverCall<RequestT, ResponseT>> getFactory( 36 StreamObserver<ResponseT> observer) { 37 return (endpoint, pendingRpc) -> new StreamObserverCall<>(endpoint, pendingRpc, observer); 38 } 39 StreamObserverCall( Endpoint endpoint, PendingRpc rpc, StreamObserver<ResponseT> observer)40 private StreamObserverCall( 41 Endpoint endpoint, PendingRpc rpc, StreamObserver<ResponseT> observer) { 42 super(endpoint, rpc); 43 this.observer = observer; 44 } 45 46 @Override doHandleNext(ResponseT response)47 void doHandleNext(ResponseT response) { 48 observer.onNext(response); 49 } 50 51 @Override doHandleCompleted()52 void doHandleCompleted() { 53 observer.onCompleted(status()); 54 } 55 56 @Override doHandleError()57 void doHandleError() { 58 observer.onError(error()); 59 } 60 } 61