1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/cpp/util/cli_call.h"
20
21 #include <iostream>
22 #include <utility>
23
24 #include <grpc/grpc.h>
25 #include <grpc/slice.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/support/byte_buffer.h>
30
31 namespace grpc {
32 namespace testing {
33 namespace {
// Converts a small integer into the opaque pointer value used to label
// completion-queue operations in this file. The integer is first widened to
// intptr_t so that the integer-to-pointer conversion is size-matched.
void* tag(int i) { return reinterpret_cast<void*>(static_cast<intptr_t>(i)); }
35 } // namespace
36
Call(const std::shared_ptr<grpc::Channel> & channel,const std::string & method,const std::string & request,std::string * response,const OutgoingMetadataContainer & metadata,IncomingMetadataContainer * server_initial_metadata,IncomingMetadataContainer * server_trailing_metadata)37 Status CliCall::Call(const std::shared_ptr<grpc::Channel>& channel,
38 const std::string& method, const std::string& request,
39 std::string* response,
40 const OutgoingMetadataContainer& metadata,
41 IncomingMetadataContainer* server_initial_metadata,
42 IncomingMetadataContainer* server_trailing_metadata) {
43 CliCall call(channel, method, metadata);
44 call.Write(request);
45 call.WritesDone();
46 if (!call.Read(response, server_initial_metadata)) {
47 fprintf(stderr, "Failed to read response.\n");
48 }
49 return call.Finish(server_trailing_metadata);
50 }
51
CliCall(const std::shared_ptr<grpc::Channel> & channel,const std::string & method,const OutgoingMetadataContainer & metadata)52 CliCall::CliCall(const std::shared_ptr<grpc::Channel>& channel,
53 const std::string& method,
54 const OutgoingMetadataContainer& metadata)
55 : stub_(new grpc::GenericStub(channel)) {
56 gpr_mu_init(&write_mu_);
57 gpr_cv_init(&write_cv_);
58 if (!metadata.empty()) {
59 for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
60 iter != metadata.end(); ++iter) {
61 ctx_.AddMetadata(iter->first, iter->second);
62 }
63 }
64 call_ = stub_->PrepareCall(&ctx_, method, &cq_);
65 call_->StartCall(tag(1));
66 void* got_tag;
67 bool ok;
68 cq_.Next(&got_tag, &ok);
69 GPR_ASSERT(ok);
70 }
71
~CliCall()72 CliCall::~CliCall() {
73 gpr_cv_destroy(&write_cv_);
74 gpr_mu_destroy(&write_mu_);
75 }
76
Write(const std::string & request)77 void CliCall::Write(const std::string& request) {
78 void* got_tag;
79 bool ok;
80
81 gpr_slice s = gpr_slice_from_copied_buffer(request.data(), request.size());
82 grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
83 grpc::ByteBuffer send_buffer(&req_slice, 1);
84 call_->Write(send_buffer, tag(2));
85 cq_.Next(&got_tag, &ok);
86 GPR_ASSERT(ok);
87 }
88
Read(std::string * response,IncomingMetadataContainer * server_initial_metadata)89 bool CliCall::Read(std::string* response,
90 IncomingMetadataContainer* server_initial_metadata) {
91 void* got_tag;
92 bool ok;
93
94 grpc::ByteBuffer recv_buffer;
95 call_->Read(&recv_buffer, tag(3));
96
97 if (!cq_.Next(&got_tag, &ok) || !ok) {
98 return false;
99 }
100 std::vector<grpc::Slice> slices;
101 GPR_ASSERT(recv_buffer.Dump(&slices).ok());
102
103 response->clear();
104 for (size_t i = 0; i < slices.size(); i++) {
105 response->append(reinterpret_cast<const char*>(slices[i].begin()),
106 slices[i].size());
107 }
108 if (server_initial_metadata) {
109 *server_initial_metadata = ctx_.GetServerInitialMetadata();
110 }
111 return true;
112 }
113
WritesDone()114 void CliCall::WritesDone() {
115 void* got_tag;
116 bool ok;
117
118 call_->WritesDone(tag(4));
119 cq_.Next(&got_tag, &ok);
120 GPR_ASSERT(ok);
121 }
122
WriteAndWait(const std::string & request)123 void CliCall::WriteAndWait(const std::string& request) {
124 grpc::Slice req_slice(request);
125 grpc::ByteBuffer send_buffer(&req_slice, 1);
126
127 gpr_mu_lock(&write_mu_);
128 call_->Write(send_buffer, tag(2));
129 write_done_ = false;
130 while (!write_done_) {
131 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
132 }
133 gpr_mu_unlock(&write_mu_);
134 }
135
WritesDoneAndWait()136 void CliCall::WritesDoneAndWait() {
137 gpr_mu_lock(&write_mu_);
138 call_->WritesDone(tag(4));
139 write_done_ = false;
140 while (!write_done_) {
141 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
142 }
143 gpr_mu_unlock(&write_mu_);
144 }
145
ReadAndMaybeNotifyWrite(std::string * response,IncomingMetadataContainer * server_initial_metadata)146 bool CliCall::ReadAndMaybeNotifyWrite(
147 std::string* response, IncomingMetadataContainer* server_initial_metadata) {
148 void* got_tag;
149 bool ok;
150 grpc::ByteBuffer recv_buffer;
151
152 call_->Read(&recv_buffer, tag(3));
153 bool cq_result = cq_.Next(&got_tag, &ok);
154
155 while (got_tag != tag(3)) {
156 gpr_mu_lock(&write_mu_);
157 write_done_ = true;
158 gpr_cv_signal(&write_cv_);
159 gpr_mu_unlock(&write_mu_);
160
161 cq_result = cq_.Next(&got_tag, &ok);
162 if (got_tag == tag(2)) {
163 GPR_ASSERT(ok);
164 }
165 }
166
167 if (!cq_result || !ok) {
168 // If the RPC is ended on the server side, we should still wait for the
169 // pending write on the client side to be done.
170 if (!ok) {
171 gpr_mu_lock(&write_mu_);
172 if (!write_done_) {
173 cq_.Next(&got_tag, &ok);
174 GPR_ASSERT(got_tag != tag(2));
175 write_done_ = true;
176 gpr_cv_signal(&write_cv_);
177 }
178 gpr_mu_unlock(&write_mu_);
179 }
180 return false;
181 }
182
183 std::vector<grpc::Slice> slices;
184 GPR_ASSERT(recv_buffer.Dump(&slices).ok());
185 response->clear();
186 for (size_t i = 0; i < slices.size(); i++) {
187 response->append(reinterpret_cast<const char*>(slices[i].begin()),
188 slices[i].size());
189 }
190 if (server_initial_metadata) {
191 *server_initial_metadata = ctx_.GetServerInitialMetadata();
192 }
193 return true;
194 }
195
Finish(IncomingMetadataContainer * server_trailing_metadata)196 Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
197 void* got_tag;
198 bool ok;
199 grpc::Status status;
200
201 call_->Finish(&status, tag(5));
202 cq_.Next(&got_tag, &ok);
203 GPR_ASSERT(ok);
204 if (server_trailing_metadata) {
205 *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
206 }
207
208 return status;
209 }
210
211 } // namespace testing
212 } // namespace grpc
213