• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.routeguide;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.protobuf.Message;
21 import io.grpc.ManagedChannel;
22 import io.grpc.ManagedChannelBuilder;
23 import io.grpc.Status;
24 import io.grpc.StatusRuntimeException;
25 import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub;
26 import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub;
27 import io.grpc.stub.StreamObserver;
28 import java.io.IOException;
29 import java.util.Iterator;
30 import java.util.List;
31 import java.util.Random;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34 import java.util.logging.Level;
35 import java.util.logging.Logger;
36 
37 /**
38  * Sample client code that makes gRPC calls to the server.
39  */
40 public class RouteGuideClient {
41   private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());
42 
43   private final ManagedChannel channel;
44   private final RouteGuideBlockingStub blockingStub;
45   private final RouteGuideStub asyncStub;
46 
47   private Random random = new Random();
48   private TestHelper testHelper;
49 
50   /** Construct client for accessing RouteGuide server at {@code host:port}. */
RouteGuideClient(String host, int port)51   public RouteGuideClient(String host, int port) {
52     this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
53   }
54 
55   /** Construct client for accessing RouteGuide server using the existing channel. */
RouteGuideClient(ManagedChannelBuilder<?> channelBuilder)56   public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
57     channel = channelBuilder.build();
58     blockingStub = RouteGuideGrpc.newBlockingStub(channel);
59     asyncStub = RouteGuideGrpc.newStub(channel);
60   }
61 
shutdown()62   public void shutdown() throws InterruptedException {
63     channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
64   }
65 
66   /**
67    * Blocking unary call example.  Calls getFeature and prints the response.
68    */
getFeature(int lat, int lon)69   public void getFeature(int lat, int lon) {
70     info("*** GetFeature: lat={0} lon={1}", lat, lon);
71 
72     Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
73 
74     Feature feature;
75     try {
76       feature = blockingStub.getFeature(request);
77       if (testHelper != null) {
78         testHelper.onMessage(feature);
79       }
80     } catch (StatusRuntimeException e) {
81       warning("RPC failed: {0}", e.getStatus());
82       if (testHelper != null) {
83         testHelper.onRpcError(e);
84       }
85       return;
86     }
87     if (RouteGuideUtil.exists(feature)) {
88       info("Found feature called \"{0}\" at {1}, {2}",
89           feature.getName(),
90           RouteGuideUtil.getLatitude(feature.getLocation()),
91           RouteGuideUtil.getLongitude(feature.getLocation()));
92     } else {
93       info("Found no feature at {0}, {1}",
94           RouteGuideUtil.getLatitude(feature.getLocation()),
95           RouteGuideUtil.getLongitude(feature.getLocation()));
96     }
97   }
98 
99   /**
100    * Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each
101    * response feature as it arrives.
102    */
listFeatures(int lowLat, int lowLon, int hiLat, int hiLon)103   public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
104     info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat,
105         hiLon);
106 
107     Rectangle request =
108         Rectangle.newBuilder()
109             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
110             .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
111     Iterator<Feature> features;
112     try {
113       features = blockingStub.listFeatures(request);
114       for (int i = 1; features.hasNext(); i++) {
115         Feature feature = features.next();
116         info("Result #" + i + ": {0}", feature);
117         if (testHelper != null) {
118           testHelper.onMessage(feature);
119         }
120       }
121     } catch (StatusRuntimeException e) {
122       warning("RPC failed: {0}", e.getStatus());
123       if (testHelper != null) {
124         testHelper.onRpcError(e);
125       }
126     }
127   }
128 
129   /**
130    * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
131    * features} with a variable delay in between. Prints the statistics when they are sent from the
132    * server.
133    */
recordRoute(List<Feature> features, int numPoints)134   public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
135     info("*** RecordRoute");
136     final CountDownLatch finishLatch = new CountDownLatch(1);
137     StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
138       @Override
139       public void onNext(RouteSummary summary) {
140         info("Finished trip with {0} points. Passed {1} features. "
141             + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
142             summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
143         if (testHelper != null) {
144           testHelper.onMessage(summary);
145         }
146       }
147 
148       @Override
149       public void onError(Throwable t) {
150         warning("RecordRoute Failed: {0}", Status.fromThrowable(t));
151         if (testHelper != null) {
152           testHelper.onRpcError(t);
153         }
154         finishLatch.countDown();
155       }
156 
157       @Override
158       public void onCompleted() {
159         info("Finished RecordRoute");
160         finishLatch.countDown();
161       }
162     };
163 
164     StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
165     try {
166       // Send numPoints points randomly selected from the features list.
167       for (int i = 0; i < numPoints; ++i) {
168         int index = random.nextInt(features.size());
169         Point point = features.get(index).getLocation();
170         info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
171             RouteGuideUtil.getLongitude(point));
172         requestObserver.onNext(point);
173         // Sleep for a bit before sending the next one.
174         Thread.sleep(random.nextInt(1000) + 500);
175         if (finishLatch.getCount() == 0) {
176           // RPC completed or errored before we finished sending.
177           // Sending further requests won't error, but they will just be thrown away.
178           return;
179         }
180       }
181     } catch (RuntimeException e) {
182       // Cancel RPC
183       requestObserver.onError(e);
184       throw e;
185     }
186     // Mark the end of requests
187     requestObserver.onCompleted();
188 
189     // Receiving happens asynchronously
190     if (!finishLatch.await(1, TimeUnit.MINUTES)) {
191       warning("recordRoute can not finish within 1 minutes");
192     }
193   }
194 
195   /**
196    * Bi-directional example, which can only be asynchronous. Send some chat messages, and print any
197    * chat messages that are sent from the server.
198    */
routeChat()199   public CountDownLatch routeChat() {
200     info("*** RouteChat");
201     final CountDownLatch finishLatch = new CountDownLatch(1);
202     StreamObserver<RouteNote> requestObserver =
203         asyncStub.routeChat(new StreamObserver<RouteNote>() {
204           @Override
205           public void onNext(RouteNote note) {
206             info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
207                 .getLatitude(), note.getLocation().getLongitude());
208             if (testHelper != null) {
209               testHelper.onMessage(note);
210             }
211           }
212 
213           @Override
214           public void onError(Throwable t) {
215             warning("RouteChat Failed: {0}", Status.fromThrowable(t));
216             if (testHelper != null) {
217               testHelper.onRpcError(t);
218             }
219             finishLatch.countDown();
220           }
221 
222           @Override
223           public void onCompleted() {
224             info("Finished RouteChat");
225             finishLatch.countDown();
226           }
227         });
228 
229     try {
230       RouteNote[] requests =
231           {newNote("First message", 0, 0), newNote("Second message", 0, 1),
232               newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
233 
234       for (RouteNote request : requests) {
235         info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
236             .getLatitude(), request.getLocation().getLongitude());
237         requestObserver.onNext(request);
238       }
239     } catch (RuntimeException e) {
240       // Cancel RPC
241       requestObserver.onError(e);
242       throw e;
243     }
244     // Mark the end of requests
245     requestObserver.onCompleted();
246 
247     // return the latch while receiving happens asynchronously
248     return finishLatch;
249   }
250 
251   /** Issues several different requests and then exits. */
main(String[] args)252   public static void main(String[] args) throws InterruptedException {
253     List<Feature> features;
254     try {
255       features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
256     } catch (IOException ex) {
257       ex.printStackTrace();
258       return;
259     }
260 
261     RouteGuideClient client = new RouteGuideClient("localhost", 8980);
262     try {
263       // Looking for a valid feature
264       client.getFeature(409146138, -746188906);
265 
266       // Feature missing.
267       client.getFeature(0, 0);
268 
269       // Looking for features between 40, -75 and 42, -73.
270       client.listFeatures(400000000, -750000000, 420000000, -730000000);
271 
272       // Record a few randomly selected points from the features file.
273       client.recordRoute(features, 10);
274 
275       // Send and receive some notes.
276       CountDownLatch finishLatch = client.routeChat();
277 
278       if (!finishLatch.await(1, TimeUnit.MINUTES)) {
279         client.warning("routeChat can not finish within 1 minutes");
280       }
281     } finally {
282       client.shutdown();
283     }
284   }
285 
info(String msg, Object... params)286   private void info(String msg, Object... params) {
287     logger.log(Level.INFO, msg, params);
288   }
289 
warning(String msg, Object... params)290   private void warning(String msg, Object... params) {
291     logger.log(Level.WARNING, msg, params);
292   }
293 
newNote(String message, int lat, int lon)294   private RouteNote newNote(String message, int lat, int lon) {
295     return RouteNote.newBuilder().setMessage(message)
296         .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
297   }
298 
299   /**
300    * Only used for unit test, as we do not want to introduce randomness in unit test.
301    */
302   @VisibleForTesting
setRandom(Random random)303   void setRandom(Random random) {
304     this.random = random;
305   }
306 
307   /**
308    * Only used for helping unit test.
309    */
310   @VisibleForTesting
311   interface TestHelper {
312     /**
313      * Used for verify/inspect message received from server.
314      */
onMessage(Message message)315     void onMessage(Message message);
316 
317     /**
318      * Used for verify/inspect error received from server.
319      */
onRpcError(Throwable exception)320     void onRpcError(Throwable exception);
321   }
322 
323   @VisibleForTesting
setTestHelper(TestHelper testHelper)324   void setTestHelper(TestHelper testHelper) {
325     this.testHelper = testHelper;
326   }
327 }
328