1 /* 2 * Copyright 2015 The gRPC Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package io.grpc.stub; 18 19 import com.google.common.base.Preconditions; 20 import io.grpc.ExperimentalApi; 21 import java.util.Iterator; 22 23 /** 24 * Utility functions for working with {@link StreamObserver} and it's common subclasses like 25 * {@link CallStreamObserver}. 26 */ 27 @ExperimentalApi("https://github.com/grpc/grpc-java/issues/4694") 28 public final class StreamObservers { 29 /** 30 * Copy the values of an {@link Iterator} to the target {@link CallStreamObserver} while properly 31 * accounting for outbound flow-control. After calling this method, {@code target} should no 32 * longer be used. 33 * 34 * <p>For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart}, 35 * on servers it is safe to call inside the service method implementation. 36 * </p> 37 * 38 * @param source of values expressed as an {@link Iterator}. 39 * @param target {@link CallStreamObserver} which accepts values from the source. 40 */ copyWithFlowControl(final Iterator<V> source, final CallStreamObserver<V> target)41 public static <V> void copyWithFlowControl(final Iterator<V> source, 42 final CallStreamObserver<V> target) { 43 Preconditions.checkNotNull(source, "source"); 44 Preconditions.checkNotNull(target, "target"); 45 46 final class FlowControllingOnReadyHandler implements Runnable { 47 @Override 48 public void run() { 49 while (target.isReady() && source.hasNext()) { 50 target.onNext(source.next()); 51 } 52 if (!source.hasNext()) { 53 target.onCompleted(); 54 } 55 } 56 } 57 58 target.setOnReadyHandler(new FlowControllingOnReadyHandler()); 59 } 60 61 /** 62 * Copy the values of an {@link Iterable} to the target {@link CallStreamObserver} while properly 63 * accounting for outbound flow-control. After calling this method, {@code target} should no 64 * longer be used. 65 * 66 * <p>For clients this method is safe to call inside {@link ClientResponseObserver#beforeStart}, 67 * on servers it is safe to call inside the service method implementation. 68 * </p> 69 * 70 * @param source of values expressed as an {@link Iterable}. 71 * @param target {@link CallStreamObserver} which accepts values from the source. 72 */ copyWithFlowControl(final Iterable<V> source, CallStreamObserver<V> target)73 public static <V> void copyWithFlowControl(final Iterable<V> source, 74 CallStreamObserver<V> target) { 75 Preconditions.checkNotNull(source, "source"); 76 copyWithFlowControl(source.iterator(), target); 77 } 78 } 79