/*
 *
 * Copyright 2019 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

#include <google/protobuf/arena.h>

#include <grpc/impl/codegen/log.h>
#include <gtest/gtest.h>

#include <grpcpp/channel.h>
#include <grpcpp/client_context.h>
#include <grpcpp/create_channel.h>
#include <grpcpp/server.h>
#include <grpcpp/server_builder.h>
#include <grpcpp/server_context.h>
#include <grpcpp/support/client_callback.h>
#include <grpcpp/support/message_allocator.h>

#include "src/core/lib/iomgr/iomgr.h"
#include "src/proto/grpc/testing/echo.grpc.pb.h"
#include "test/core/util/port.h"
#include "test/core/util/test_config.h"
#include "test/cpp/util/test_credentials_provider.h"
47 
48 namespace grpc {
49 namespace testing {
50 namespace {
51 
52 class CallbackTestServiceImpl
53     : public EchoTestService::ExperimentalCallbackService {
54  public:
CallbackTestServiceImpl()55   explicit CallbackTestServiceImpl() {}
56 
SetAllocatorMutator(std::function<void (experimental::RpcAllocatorState * allocator_state,const EchoRequest * req,EchoResponse * resp)> mutator)57   void SetAllocatorMutator(
58       std::function<void(experimental::RpcAllocatorState* allocator_state,
59                          const EchoRequest* req, EchoResponse* resp)>
60           mutator) {
61     allocator_mutator_ = mutator;
62   }
63 
Echo(experimental::CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)64   experimental::ServerUnaryReactor* Echo(
65       experimental::CallbackServerContext* context, const EchoRequest* request,
66       EchoResponse* response) override {
67     response->set_message(request->message());
68     if (allocator_mutator_) {
69       allocator_mutator_(context->GetRpcAllocatorState(), request, response);
70     }
71     auto* reactor = context->DefaultReactor();
72     reactor->Finish(Status::OK);
73     return reactor;
74   }
75 
76  private:
77   std::function<void(experimental::RpcAllocatorState* allocator_state,
78                      const EchoRequest* req, EchoResponse* resp)>
79       allocator_mutator_;
80 };
81 
// Transport used between the test client and the test server.
enum class Protocol {
  INPROC,  // in-process channel obtained from the server object
  TCP      // channel to a localhost TCP port
};
83 
84 class TestScenario {
85  public:
TestScenario(Protocol protocol,const std::string & creds_type)86   TestScenario(Protocol protocol, const std::string& creds_type)
87       : protocol(protocol), credentials_type(creds_type) {}
88   void Log() const;
89   Protocol protocol;
90   const std::string credentials_type;
91 };
92 
operator <<(std::ostream & out,const TestScenario & scenario)93 static std::ostream& operator<<(std::ostream& out,
94                                 const TestScenario& scenario) {
95   return out << "TestScenario{protocol="
96              << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
97              << "," << scenario.credentials_type << "}";
98 }
99 
Log() const100 void TestScenario::Log() const {
101   std::ostringstream out;
102   out << *this;
103   gpr_log(GPR_INFO, "%s", out.str().c_str());
104 }
105 
106 class MessageAllocatorEnd2endTestBase
107     : public ::testing::TestWithParam<TestScenario> {
108  protected:
MessageAllocatorEnd2endTestBase()109   MessageAllocatorEnd2endTestBase() { GetParam().Log(); }
110 
111   ~MessageAllocatorEnd2endTestBase() = default;
112 
CreateServer(experimental::MessageAllocator<EchoRequest,EchoResponse> * allocator)113   void CreateServer(
114       experimental::MessageAllocator<EchoRequest, EchoResponse>* allocator) {
115     ServerBuilder builder;
116 
117     auto server_creds = GetCredentialsProvider()->GetServerCredentials(
118         GetParam().credentials_type);
119     if (GetParam().protocol == Protocol::TCP) {
120       picked_port_ = grpc_pick_unused_port_or_die();
121       server_address_ << "localhost:" << picked_port_;
122       builder.AddListeningPort(server_address_.str(), server_creds);
123     }
124     callback_service_.SetMessageAllocatorFor_Echo(allocator);
125     builder.RegisterService(&callback_service_);
126 
127     server_ = builder.BuildAndStart();
128   }
129 
DestroyServer()130   void DestroyServer() {
131     if (server_) {
132       server_->Shutdown();
133       server_.reset();
134     }
135   }
136 
ResetStub()137   void ResetStub() {
138     ChannelArguments args;
139     auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
140         GetParam().credentials_type, &args);
141     switch (GetParam().protocol) {
142       case Protocol::TCP:
143         channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
144                                                channel_creds, args);
145         break;
146       case Protocol::INPROC:
147         channel_ = server_->InProcessChannel(args);
148         break;
149       default:
150         assert(false);
151     }
152     stub_ = EchoTestService::NewStub(channel_);
153   }
154 
TearDown()155   void TearDown() override {
156     DestroyServer();
157     if (picked_port_ > 0) {
158       grpc_recycle_unused_port(picked_port_);
159     }
160   }
161 
SendRpcs(int num_rpcs)162   void SendRpcs(int num_rpcs) {
163     std::string test_string("");
164     for (int i = 0; i < num_rpcs; i++) {
165       EchoRequest request;
166       EchoResponse response;
167       ClientContext cli_ctx;
168 
169       test_string += std::string(1024, 'x');
170       request.set_message(test_string);
171       std::string val;
172       cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
173 
174       std::mutex mu;
175       std::condition_variable cv;
176       bool done = false;
177       stub_->experimental_async()->Echo(
178           &cli_ctx, &request, &response,
179           [&request, &response, &done, &mu, &cv, val](Status s) {
180             GPR_ASSERT(s.ok());
181 
182             EXPECT_EQ(request.message(), response.message());
183             std::lock_guard<std::mutex> l(mu);
184             done = true;
185             cv.notify_one();
186           });
187       std::unique_lock<std::mutex> l(mu);
188       while (!done) {
189         cv.wait(l);
190       }
191     }
192   }
193 
194   int picked_port_{0};
195   std::shared_ptr<Channel> channel_;
196   std::unique_ptr<EchoTestService::Stub> stub_;
197   CallbackTestServiceImpl callback_service_;
198   std::unique_ptr<Server> server_;
199   std::ostringstream server_address_;
200 };
201 
202 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
203 
TEST_P(NullAllocatorTest,SimpleRpc)204 TEST_P(NullAllocatorTest, SimpleRpc) {
205   CreateServer(nullptr);
206   ResetStub();
207   SendRpcs(1);
208 }
209 
210 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
211  public:
212   class SimpleAllocator
213       : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
214    public:
215     class MessageHolderImpl
216         : public experimental::MessageHolder<EchoRequest, EchoResponse> {
217      public:
MessageHolderImpl(std::atomic_int * request_deallocation_count,std::atomic_int * messages_deallocation_count)218       MessageHolderImpl(std::atomic_int* request_deallocation_count,
219                         std::atomic_int* messages_deallocation_count)
220           : request_deallocation_count_(request_deallocation_count),
221             messages_deallocation_count_(messages_deallocation_count) {
222         set_request(new EchoRequest);
223         set_response(new EchoResponse);
224       }
Release()225       void Release() override {
226         (*messages_deallocation_count_)++;
227         delete request();
228         delete response();
229         delete this;
230       }
FreeRequest()231       void FreeRequest() override {
232         (*request_deallocation_count_)++;
233         delete request();
234         set_request(nullptr);
235       }
236 
ReleaseRequest()237       EchoRequest* ReleaseRequest() {
238         auto* ret = request();
239         set_request(nullptr);
240         return ret;
241       }
242 
243      private:
244       std::atomic_int* const request_deallocation_count_;
245       std::atomic_int* const messages_deallocation_count_;
246     };
AllocateMessages()247     experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
248         override {
249       allocation_count++;
250       return new MessageHolderImpl(&request_deallocation_count,
251                                    &messages_deallocation_count);
252     }
253     int allocation_count = 0;
254     std::atomic_int request_deallocation_count{0};
255     std::atomic_int messages_deallocation_count{0};
256   };
257 };
258 
// Every RPC allocates one holder and releases it once; nothing frees the
// request early.
TEST_P(SimpleAllocatorTest, SimpleRpc) {
  const int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release after server side
  // OnDone. Destroy server to make sure it has been updated.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
}
272 
// The handler frees the request through the allocator state while the RPC is
// still being served; each RPC must therefore count one early request free in
// addition to the usual allocation and release.
TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
  const int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
  auto mutator = [](experimental::RpcAllocatorState* allocator_state,
                    const EchoRequest* req, EchoResponse* resp) {
    auto* info =
        static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
    // The handler must see exactly the messages the holder allocated.
    EXPECT_EQ(req, info->request());
    EXPECT_EQ(resp, info->response());
    allocator_state->FreeRequest();
    EXPECT_EQ(nullptr, info->request());
  };
  callback_service_.SetAllocatorMutator(mutator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release after server side
  // OnDone. Destroy server to make sure it has been updated.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
}
296 
// The handler takes ownership of the request out of the holder. The holder
// must then release without counting an early request free, and the test
// ends up owning one request per RPC.
TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
  const int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
  // Owns the requests taken from the holders; they are freed automatically
  // when the vector goes out of scope at the end of the test.
  std::vector<std::unique_ptr<EchoRequest>> released_requests;
  auto mutator = [&released_requests](
                     experimental::RpcAllocatorState* allocator_state,
                     const EchoRequest* req, EchoResponse* resp) {
    auto* info =
        static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
    // The handler must see exactly the messages the holder allocated.
    EXPECT_EQ(req, info->request());
    EXPECT_EQ(resp, info->response());
    released_requests.push_back(
        std::unique_ptr<EchoRequest>(info->ReleaseRequest()));
    EXPECT_EQ(nullptr, info->request());
  };
  callback_service_.SetAllocatorMutator(mutator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release after server side
  // OnDone. Destroy server to make sure it has been updated.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
  EXPECT_EQ(static_cast<size_t>(kRpcCount), released_requests.size());
}
326 
327 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
328  public:
329   class ArenaAllocator
330       : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
331    public:
332     class MessageHolderImpl
333         : public experimental::MessageHolder<EchoRequest, EchoResponse> {
334      public:
MessageHolderImpl()335       MessageHolderImpl() {
336         set_request(
337             google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
338         set_response(
339             google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
340       }
Release()341       void Release() override { delete this; }
FreeRequest()342       void FreeRequest() override { GPR_ASSERT(0); }
343 
344      private:
345       google::protobuf::Arena arena_;
346     };
AllocateMessages()347     experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
348         override {
349       allocation_count++;
350       return new MessageHolderImpl;
351     }
352     int allocation_count = 0;
353   };
354 };
355 
// Every RPC must obtain its messages from the arena allocator exactly once.
TEST_P(ArenaAllocatorTest, SimpleRpc) {
  const int kRpcCount = 10;
  std::unique_ptr<ArenaAllocator> allocator(new ArenaAllocator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
}
364 
CreateTestScenarios(bool test_insecure)365 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
366   std::vector<TestScenario> scenarios;
367   std::vector<std::string> credentials_types{
368       GetCredentialsProvider()->GetSecureCredentialsTypeList()};
369   auto insec_ok = [] {
370     // Only allow insecure credentials type when it is registered with the
371     // provider. User may create providers that do not have insecure.
372     return GetCredentialsProvider()->GetChannelCredentials(
373                kInsecureCredentialsType, nullptr) != nullptr;
374   };
375   if (test_insecure && insec_ok()) {
376     credentials_types.push_back(kInsecureCredentialsType);
377   }
378   GPR_ASSERT(!credentials_types.empty());
379 
380   Protocol parr[]{Protocol::INPROC, Protocol::TCP};
381   for (Protocol p : parr) {
382     for (const auto& cred : credentials_types) {
383       // TODO(vjpai): Test inproc with secure credentials when feasible
384       if (p == Protocol::INPROC &&
385           (cred != kInsecureCredentialsType || !insec_ok())) {
386         continue;
387       }
388       scenarios.emplace_back(p, cred);
389     }
390   }
391   return scenarios;
392 }
393 
394 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
395                          ::testing::ValuesIn(CreateTestScenarios(true)));
396 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
397                          ::testing::ValuesIn(CreateTestScenarios(true)));
398 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
399                          ::testing::ValuesIn(CreateTestScenarios(true)));
400 
401 }  // namespace
402 }  // namespace testing
403 }  // namespace grpc
404 
main(int argc,char ** argv)405 int main(int argc, char** argv) {
406   grpc::testing::TestEnvironment env(argc, argv);
407   ::testing::InitGoogleTest(&argc, argv);
408   return RUN_ALL_TESTS();
409 }
410