• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2015 The gRPC Authors
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package io.grpc.examples.routeguide;
18 
19 import com.google.common.annotations.VisibleForTesting;
20 import com.google.protobuf.Message;
21 import io.grpc.Channel;
22 import io.grpc.Grpc;
23 import io.grpc.InsecureChannelCredentials;
24 import io.grpc.ManagedChannel;
25 import io.grpc.Status;
26 import io.grpc.StatusRuntimeException;
27 import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideBlockingStub;
28 import io.grpc.examples.routeguide.RouteGuideGrpc.RouteGuideStub;
29 import io.grpc.stub.StreamObserver;
30 import java.io.IOException;
31 import java.util.Iterator;
32 import java.util.List;
33 import java.util.Random;
34 import java.util.concurrent.CountDownLatch;
35 import java.util.concurrent.TimeUnit;
36 import java.util.logging.Level;
37 import java.util.logging.Logger;
38 
39 /**
40  * Sample client code that makes gRPC calls to the server.
41  */
42 public class RouteGuideClient {
43   private static final Logger logger = Logger.getLogger(RouteGuideClient.class.getName());
44 
45   private final RouteGuideBlockingStub blockingStub;
46   private final RouteGuideStub asyncStub;
47 
48   private Random random = new Random();
49   private TestHelper testHelper;
50 
51   /** Construct client for accessing RouteGuide server using the existing channel. */
RouteGuideClient(Channel channel)52   public RouteGuideClient(Channel channel) {
53     blockingStub = RouteGuideGrpc.newBlockingStub(channel);
54     asyncStub = RouteGuideGrpc.newStub(channel);
55   }
56 
57   /**
58    * Blocking unary call example.  Calls getFeature and prints the response.
59    */
getFeature(int lat, int lon)60   public void getFeature(int lat, int lon) {
61     info("*** GetFeature: lat={0} lon={1}", lat, lon);
62 
63     Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
64 
65     Feature feature;
66     try {
67       feature = blockingStub.getFeature(request);
68       if (testHelper != null) {
69         testHelper.onMessage(feature);
70       }
71     } catch (StatusRuntimeException e) {
72       warning("RPC failed: {0}", e.getStatus());
73       if (testHelper != null) {
74         testHelper.onRpcError(e);
75       }
76       return;
77     }
78     if (RouteGuideUtil.exists(feature)) {
79       info("Found feature called \"{0}\" at {1}, {2}",
80           feature.getName(),
81           RouteGuideUtil.getLatitude(feature.getLocation()),
82           RouteGuideUtil.getLongitude(feature.getLocation()));
83     } else {
84       info("Found no feature at {0}, {1}",
85           RouteGuideUtil.getLatitude(feature.getLocation()),
86           RouteGuideUtil.getLongitude(feature.getLocation()));
87     }
88   }
89 
90   /**
91    * Blocking server-streaming example. Calls listFeatures with a rectangle of interest. Prints each
92    * response feature as it arrives.
93    */
listFeatures(int lowLat, int lowLon, int hiLat, int hiLon)94   public void listFeatures(int lowLat, int lowLon, int hiLat, int hiLon) {
95     info("*** ListFeatures: lowLat={0} lowLon={1} hiLat={2} hiLon={3}", lowLat, lowLon, hiLat,
96         hiLon);
97 
98     Rectangle request =
99         Rectangle.newBuilder()
100             .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
101             .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
102     Iterator<Feature> features;
103     try {
104       features = blockingStub.listFeatures(request);
105       for (int i = 1; features.hasNext(); i++) {
106         Feature feature = features.next();
107         info("Result #" + i + ": {0}", feature);
108         if (testHelper != null) {
109           testHelper.onMessage(feature);
110         }
111       }
112     } catch (StatusRuntimeException e) {
113       warning("RPC failed: {0}", e.getStatus());
114       if (testHelper != null) {
115         testHelper.onRpcError(e);
116       }
117     }
118   }
119 
120   /**
121    * Async client-streaming example. Sends {@code numPoints} randomly chosen points from {@code
122    * features} with a variable delay in between. Prints the statistics when they are sent from the
123    * server.
124    */
recordRoute(List<Feature> features, int numPoints)125   public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
126     info("*** RecordRoute");
127     final CountDownLatch finishLatch = new CountDownLatch(1);
128     StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
129       @Override
130       public void onNext(RouteSummary summary) {
131         info("Finished trip with {0} points. Passed {1} features. "
132             + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
133             summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
134         if (testHelper != null) {
135           testHelper.onMessage(summary);
136         }
137       }
138 
139       @Override
140       public void onError(Throwable t) {
141         warning("RecordRoute Failed: {0}", Status.fromThrowable(t));
142         if (testHelper != null) {
143           testHelper.onRpcError(t);
144         }
145         finishLatch.countDown();
146       }
147 
148       @Override
149       public void onCompleted() {
150         info("Finished RecordRoute");
151         finishLatch.countDown();
152       }
153     };
154 
155     StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
156     try {
157       // Send numPoints points randomly selected from the features list.
158       for (int i = 0; i < numPoints; ++i) {
159         int index = random.nextInt(features.size());
160         Point point = features.get(index).getLocation();
161         info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
162             RouteGuideUtil.getLongitude(point));
163         requestObserver.onNext(point);
164         // Sleep for a bit before sending the next one.
165         Thread.sleep(random.nextInt(1000) + 500);
166         if (finishLatch.getCount() == 0) {
167           // RPC completed or errored before we finished sending.
168           // Sending further requests won't error, but they will just be thrown away.
169           return;
170         }
171       }
172     } catch (RuntimeException e) {
173       // Cancel RPC
174       requestObserver.onError(e);
175       throw e;
176     }
177     // Mark the end of requests
178     requestObserver.onCompleted();
179 
180     // Receiving happens asynchronously
181     if (!finishLatch.await(1, TimeUnit.MINUTES)) {
182       warning("recordRoute can not finish within 1 minutes");
183     }
184   }
185 
186   /**
187    * Bi-directional example, which can only be asynchronous. Send some chat messages, and print any
188    * chat messages that are sent from the server.
189    */
routeChat()190   public CountDownLatch routeChat() {
191     info("*** RouteChat");
192     final CountDownLatch finishLatch = new CountDownLatch(1);
193     StreamObserver<RouteNote> requestObserver =
194         asyncStub.routeChat(new StreamObserver<RouteNote>() {
195           @Override
196           public void onNext(RouteNote note) {
197             info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation()
198                 .getLatitude(), note.getLocation().getLongitude());
199             if (testHelper != null) {
200               testHelper.onMessage(note);
201             }
202           }
203 
204           @Override
205           public void onError(Throwable t) {
206             warning("RouteChat Failed: {0}", Status.fromThrowable(t));
207             if (testHelper != null) {
208               testHelper.onRpcError(t);
209             }
210             finishLatch.countDown();
211           }
212 
213           @Override
214           public void onCompleted() {
215             info("Finished RouteChat");
216             finishLatch.countDown();
217           }
218         });
219 
220     try {
221       RouteNote[] requests =
222           {newNote("First message", 0, 0), newNote("Second message", 0, 10_000_000),
223               newNote("Third message", 10_000_000, 0), newNote("Fourth message", 10_000_000, 10_000_000)};
224 
225       for (RouteNote request : requests) {
226         info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation()
227             .getLatitude(), request.getLocation().getLongitude());
228         requestObserver.onNext(request);
229       }
230     } catch (RuntimeException e) {
231       // Cancel RPC
232       requestObserver.onError(e);
233       throw e;
234     }
235     // Mark the end of requests
236     requestObserver.onCompleted();
237 
238     // return the latch while receiving happens asynchronously
239     return finishLatch;
240   }
241 
242   /** Issues several different requests and then exits. */
main(String[] args)243   public static void main(String[] args) throws InterruptedException {
244     String target = "localhost:8980";
245     if (args.length > 0) {
246       if ("--help".equals(args[0])) {
247         System.err.println("Usage: [target]");
248         System.err.println("");
249         System.err.println("  target  The server to connect to. Defaults to " + target);
250         System.exit(1);
251       }
252       target = args[0];
253     }
254 
255     List<Feature> features;
256     try {
257       features = RouteGuideUtil.parseFeatures(RouteGuideUtil.getDefaultFeaturesFile());
258     } catch (IOException ex) {
259       ex.printStackTrace();
260       return;
261     }
262 
263     ManagedChannel channel = Grpc.newChannelBuilder(target, InsecureChannelCredentials.create())
264         .build();
265     try {
266       RouteGuideClient client = new RouteGuideClient(channel);
267       // Looking for a valid feature
268       client.getFeature(409146138, -746188906);
269 
270       // Feature missing.
271       client.getFeature(0, 0);
272 
273       // Looking for features between 40, -75 and 42, -73.
274       client.listFeatures(400000000, -750000000, 420000000, -730000000);
275 
276       // Record a few randomly selected points from the features file.
277       client.recordRoute(features, 10);
278 
279       // Send and receive some notes.
280       CountDownLatch finishLatch = client.routeChat();
281 
282       if (!finishLatch.await(1, TimeUnit.MINUTES)) {
283         client.warning("routeChat can not finish within 1 minutes");
284       }
285     } finally {
286       channel.shutdownNow().awaitTermination(5, TimeUnit.SECONDS);
287     }
288   }
289 
info(String msg, Object... params)290   private void info(String msg, Object... params) {
291     logger.log(Level.INFO, msg, params);
292   }
293 
warning(String msg, Object... params)294   private void warning(String msg, Object... params) {
295     logger.log(Level.WARNING, msg, params);
296   }
297 
newNote(String message, int lat, int lon)298   private RouteNote newNote(String message, int lat, int lon) {
299     return RouteNote.newBuilder().setMessage(message)
300         .setLocation(Point.newBuilder().setLatitude(lat).setLongitude(lon).build()).build();
301   }
302 
303   /**
304    * Only used for unit test, as we do not want to introduce randomness in unit test.
305    */
306   @VisibleForTesting
setRandom(Random random)307   void setRandom(Random random) {
308     this.random = random;
309   }
310 
311   /**
312    * Only used for helping unit test.
313    */
314   @VisibleForTesting
315   interface TestHelper {
316     /**
317      * Used for verify/inspect message received from server.
318      */
onMessage(Message message)319     void onMessage(Message message);
320 
321     /**
322      * Used for verify/inspect error received from server.
323      */
onRpcError(Throwable exception)324     void onRpcError(Throwable exception);
325   }
326 
327   @VisibleForTesting
setTestHelper(TestHelper testHelper)328   void setTestHelper(TestHelper testHelper) {
329     this.testHelper = testHelper;
330   }
331 }
332