• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2017 Google LLC
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  *     * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *     * Redistributions in binary form must reproduce the above
11  * copyright notice, this list of conditions and the following disclaimer
12  * in the documentation and/or other materials provided with the
13  * distribution.
14  *     * Neither the name of Google LLC nor the names of its
15  * contributors may be used to endorse or promote products derived from
16  * this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
19  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
20  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
21  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
22  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
23  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
24  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
25  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
26  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
27  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29  */
30 package com.google.api.gax.rpc.testing;
31 
32 import com.google.api.core.SettableApiFuture;
33 import com.google.api.gax.rpc.ApiCallContext;
34 import com.google.api.gax.rpc.ResponseObserver;
35 import com.google.api.gax.rpc.ServerStreamingCallable;
36 import com.google.api.gax.rpc.StateCheckingResponseObserver;
37 import com.google.api.gax.rpc.StreamController;
38 import com.google.common.collect.Queues;
39 import java.util.concurrent.BlockingQueue;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.TimeUnit;
42 
43 public class MockStreamingApi {
44   public static class MockServerStreamingCallable<RequestT, ResponseT>
45       extends ServerStreamingCallable<RequestT, ResponseT> {
46     private final BlockingQueue<MockServerStreamingCall<RequestT, ResponseT>> calls =
47         Queues.newLinkedBlockingDeque();
48 
49     @Override
call( RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context)50     public void call(
51         RequestT request, ResponseObserver<ResponseT> responseObserver, ApiCallContext context) {
52       MockStreamController<ResponseT> controller = new MockStreamController<>(responseObserver);
53       calls.add(new MockServerStreamingCall<>(request, controller));
54       responseObserver.onStart(controller);
55     }
56 
popLastCall()57     public MockServerStreamingCall<RequestT, ResponseT> popLastCall() {
58       try {
59         return calls.poll(1, TimeUnit.SECONDS);
60       } catch (Throwable e) {
61         return null;
62       }
63     }
64   }
65 
66   public static class MockServerStreamingCall<RequestT, ResponseT> {
67     private final RequestT request;
68     private final MockStreamController<ResponseT> controller;
69 
MockServerStreamingCall(RequestT request, MockStreamController<ResponseT> controller)70     public MockServerStreamingCall(RequestT request, MockStreamController<ResponseT> controller) {
71       this.request = request;
72       this.controller = controller;
73     }
74 
getRequest()75     public RequestT getRequest() {
76       return request;
77     }
78 
getController()79     public MockStreamController<ResponseT> getController() {
80       return controller;
81     }
82   }
83 
84   public static class MockStreamController<ResponseT> implements StreamController {
85     private final ResponseObserver<ResponseT> downstreamObserver;
86     private final BlockingQueue<Integer> pulls = Queues.newLinkedBlockingQueue();
87     private SettableApiFuture<Boolean> cancelFuture = SettableApiFuture.create();
88     private boolean autoFlowControl = true;
89 
MockStreamController(ResponseObserver<ResponseT> downstreamObserver)90     public MockStreamController(ResponseObserver<ResponseT> downstreamObserver) {
91       this.downstreamObserver = downstreamObserver;
92     }
93 
94     @Override
disableAutoInboundFlowControl()95     public void disableAutoInboundFlowControl() {
96       autoFlowControl = false;
97     }
98 
99     @Override
request(int count)100     public void request(int count) {
101       pulls.add(count);
102     }
103 
104     @Override
cancel()105     public void cancel() {
106       cancelFuture.set(true);
107     }
108 
getObserver()109     public ResponseObserver<ResponseT> getObserver() {
110       return downstreamObserver;
111     }
112 
isAutoFlowControlEnabled()113     public boolean isAutoFlowControlEnabled() {
114       return autoFlowControl;
115     }
116 
isCancelled()117     public boolean isCancelled() {
118       return cancelFuture.isDone();
119     }
120 
waitForCancel()121     public void waitForCancel() {
122       try {
123         cancelFuture.get(1, TimeUnit.SECONDS);
124       } catch (Exception e) {
125         throw new RuntimeException(e);
126       }
127     }
128 
popLastPull()129     public int popLastPull() {
130       Integer results;
131 
132       try {
133         results = pulls.poll(1, TimeUnit.SECONDS);
134       } catch (InterruptedException e) {
135         Thread.currentThread().interrupt();
136         throw new RuntimeException(e);
137       }
138 
139       if (results == null) {
140         return 0;
141       } else {
142         return results;
143       }
144     }
145   }
146 
147   public static class MockResponseObserver<T> extends StateCheckingResponseObserver<T> {
148     private final boolean autoFlowControl;
149     private StreamController controller;
150     private final BlockingQueue<T> responses = Queues.newLinkedBlockingDeque();
151     private final SettableApiFuture<Void> done = SettableApiFuture.create();
152 
MockResponseObserver(boolean autoFlowControl)153     public MockResponseObserver(boolean autoFlowControl) {
154       this.autoFlowControl = autoFlowControl;
155     }
156 
157     @Override
onStartImpl(StreamController controller)158     protected void onStartImpl(StreamController controller) {
159       this.controller = controller;
160       if (!autoFlowControl) {
161         controller.disableAutoInboundFlowControl();
162       }
163     }
164 
165     @Override
onResponseImpl(T response)166     protected void onResponseImpl(T response) {
167       responses.add(response);
168     }
169 
170     @Override
onErrorImpl(Throwable t)171     protected void onErrorImpl(Throwable t) {
172       done.setException(t);
173     }
174 
175     @Override
onCompleteImpl()176     protected void onCompleteImpl() {
177       done.set(null);
178     }
179 
getController()180     public StreamController getController() {
181       return controller;
182     }
183 
popNextResponse()184     public T popNextResponse() {
185       try {
186         return responses.poll(1, TimeUnit.SECONDS);
187       } catch (InterruptedException e) {
188         Thread.currentThread().interrupt();
189         throw new RuntimeException(e);
190       }
191     }
192 
getFinalError()193     public Throwable getFinalError() {
194       try {
195         done.get(1, TimeUnit.SECONDS);
196         return null;
197       } catch (ExecutionException e) {
198         return e.getCause();
199       } catch (Throwable t) {
200         return t;
201       }
202     }
203 
isDone()204     public boolean isDone() {
205       return done.isDone();
206     }
207   }
208 }
209