1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <algorithm>
20 #include <atomic>
21 #include <condition_variable>
22 #include <functional>
23 #include <memory>
24 #include <mutex>
25 #include <sstream>
26 #include <thread>
27
28 #include <google/protobuf/arena.h>
29
30 #include <grpc/impl/codegen/log.h>
31 #include <gtest/gtest.h>
32
33 #include <grpcpp/channel.h>
34 #include <grpcpp/client_context.h>
35 #include <grpcpp/create_channel.h>
36 #include <grpcpp/server.h>
37 #include <grpcpp/server_builder.h>
38 #include <grpcpp/server_context.h>
39 #include <grpcpp/support/client_callback.h>
40 #include <grpcpp/support/message_allocator.h>
41
42 #include "src/core/lib/iomgr/iomgr.h"
43 #include "src/proto/grpc/testing/echo.grpc.pb.h"
44 #include "test/core/util/port.h"
45 #include "test/core/util/test_config.h"
46 #include "test/cpp/util/test_credentials_provider.h"
47
48 namespace grpc {
49 namespace testing {
50 namespace {
51
52 class CallbackTestServiceImpl
53 : public EchoTestService::ExperimentalCallbackService {
54 public:
CallbackTestServiceImpl()55 explicit CallbackTestServiceImpl() {}
56
SetAllocatorMutator(std::function<void (experimental::RpcAllocatorState * allocator_state,const EchoRequest * req,EchoResponse * resp)> mutator)57 void SetAllocatorMutator(
58 std::function<void(experimental::RpcAllocatorState* allocator_state,
59 const EchoRequest* req, EchoResponse* resp)>
60 mutator) {
61 allocator_mutator_ = mutator;
62 }
63
Echo(experimental::CallbackServerContext * context,const EchoRequest * request,EchoResponse * response)64 experimental::ServerUnaryReactor* Echo(
65 experimental::CallbackServerContext* context, const EchoRequest* request,
66 EchoResponse* response) override {
67 response->set_message(request->message());
68 if (allocator_mutator_) {
69 allocator_mutator_(context->GetRpcAllocatorState(), request, response);
70 }
71 auto* reactor = context->DefaultReactor();
72 reactor->Finish(Status::OK);
73 return reactor;
74 }
75
76 private:
77 std::function<void(experimental::RpcAllocatorState* allocator_state,
78 const EchoRequest* req, EchoResponse* resp)>
79 allocator_mutator_;
80 };
81
// Transport used between the test client and the test server.
enum class Protocol {
  INPROC,  // in-process channel obtained from the server object
  TCP,     // channel to a localhost listening port
};
83
84 class TestScenario {
85 public:
TestScenario(Protocol protocol,const std::string & creds_type)86 TestScenario(Protocol protocol, const std::string& creds_type)
87 : protocol(protocol), credentials_type(creds_type) {}
88 void Log() const;
89 Protocol protocol;
90 const std::string credentials_type;
91 };
92
operator <<(std::ostream & out,const TestScenario & scenario)93 static std::ostream& operator<<(std::ostream& out,
94 const TestScenario& scenario) {
95 return out << "TestScenario{protocol="
96 << (scenario.protocol == Protocol::INPROC ? "INPROC" : "TCP")
97 << "," << scenario.credentials_type << "}";
98 }
99
Log() const100 void TestScenario::Log() const {
101 std::ostringstream out;
102 out << *this;
103 gpr_log(GPR_INFO, "%s", out.str().c_str());
104 }
105
106 class MessageAllocatorEnd2endTestBase
107 : public ::testing::TestWithParam<TestScenario> {
108 protected:
MessageAllocatorEnd2endTestBase()109 MessageAllocatorEnd2endTestBase() { GetParam().Log(); }
110
111 ~MessageAllocatorEnd2endTestBase() = default;
112
CreateServer(experimental::MessageAllocator<EchoRequest,EchoResponse> * allocator)113 void CreateServer(
114 experimental::MessageAllocator<EchoRequest, EchoResponse>* allocator) {
115 ServerBuilder builder;
116
117 auto server_creds = GetCredentialsProvider()->GetServerCredentials(
118 GetParam().credentials_type);
119 if (GetParam().protocol == Protocol::TCP) {
120 picked_port_ = grpc_pick_unused_port_or_die();
121 server_address_ << "localhost:" << picked_port_;
122 builder.AddListeningPort(server_address_.str(), server_creds);
123 }
124 callback_service_.SetMessageAllocatorFor_Echo(allocator);
125 builder.RegisterService(&callback_service_);
126
127 server_ = builder.BuildAndStart();
128 }
129
DestroyServer()130 void DestroyServer() {
131 if (server_) {
132 server_->Shutdown();
133 server_.reset();
134 }
135 }
136
ResetStub()137 void ResetStub() {
138 ChannelArguments args;
139 auto channel_creds = GetCredentialsProvider()->GetChannelCredentials(
140 GetParam().credentials_type, &args);
141 switch (GetParam().protocol) {
142 case Protocol::TCP:
143 channel_ = ::grpc::CreateCustomChannel(server_address_.str(),
144 channel_creds, args);
145 break;
146 case Protocol::INPROC:
147 channel_ = server_->InProcessChannel(args);
148 break;
149 default:
150 assert(false);
151 }
152 stub_ = EchoTestService::NewStub(channel_);
153 }
154
TearDown()155 void TearDown() override {
156 DestroyServer();
157 if (picked_port_ > 0) {
158 grpc_recycle_unused_port(picked_port_);
159 }
160 }
161
SendRpcs(int num_rpcs)162 void SendRpcs(int num_rpcs) {
163 std::string test_string("");
164 for (int i = 0; i < num_rpcs; i++) {
165 EchoRequest request;
166 EchoResponse response;
167 ClientContext cli_ctx;
168
169 test_string += std::string(1024, 'x');
170 request.set_message(test_string);
171 std::string val;
172 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
173
174 std::mutex mu;
175 std::condition_variable cv;
176 bool done = false;
177 stub_->experimental_async()->Echo(
178 &cli_ctx, &request, &response,
179 [&request, &response, &done, &mu, &cv, val](Status s) {
180 GPR_ASSERT(s.ok());
181
182 EXPECT_EQ(request.message(), response.message());
183 std::lock_guard<std::mutex> l(mu);
184 done = true;
185 cv.notify_one();
186 });
187 std::unique_lock<std::mutex> l(mu);
188 while (!done) {
189 cv.wait(l);
190 }
191 }
192 }
193
194 int picked_port_{0};
195 std::shared_ptr<Channel> channel_;
196 std::unique_ptr<EchoTestService::Stub> stub_;
197 CallbackTestServiceImpl callback_service_;
198 std::unique_ptr<Server> server_;
199 std::ostringstream server_address_;
200 };
201
202 class NullAllocatorTest : public MessageAllocatorEnd2endTestBase {};
203
// A single Echo RPC must succeed when the allocator handed to the server is
// null.
TEST_P(NullAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 1;
  CreateServer(/*allocator=*/nullptr);
  ResetStub();
  SendRpcs(kRpcCount);
}
209
210 class SimpleAllocatorTest : public MessageAllocatorEnd2endTestBase {
211 public:
212 class SimpleAllocator
213 : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
214 public:
215 class MessageHolderImpl
216 : public experimental::MessageHolder<EchoRequest, EchoResponse> {
217 public:
MessageHolderImpl(std::atomic_int * request_deallocation_count,std::atomic_int * messages_deallocation_count)218 MessageHolderImpl(std::atomic_int* request_deallocation_count,
219 std::atomic_int* messages_deallocation_count)
220 : request_deallocation_count_(request_deallocation_count),
221 messages_deallocation_count_(messages_deallocation_count) {
222 set_request(new EchoRequest);
223 set_response(new EchoResponse);
224 }
Release()225 void Release() override {
226 (*messages_deallocation_count_)++;
227 delete request();
228 delete response();
229 delete this;
230 }
FreeRequest()231 void FreeRequest() override {
232 (*request_deallocation_count_)++;
233 delete request();
234 set_request(nullptr);
235 }
236
ReleaseRequest()237 EchoRequest* ReleaseRequest() {
238 auto* ret = request();
239 set_request(nullptr);
240 return ret;
241 }
242
243 private:
244 std::atomic_int* const request_deallocation_count_;
245 std::atomic_int* const messages_deallocation_count_;
246 };
AllocateMessages()247 experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
248 override {
249 allocation_count++;
250 return new MessageHolderImpl(&request_deallocation_count,
251 &messages_deallocation_count);
252 }
253 int allocation_count = 0;
254 std::atomic_int request_deallocation_count{0};
255 std::atomic_int messages_deallocation_count{0};
256 };
257 };
258
// With no mutator installed, every RPC must allocate exactly one holder and
// release it once, and the request must never be freed early.
TEST_P(SimpleAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator{new SimpleAllocator};

  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);

  // messages_deallocation_count is bumped in Release(), which happens after
  // the server-side OnDone. Tear the server down first so the count is final
  // before it is checked.
  DestroyServer();

  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
}
272
// The handler frees the request through the allocator state before the RPC
// finishes; each RPC must therefore record one early request free in addition
// to the usual allocate/release pair.
TEST_P(SimpleAllocatorTest, RpcWithEarlyFreeRequest) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator{new SimpleAllocator};

  callback_service_.SetAllocatorMutator(
      [](experimental::RpcAllocatorState* allocator_state,
         const EchoRequest* req, EchoResponse* resp) {
        // The state handed to the service is the holder this allocator made.
        auto* info =
            static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
        EXPECT_EQ(req, info->request());
        EXPECT_EQ(resp, info->response());
        allocator_state->FreeRequest();
        EXPECT_EQ(nullptr, info->request());
      });

  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);

  // messages_deallocation_count is bumped in Release(), which happens after
  // the server-side OnDone. Tear the server down first so the count is final
  // before it is checked.
  DestroyServer();

  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(kRpcCount, allocator->request_deallocation_count);
}
296
// The handler detaches the request from the holder and takes ownership of it.
// Each RPC must still allocate and release one holder, no early request free
// may be recorded, and one detached request per RPC must have been collected.
TEST_P(SimpleAllocatorTest, RpcWithReleaseRequest) {
  const int kRpcCount = 10;
  std::unique_ptr<SimpleAllocator> allocator(new SimpleAllocator);
  // Requests detached from their holders. Held by unique_ptr so they are
  // deleted when this vector goes out of scope, with no manual delete loop.
  std::vector<std::unique_ptr<EchoRequest>> released_requests;
  auto mutator = [&released_requests](
                     experimental::RpcAllocatorState* allocator_state,
                     const EchoRequest* req, EchoResponse* resp) {
    // The state handed to the service is the holder this allocator made.
    auto* info =
        static_cast<SimpleAllocator::MessageHolderImpl*>(allocator_state);
    EXPECT_EQ(req, info->request());
    EXPECT_EQ(resp, info->response());
    released_requests.emplace_back(info->ReleaseRequest());
    EXPECT_EQ(nullptr, info->request());
  };
  callback_service_.SetAllocatorMutator(mutator);
  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);
  // messages_deallocation_count is updated in Release after server side
  // OnDone. Destroy server to make sure it has been updated.
  DestroyServer();
  EXPECT_EQ(kRpcCount, allocator->allocation_count);
  EXPECT_EQ(kRpcCount, allocator->messages_deallocation_count);
  EXPECT_EQ(0, allocator->request_deallocation_count);
  EXPECT_EQ(static_cast<unsigned>(kRpcCount), released_requests.size());
}
326
327 class ArenaAllocatorTest : public MessageAllocatorEnd2endTestBase {
328 public:
329 class ArenaAllocator
330 : public experimental::MessageAllocator<EchoRequest, EchoResponse> {
331 public:
332 class MessageHolderImpl
333 : public experimental::MessageHolder<EchoRequest, EchoResponse> {
334 public:
MessageHolderImpl()335 MessageHolderImpl() {
336 set_request(
337 google::protobuf::Arena::CreateMessage<EchoRequest>(&arena_));
338 set_response(
339 google::protobuf::Arena::CreateMessage<EchoResponse>(&arena_));
340 }
Release()341 void Release() override { delete this; }
FreeRequest()342 void FreeRequest() override { GPR_ASSERT(0); }
343
344 private:
345 google::protobuf::Arena arena_;
346 };
AllocateMessages()347 experimental::MessageHolder<EchoRequest, EchoResponse>* AllocateMessages()
348 override {
349 allocation_count++;
350 return new MessageHolderImpl;
351 }
352 int allocation_count = 0;
353 };
354 };
355
// Every RPC served with the arena allocator must request exactly one holder.
TEST_P(ArenaAllocatorTest, SimpleRpc) {
  constexpr int kRpcCount = 10;
  std::unique_ptr<ArenaAllocator> allocator{new ArenaAllocator};

  CreateServer(allocator.get());
  ResetStub();
  SendRpcs(kRpcCount);

  EXPECT_EQ(kRpcCount, allocator->allocation_count);
}
364
CreateTestScenarios(bool test_insecure)365 std::vector<TestScenario> CreateTestScenarios(bool test_insecure) {
366 std::vector<TestScenario> scenarios;
367 std::vector<std::string> credentials_types{
368 GetCredentialsProvider()->GetSecureCredentialsTypeList()};
369 auto insec_ok = [] {
370 // Only allow insecure credentials type when it is registered with the
371 // provider. User may create providers that do not have insecure.
372 return GetCredentialsProvider()->GetChannelCredentials(
373 kInsecureCredentialsType, nullptr) != nullptr;
374 };
375 if (test_insecure && insec_ok()) {
376 credentials_types.push_back(kInsecureCredentialsType);
377 }
378 GPR_ASSERT(!credentials_types.empty());
379
380 Protocol parr[]{Protocol::INPROC, Protocol::TCP};
381 for (Protocol p : parr) {
382 for (const auto& cred : credentials_types) {
383 // TODO(vjpai): Test inproc with secure credentials when feasible
384 if (p == Protocol::INPROC &&
385 (cred != kInsecureCredentialsType || !insec_ok())) {
386 continue;
387 }
388 scenarios.emplace_back(p, cred);
389 }
390 }
391 return scenarios;
392 }
393
394 INSTANTIATE_TEST_SUITE_P(NullAllocatorTest, NullAllocatorTest,
395 ::testing::ValuesIn(CreateTestScenarios(true)));
396 INSTANTIATE_TEST_SUITE_P(SimpleAllocatorTest, SimpleAllocatorTest,
397 ::testing::ValuesIn(CreateTestScenarios(true)));
398 INSTANTIATE_TEST_SUITE_P(ArenaAllocatorTest, ArenaAllocatorTest,
399 ::testing::ValuesIn(CreateTestScenarios(true)));
400
401 } // namespace
402 } // namespace testing
403 } // namespace grpc
404
main(int argc,char ** argv)405 int main(int argc, char** argv) {
406 grpc::testing::TestEnvironment env(argc, argv);
407 ::testing::InitGoogleTest(&argc, argv);
408 return RUN_ALL_TESTS();
409 }
410