1 /*
2 *
3 * Copyright 2021 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/grpc.h>
20 #include <grpcpp/security/server_credentials.h>
21 #include <grpcpp/server.h>
22 #include <grpcpp/server_builder.h>
23 #include <grpcpp/server_context.h>
24
25 #include <algorithm>
26 #include <chrono>
27 #include <cmath>
28 #include <iostream>
29 #include <memory>
30 #include <string>
31 #include <thread>
32
33 #include "absl/flags/parse.h"
34 #include "absl/log/globals.h"
35 #include "absl/log/initialize.h"
36 #include "absl/log/log.h"
37 #include "helper.h"
38 #ifdef BAZEL_BUILD
39 #include "examples/protos/route_guide.grpc.pb.h"
40 #else
41 #include "route_guide.grpc.pb.h"
42 #endif
43
44 using grpc::CallbackServerContext;
45 using grpc::Server;
46 using grpc::ServerBuilder;
47 using grpc::Status;
48 using routeguide::Feature;
49 using routeguide::Point;
50 using routeguide::Rectangle;
51 using routeguide::RouteGuide;
52 using routeguide::RouteNote;
53 using routeguide::RouteSummary;
54 using std::chrono::system_clock;
55
// Converts an angle given in degrees to radians.
// The arithmetic is carried out in double precision and narrowed to float
// only for the return value.
float ConvertToRadians(float num) {
  const double degrees = num;
  const double radians = degrees * 3.1415926 / 180;
  return static_cast<float>(radians);
}
57
58 // The formula is based on http://mathforum.org/library/drmath/view/51879.html
GetDistance(const Point & start,const Point & end)59 float GetDistance(const Point& start, const Point& end) {
60 const float kCoordFactor = 10000000.0;
61 float lat_1 = start.latitude() / kCoordFactor;
62 float lat_2 = end.latitude() / kCoordFactor;
63 float lon_1 = start.longitude() / kCoordFactor;
64 float lon_2 = end.longitude() / kCoordFactor;
65 float lat_rad_1 = ConvertToRadians(lat_1);
66 float lat_rad_2 = ConvertToRadians(lat_2);
67 float delta_lat_rad = ConvertToRadians(lat_2 - lat_1);
68 float delta_lon_rad = ConvertToRadians(lon_2 - lon_1);
69
70 float a = pow(sin(delta_lat_rad / 2), 2) +
71 cos(lat_rad_1) * cos(lat_rad_2) * pow(sin(delta_lon_rad / 2), 2);
72 float c = 2 * atan2(sqrt(a), sqrt(1 - a));
73 int R = 6371000; // metres
74
75 return R * c;
76 }
77
GetFeatureName(const Point & point,const std::vector<Feature> & feature_list)78 std::string GetFeatureName(const Point& point,
79 const std::vector<Feature>& feature_list) {
80 for (const Feature& f : feature_list) {
81 if (f.location().latitude() == point.latitude() &&
82 f.location().longitude() == point.longitude()) {
83 return f.name();
84 }
85 }
86 return "";
87 }
88
89 class RouteGuideImpl final : public RouteGuide::CallbackService {
90 public:
RouteGuideImpl(const std::string & db)91 explicit RouteGuideImpl(const std::string& db) {
92 routeguide::ParseDb(db, &feature_list_);
93 }
94
GetFeature(grpc::CallbackServerContext * context,const Point * point,Feature * feature)95 grpc::ServerUnaryReactor* GetFeature(grpc::CallbackServerContext* context,
96 const Point* point,
97 Feature* feature) override {
98 class Reactor : public grpc::ServerUnaryReactor {
99 public:
100 Reactor(const Point& point, const std::vector<Feature>& feature_list,
101 Feature* feature) {
102 feature->set_name(GetFeatureName(point, feature_list));
103 *feature->mutable_location() = point;
104 Finish(grpc::Status::OK);
105 }
106
107 private:
108 void OnDone() override {
109 LOG(INFO) << "RPC Completed";
110 delete this;
111 }
112
113 void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
114 };
115 return new Reactor(*point, feature_list_, feature);
116 }
117
118 /* Alternate simple implementation of GetFeature that uses the DefaultReactor.
119 grpc::ServerUnaryReactor* GetFeature(CallbackServerContext* context,
120 const Point* point,
121 Feature* feature) override {
122 feature->set_name(GetFeatureName(*point, feature_list_));
123 feature->mutable_location()->CopyFrom(*point);
124 auto* reactor = context->DefaultReactor();
125 reactor->Finish(Status::OK);
126 return reactor;
127 }
128 */
129
ListFeatures(CallbackServerContext * context,const routeguide::Rectangle * rectangle)130 grpc::ServerWriteReactor<Feature>* ListFeatures(
131 CallbackServerContext* context,
132 const routeguide::Rectangle* rectangle) override {
133 class Lister : public grpc::ServerWriteReactor<Feature> {
134 public:
135 Lister(const routeguide::Rectangle* rectangle,
136 const std::vector<Feature>* feature_list)
137 : left_((std::min)(rectangle->lo().longitude(),
138 rectangle->hi().longitude())),
139 right_((std::max)(rectangle->lo().longitude(),
140 rectangle->hi().longitude())),
141 top_((std::max)(rectangle->lo().latitude(),
142 rectangle->hi().latitude())),
143 bottom_((std::min)(rectangle->lo().latitude(),
144 rectangle->hi().latitude())),
145 feature_list_(feature_list),
146 next_feature_(feature_list_->begin()) {
147 NextWrite();
148 }
149
150 void OnWriteDone(bool ok) override {
151 if (!ok) {
152 Finish(Status(grpc::StatusCode::UNKNOWN, "Unexpected Failure"));
153 }
154 NextWrite();
155 }
156
157 void OnDone() override {
158 LOG(INFO) << "RPC Completed";
159 delete this;
160 }
161
162 void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
163
164 private:
165 void NextWrite() {
166 while (next_feature_ != feature_list_->end()) {
167 const Feature& f = *next_feature_;
168 next_feature_++;
169 if (f.location().longitude() >= left_ &&
170 f.location().longitude() <= right_ &&
171 f.location().latitude() >= bottom_ &&
172 f.location().latitude() <= top_) {
173 StartWrite(&f);
174 return;
175 }
176 }
177 // Didn't write anything, all is done.
178 Finish(Status::OK);
179 }
180 const long left_;
181 const long right_;
182 const long top_;
183 const long bottom_;
184 const std::vector<Feature>* feature_list_;
185 std::vector<Feature>::const_iterator next_feature_;
186 };
187 return new Lister(rectangle, &feature_list_);
188 }
189
RecordRoute(CallbackServerContext * context,RouteSummary * summary)190 grpc::ServerReadReactor<Point>* RecordRoute(CallbackServerContext* context,
191 RouteSummary* summary) override {
192 class Recorder : public grpc::ServerReadReactor<Point> {
193 public:
194 Recorder(RouteSummary* summary, const std::vector<Feature>* feature_list)
195 : start_time_(system_clock::now()),
196 summary_(summary),
197 feature_list_(feature_list) {
198 StartRead(&point_);
199 }
200
201 void OnReadDone(bool ok) override {
202 if (ok) {
203 point_count_++;
204 if (!GetFeatureName(point_, *feature_list_).empty()) {
205 feature_count_++;
206 }
207 if (point_count_ != 1) {
208 distance_ += GetDistance(previous_, point_);
209 }
210 previous_ = point_;
211 StartRead(&point_);
212 } else {
213 summary_->set_point_count(point_count_);
214 summary_->set_feature_count(feature_count_);
215 summary_->set_distance(static_cast<long>(distance_));
216 auto secs = std::chrono::duration_cast<std::chrono::seconds>(
217 system_clock::now() - start_time_);
218 summary_->set_elapsed_time(secs.count());
219 Finish(Status::OK);
220 }
221 }
222
223 void OnDone() override {
224 LOG(INFO) << "RPC Completed";
225 delete this;
226 }
227
228 void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
229
230 private:
231 system_clock::time_point start_time_;
232 RouteSummary* summary_;
233 const std::vector<Feature>* feature_list_;
234 Point point_;
235 int point_count_ = 0;
236 int feature_count_ = 0;
237 float distance_ = 0.0;
238 Point previous_;
239 };
240 return new Recorder(summary, &feature_list_);
241 }
242
RouteChat(CallbackServerContext * context)243 grpc::ServerBidiReactor<RouteNote, RouteNote>* RouteChat(
244 CallbackServerContext* context) override {
245 class Chatter : public grpc::ServerBidiReactor<RouteNote, RouteNote> {
246 public:
247 Chatter(absl::Mutex* mu, std::vector<RouteNote>* received_notes)
248 : mu_(mu), received_notes_(received_notes) {
249 StartRead(¬e_);
250 }
251
252 void OnReadDone(bool ok) override {
253 if (ok) {
254 // Unlike the other example in this directory that's not using
255 // the reactor pattern, we can't grab a local lock to secure the
256 // access to the notes vector, because the reactor will most likely
257 // make us jump threads, so we'll have to use a different locking
258 // strategy. We'll grab the lock locally to build a copy of the
259 // list of nodes we're going to send, then we'll grab the lock
260 // again to append the received note to the existing vector.
261 mu_->Lock();
262 std::copy_if(received_notes_->begin(), received_notes_->end(),
263 std::back_inserter(to_send_notes_),
264 [this](const RouteNote& note) {
265 return note.location().latitude() ==
266 note_.location().latitude() &&
267 note.location().longitude() ==
268 note_.location().longitude();
269 });
270 mu_->Unlock();
271 notes_iterator_ = to_send_notes_.begin();
272 NextWrite();
273 } else {
274 Finish(Status::OK);
275 }
276 }
277 void OnWriteDone(bool /*ok*/) override { NextWrite(); }
278
279 void OnDone() override {
280 LOG(INFO) << "RPC Completed";
281 delete this;
282 }
283
284 void OnCancel() override { LOG(ERROR) << "RPC Cancelled"; }
285
286 private:
287 void NextWrite() {
288 if (notes_iterator_ != to_send_notes_.end()) {
289 StartWrite(&*notes_iterator_);
290 notes_iterator_++;
291 } else {
292 mu_->Lock();
293 received_notes_->push_back(note_);
294 mu_->Unlock();
295 StartRead(¬e_);
296 }
297 }
298
299 RouteNote note_;
300 absl::Mutex* mu_;
301 std::vector<RouteNote>* received_notes_ ABSL_GUARDED_BY(mu_);
302 std::vector<RouteNote> to_send_notes_;
303 std::vector<RouteNote>::iterator notes_iterator_;
304 };
305 return new Chatter(&mu_, &received_notes_);
306 }
307
308 private:
309 std::vector<Feature> feature_list_;
310 absl::Mutex mu_;
311 std::vector<RouteNote> received_notes_ ABSL_GUARDED_BY(mu_);
312 };
313
RunServer(const std::string & db_path)314 void RunServer(const std::string& db_path) {
315 std::string server_address("0.0.0.0:50051");
316 RouteGuideImpl service(db_path);
317
318 ServerBuilder builder;
319 builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
320 builder.RegisterService(&service);
321 std::unique_ptr<Server> server(builder.BuildAndStart());
322 LOG(INFO) << "Server listening on " << server_address;
323 server->Wait();
324 }
325
main(int argc,char ** argv)326 int main(int argc, char** argv) {
327 absl::ParseCommandLine(argc, argv);
328 absl::SetStderrThreshold(absl::LogSeverityAtLeast::kInfo);
329 absl::InitializeLog();
330 // Expect only arg: --db_path=path/to/route_guide_db.json.
331 std::string db = routeguide::GetDbFileContent(argc, argv);
332 RunServer(db);
333
334 return 0;
335 }
336