1 /*
2 *
3 * Copyright 2018 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <functional>
20 #include <mutex>
21 #include <thread>
22
23 #include <grpcpp/channel.h>
24 #include <grpcpp/client_context.h>
25 #include <grpcpp/create_channel.h>
26 #include <grpcpp/generic/generic_stub.h>
27 #include <grpcpp/impl/codegen/proto_utils.h>
28 #include <grpcpp/server.h>
29 #include <grpcpp/server_builder.h>
30 #include <grpcpp/server_context.h>
31 #include <grpcpp/support/client_callback.h>
32
33 #include "src/proto/grpc/testing/echo.grpc.pb.h"
34 #include "test/core/util/test_config.h"
35 #include "test/cpp/end2end/test_service_impl.h"
36 #include "test/cpp/util/byte_buffer_proto_helper.h"
37
38 #include <gtest/gtest.h>
39
40 namespace grpc {
41 namespace testing {
42 namespace {
43
44 class ClientCallbackEnd2endTest : public ::testing::Test {
45 protected:
ClientCallbackEnd2endTest()46 ClientCallbackEnd2endTest() {}
47
SetUp()48 void SetUp() override {
49 ServerBuilder builder;
50 builder.RegisterService(&service_);
51
52 server_ = builder.BuildAndStart();
53 is_server_started_ = true;
54 }
55
ResetStub()56 void ResetStub() {
57 ChannelArguments args;
58 channel_ = server_->InProcessChannel(args);
59 stub_ = grpc::testing::EchoTestService::NewStub(channel_);
60 generic_stub_.reset(new GenericStub(channel_));
61 }
62
TearDown()63 void TearDown() override {
64 if (is_server_started_) {
65 server_->Shutdown();
66 }
67 }
68
SendRpcs(int num_rpcs,bool with_binary_metadata)69 void SendRpcs(int num_rpcs, bool with_binary_metadata) {
70 grpc::string test_string("");
71 for (int i = 0; i < num_rpcs; i++) {
72 EchoRequest request;
73 EchoResponse response;
74 ClientContext cli_ctx;
75
76 test_string += "Hello world. ";
77 request.set_message(test_string);
78
79 if (with_binary_metadata) {
80 char bytes[8] = {'\0', '\1', '\2', '\3',
81 '\4', '\5', '\6', static_cast<char>(i)};
82 cli_ctx.AddMetadata("custom-bin", grpc::string(bytes, 8));
83 }
84
85 cli_ctx.set_compression_algorithm(GRPC_COMPRESS_GZIP);
86
87 std::mutex mu;
88 std::condition_variable cv;
89 bool done = false;
90 stub_->experimental_async()->Echo(
91 &cli_ctx, &request, &response,
92 [&request, &response, &done, &mu, &cv](Status s) {
93 GPR_ASSERT(s.ok());
94
95 EXPECT_EQ(request.message(), response.message());
96 std::lock_guard<std::mutex> l(mu);
97 done = true;
98 cv.notify_one();
99 });
100 std::unique_lock<std::mutex> l(mu);
101 while (!done) {
102 cv.wait(l);
103 }
104 }
105 }
106
SendRpcsGeneric(int num_rpcs,bool maybe_except)107 void SendRpcsGeneric(int num_rpcs, bool maybe_except) {
108 const grpc::string kMethodName("/grpc.testing.EchoTestService/Echo");
109 grpc::string test_string("");
110 for (int i = 0; i < num_rpcs; i++) {
111 EchoRequest request;
112 std::unique_ptr<ByteBuffer> send_buf;
113 ByteBuffer recv_buf;
114 ClientContext cli_ctx;
115
116 test_string += "Hello world. ";
117 request.set_message(test_string);
118 send_buf = SerializeToByteBuffer(&request);
119
120 std::mutex mu;
121 std::condition_variable cv;
122 bool done = false;
123 generic_stub_->experimental().UnaryCall(
124 &cli_ctx, kMethodName, send_buf.get(), &recv_buf,
125 [&request, &recv_buf, &done, &mu, &cv, maybe_except](Status s) {
126 GPR_ASSERT(s.ok());
127
128 EchoResponse response;
129 EXPECT_TRUE(ParseFromByteBuffer(&recv_buf, &response));
130 EXPECT_EQ(request.message(), response.message());
131 std::lock_guard<std::mutex> l(mu);
132 done = true;
133 cv.notify_one();
134 #if GRPC_ALLOW_EXCEPTIONS
135 if (maybe_except) {
136 throw - 1;
137 }
138 #else
139 GPR_ASSERT(!maybe_except);
140 #endif
141 });
142 std::unique_lock<std::mutex> l(mu);
143 while (!done) {
144 cv.wait(l);
145 }
146 }
147 }
148
149 bool is_server_started_;
150 std::shared_ptr<Channel> channel_;
151 std::unique_ptr<grpc::testing::EchoTestService::Stub> stub_;
152 std::unique_ptr<grpc::GenericStub> generic_stub_;
153 TestServiceImpl service_;
154 std::unique_ptr<Server> server_;
155 };
156
TEST_F(ClientCallbackEnd2endTest,SimpleRpc)157 TEST_F(ClientCallbackEnd2endTest, SimpleRpc) {
158 ResetStub();
159 SendRpcs(1, false);
160 }
161
TEST_F(ClientCallbackEnd2endTest,SequentialRpcs)162 TEST_F(ClientCallbackEnd2endTest, SequentialRpcs) {
163 ResetStub();
164 SendRpcs(10, false);
165 }
166
TEST_F(ClientCallbackEnd2endTest,SequentialRpcsWithVariedBinaryMetadataValue)167 TEST_F(ClientCallbackEnd2endTest, SequentialRpcsWithVariedBinaryMetadataValue) {
168 ResetStub();
169 SendRpcs(10, true);
170 }
171
TEST_F(ClientCallbackEnd2endTest,SequentialGenericRpcs)172 TEST_F(ClientCallbackEnd2endTest, SequentialGenericRpcs) {
173 ResetStub();
174 SendRpcsGeneric(10, false);
175 }
176
177 #if GRPC_ALLOW_EXCEPTIONS
TEST_F(ClientCallbackEnd2endTest,ExceptingRpc)178 TEST_F(ClientCallbackEnd2endTest, ExceptingRpc) {
179 ResetStub();
180 SendRpcsGeneric(10, true);
181 }
182 #endif
183
TEST_F(ClientCallbackEnd2endTest,MultipleRpcsWithVariedBinaryMetadataValue)184 TEST_F(ClientCallbackEnd2endTest, MultipleRpcsWithVariedBinaryMetadataValue) {
185 ResetStub();
186 std::vector<std::thread> threads;
187 threads.reserve(10);
188 for (int i = 0; i < 10; ++i) {
189 threads.emplace_back([this] { SendRpcs(10, true); });
190 }
191 for (int i = 0; i < 10; ++i) {
192 threads[i].join();
193 }
194 }
195
TEST_F(ClientCallbackEnd2endTest,MultipleRpcs)196 TEST_F(ClientCallbackEnd2endTest, MultipleRpcs) {
197 ResetStub();
198 std::vector<std::thread> threads;
199 threads.reserve(10);
200 for (int i = 0; i < 10; ++i) {
201 threads.emplace_back([this] { SendRpcs(10, false); });
202 }
203 for (int i = 0; i < 10; ++i) {
204 threads[i].join();
205 }
206 }
207
TEST_F(ClientCallbackEnd2endTest,CancelRpcBeforeStart)208 TEST_F(ClientCallbackEnd2endTest, CancelRpcBeforeStart) {
209 ResetStub();
210 EchoRequest request;
211 EchoResponse response;
212 ClientContext context;
213 request.set_message("hello");
214 context.TryCancel();
215
216 std::mutex mu;
217 std::condition_variable cv;
218 bool done = false;
219 stub_->experimental_async()->Echo(
220 &context, &request, &response, [&response, &done, &mu, &cv](Status s) {
221 EXPECT_EQ("", response.message());
222 EXPECT_EQ(grpc::StatusCode::CANCELLED, s.error_code());
223 std::lock_guard<std::mutex> l(mu);
224 done = true;
225 cv.notify_one();
226 });
227 std::unique_lock<std::mutex> l(mu);
228 while (!done) {
229 cv.wait(l);
230 }
231 }
232
233 } // namespace
234 } // namespace testing
235 } // namespace grpc
236
main(int argc,char ** argv)237 int main(int argc, char** argv) {
238 grpc_test_init(argc, argv);
239 ::testing::InitGoogleTest(&argc, argv);
240 return RUN_ALL_TESTS();
241 }
242