• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2014 Google Inc. All rights reserved.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include <thread>
18 
19 #include <grpc++/grpc++.h>
20 
21 #include "monster_test_generated.h"
22 #include "monster_test.grpc.fb.h"
23 
24 using namespace MyGame::Example;
25 
26 // The callback implementation of our server, that derives from the generated
27 // code. It implements all rpcs specified in the FlatBuffers schema.
28 class ServiceImpl final : public MyGame::Example::MonsterStorage::Service {
Store(::grpc::ServerContext * context,const flatbuffers::grpc::Message<Monster> * request,flatbuffers::grpc::Message<Stat> * response)29   virtual ::grpc::Status Store(::grpc::ServerContext* context,
30                                const flatbuffers::grpc::Message<Monster> *request,
31                                flatbuffers::grpc::Message<Stat> *response)
32                                override {
33     // Create a response from the incoming request name.
34     fbb_.Clear();
35     auto stat_offset = CreateStat(fbb_, fbb_.CreateString("Hello, " +
36                                         request->GetRoot()->name()->str()));
37     fbb_.Finish(stat_offset);
38     // Transfer ownership of the message to gRPC
39     *response = fbb_.ReleaseMessage<Stat>();
40     return grpc::Status::OK;
41   }
Retrieve(::grpc::ServerContext * context,const flatbuffers::grpc::Message<Stat> * request,::grpc::ServerWriter<flatbuffers::grpc::Message<Monster>> * writer)42   virtual ::grpc::Status Retrieve(::grpc::ServerContext *context,
43                                const flatbuffers::grpc::Message<Stat> *request,
44                                ::grpc::ServerWriter< flatbuffers::grpc::Message<Monster>>* writer)
45                                override {
46 
47     for (int i=0; i<10; i++) {
48        fbb_.Clear();
49        // Create 10 monsters for resposne.
50        auto monster_offset =
51        CreateMonster(fbb_, 0, 0, 0, fbb_.CreateString(
52          request->GetRoot()->id()->str() + " No." + std::to_string(i)));
53        fbb_.Finish(monster_offset);
54 
55        flatbuffers::grpc::Message<Monster> monster = fbb_.ReleaseMessage<Monster>();
56 
57        // Send monster to client using streaming.
58        writer->Write(monster);
59      }
60      return grpc::Status::OK;
61   }
62 
63  private:
64   flatbuffers::grpc::MessageBuilder fbb_;
65 };
66 
67 // Track the server instance, so we can terminate it later.
68 grpc::Server *server_instance = nullptr;
69 // Mutex to protec this variable.
70 std::mutex wait_for_server;
71 std::condition_variable server_instance_cv;
72 
73 // This function implements the server thread.
RunServer()74 void RunServer() {
75   auto server_address = "0.0.0.0:50051";
76   // Callback interface we implemented above.
77   ServiceImpl service;
78   grpc::ServerBuilder builder;
79   builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());
80   builder.RegisterService(&service);
81 
82   // Start the server. Lock to change the variable we're changing.
83   wait_for_server.lock();
84   server_instance = builder.BuildAndStart().release();
85   wait_for_server.unlock();
86   server_instance_cv.notify_one();
87 
88   std::cout << "Server listening on " << server_address << std::endl;
89   // This will block the thread and serve requests.
90   server_instance->Wait();
91 }
92 
main(int,const char * [])93 int main(int /*argc*/, const char * /*argv*/[]) {
94   // Launch server.
95   std::thread server_thread(RunServer);
96 
97   // wait for server to spin up.
98   std::unique_lock<std::mutex> lock(wait_for_server);
99   while (!server_instance) server_instance_cv.wait(lock);
100 
101   // Now connect the client.
102   auto channel = grpc::CreateChannel("localhost:50051",
103                                      grpc::InsecureChannelCredentials());
104   auto stub = MyGame::Example::MonsterStorage::NewStub(channel);
105 
106 
107   flatbuffers::grpc::MessageBuilder fbb;
108   {
109     grpc::ClientContext context;
110     // Build a request with the name set.
111     auto monster_offset = CreateMonster(fbb, 0, 0, 0, fbb.CreateString("Fred"));
112     fbb.Finish(monster_offset);
113     auto request = fbb.ReleaseMessage<Monster>();
114     flatbuffers::grpc::Message<Stat> response;
115 
116     // The actual RPC.
117     auto status = stub->Store(&context, request, &response);
118 
119     if (status.ok()) {
120       auto resp = response.GetRoot()->id();
121       std::cout << "RPC response: " << resp->str() << std::endl;
122     } else {
123       std::cout << "RPC failed" << std::endl;
124     }
125   }
126   {
127     grpc::ClientContext context;
128     fbb.Clear();
129     auto stat_offset = CreateStat(fbb, fbb.CreateString("Fred"));
130     fbb.Finish(stat_offset);
131     auto request = fbb.ReleaseMessage<Stat>();
132 
133     flatbuffers::grpc::Message<Monster> response;
134     auto stream = stub->Retrieve(&context, request);
135     while (stream->Read(&response)) {
136       auto resp = response.GetRoot()->name();
137       std::cout << "RPC Streaming response: " << resp->str() << std::endl;
138     }
139   }
140 
141   #if !FLATBUFFERS_GRPC_DISABLE_AUTO_VERIFICATION
142   {
143     // Test that an invalid request errors out correctly
144     grpc::ClientContext context;
145     flatbuffers::grpc::Message<Monster> request;  // simulate invalid message
146     flatbuffers::grpc::Message<Stat> response;
147     auto status = stub->Store(&context, request, &response);
148     // The rpc status should be INTERNAL to indicate a verification error. This
149     // matches the protobuf gRPC status code for an unparseable message.
150     assert(!status.ok());
151     assert(status.error_code() == ::grpc::StatusCode::INTERNAL);
152     assert(strcmp(status.error_message().c_str(), "Message verification failed") == 0);
153   }
154   #endif
155 
156   server_instance->Shutdown();
157 
158   server_thread.join();
159 
160   delete server_instance;
161 
162   return 0;
163 }
164