1 /* 2 * Copyright 2017 Google LLC 3 * 4 * Redistribution and use in source and binary forms, with or without 5 * modification, are permitted provided that the following conditions are 6 * met: 7 * 8 * * Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * * Redistributions in binary form must reproduce the above 11 * copyright notice, this list of conditions and the following disclaimer 12 * in the documentation and/or other materials provided with the 13 * distribution. 14 * * Neither the name of Google LLC nor the names of its 15 * contributors may be used to endorse or promote products derived from 16 * this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 */ 30 package com.google.api.gax.rpc.testing; 31 32 import com.google.api.core.SettableApiFuture; 33 import com.google.api.gax.rpc.ApiCallContext; 34 import com.google.api.gax.rpc.ResponseObserver; 35 import com.google.api.gax.rpc.ServerStreamingCallable; 36 import com.google.api.gax.rpc.StateCheckingResponseObserver; 37 import com.google.api.gax.rpc.StreamController; 38 import com.google.common.collect.Queues; 39 import java.util.concurrent.BlockingQueue; 40 import java.util.concurrent.ExecutionException; 41 import java.util.concurrent.TimeUnit; 42 43 public class MockStreamingApi { 44 public static class MockServerStreamingCallable<RequestT, ResponseT> 45 extends ServerStreamingCallable<RequestT, ResponseT> { 46 private final BlockingQueue<MockServerStreamingCall<RequestT, ResponseT>> calls = 47 Queues.newLinkedBlockingDeque(); 48 49 @Override call( RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context)50 public void call( 51 RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) { 52 MockStreamController<ResponseT> controller = new MockStreamController<>(responseObserver); 53 calls.add(new MockServerStreamingCall<>(request, controller)); 54 responseObserver.onStart(controller); 55 } 56 popLastCall()57 public MockServerStreamingCall<RequestT, ResponseT> popLastCall() { 58 try { 59 return calls.poll(1, TimeUnit.SECONDS); 60 } catch (Throwable e) { 61 return null; 62 } 63 } 64 } 65 66 public static class MockServerStreamingCall<RequestT, ResponseT> { 67 private final RequestT request; 68 private final MockStreamController<ResponseT> controller; 69 MockServerStreamingCall(RequestT request, MockStreamController<ResponseT> controller)70 public MockServerStreamingCall(RequestT request, MockStreamController<ResponseT> controller) { 71 this.request = request; 72 this.controller = controller; 73 } 74 getRequest()75 public RequestT getRequest() { 76 return request; 77 } 78 getController()79 public MockStreamController<ResponseT> getController() { 80 return controller; 81 } 82 } 83 84 public static class MockStreamController<ResponseT> implements StreamController { 85 private final ResponseObserver<ResponseT> downstreamObserver; 86 private final BlockingQueue<Integer> pulls = Queues.newLinkedBlockingQueue(); 87 private SettableApiFuture<Boolean> cancelFuture = SettableApiFuture.create(); 88 private boolean autoFlowControl = true; 89 MockStreamController(ResponseObserver<ResponseT> downstreamObserver)90 public MockStreamController(ResponseObserver<ResponseT> downstreamObserver) { 91 this.downstreamObserver = downstreamObserver; 92 } 93 94 @Override disableAutoInboundFlowControl()95 public void disableAutoInboundFlowControl() { 96 autoFlowControl = false; 97 } 98 99 @Override request(int count)100 public void request(int count) { 101 pulls.add(count); 102 } 103 104 @Override cancel()105 public void cancel() { 106 cancelFuture.set(true); 107 } 108 getObserver()109 public ResponseObserver<ResponseT> getObserver() { 110 return downstreamObserver; 111 } 112 isAutoFlowControlEnabled()113 public boolean isAutoFlowControlEnabled() { 114 return autoFlowControl; 115 } 116 isCancelled()117 public boolean isCancelled() { 118 return cancelFuture.isDone(); 119 } 120 waitForCancel()121 public void waitForCancel() { 122 try { 123 cancelFuture.get(1, TimeUnit.SECONDS); 124 } catch (Exception e) { 125 throw new RuntimeException(e); 126 } 127 } 128 popLastPull()129 public int popLastPull() { 130 Integer results; 131 132 try { 133 results = pulls.poll(1, TimeUnit.SECONDS); 134 } catch (InterruptedException e) { 135 Thread.currentThread().interrupt(); 136 throw new RuntimeException(e); 137 } 138 139 if (results == null) { 140 return 0; 141 } else { 142 return results; 143 } 144 } 145 } 146 147 public static class MockResponseObserver<T> extends StateCheckingResponseObserver<T> { 148 private final boolean autoFlowControl; 149 private StreamController controller; 150 private final BlockingQueue<T> responses = Queues.newLinkedBlockingDeque(); 151 private final SettableApiFuture<Void> done = SettableApiFuture.create(); 152 MockResponseObserver(boolean autoFlowControl)153 public MockResponseObserver(boolean autoFlowControl) { 154 this.autoFlowControl = autoFlowControl; 155 } 156 157 @Override onStartImpl(StreamController controller)158 protected void onStartImpl(StreamController controller) { 159 this.controller = controller; 160 if (!autoFlowControl) { 161 controller.disableAutoInboundFlowControl(); 162 } 163 } 164 165 @Override onResponseImpl(T response)166 protected void onResponseImpl(T response) { 167 responses.add(response); 168 } 169 170 @Override onErrorImpl(Throwable t)171 protected void onErrorImpl(Throwable t) { 172 done.setException(t); 173 } 174 175 @Override onCompleteImpl()176 protected void onCompleteImpl() { 177 done.set(null); 178 } 179 getController()180 public StreamController getController() { 181 return controller; 182 } 183 popNextResponse()184 public T popNextResponse() { 185 try { 186 return responses.poll(1, TimeUnit.SECONDS); 187 } catch (InterruptedException e) { 188 Thread.currentThread().interrupt(); 189 throw new RuntimeException(e); 190 } 191 } 192 getFinalError()193 public Throwable getFinalError() { 194 try { 195 done.get(1, TimeUnit.SECONDS); 196 return null; 197 } catch (ExecutionException e) { 198 return e.getCause(); 199 } catch (Throwable t) { 200 return t; 201 } 202 } 203 isDone()204 public boolean isDone() { 205 return done.isDone(); 206 } 207 } 208 } 209