1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "test/cpp/util/cli_call.h"
20
21 #include <iostream>
22 #include <utility>
23
24 #include <grpc/grpc.h>
25 #include <grpc/slice.h>
26 #include <grpc/support/log.h>
27 #include <grpcpp/channel.h>
28 #include <grpcpp/client_context.h>
29 #include <grpcpp/support/byte_buffer.h>
30
31 namespace grpc {
32 namespace testing {
33 namespace {
tag(int i)34 void* tag(int i) { return (void*)static_cast<intptr_t>(i); }
35 } // namespace
36
Call(std::shared_ptr<grpc::Channel> channel,const grpc::string & method,const grpc::string & request,grpc::string * response,const OutgoingMetadataContainer & metadata,IncomingMetadataContainer * server_initial_metadata,IncomingMetadataContainer * server_trailing_metadata)37 Status CliCall::Call(std::shared_ptr<grpc::Channel> channel,
38 const grpc::string& method, const grpc::string& request,
39 grpc::string* response,
40 const OutgoingMetadataContainer& metadata,
41 IncomingMetadataContainer* server_initial_metadata,
42 IncomingMetadataContainer* server_trailing_metadata) {
43 CliCall call(std::move(channel), method, metadata);
44 call.Write(request);
45 call.WritesDone();
46 if (!call.Read(response, server_initial_metadata)) {
47 fprintf(stderr, "Failed to read response.\n");
48 }
49 return call.Finish(server_trailing_metadata);
50 }
51
CliCall(const std::shared_ptr<grpc::Channel> & channel,const grpc::string & method,const OutgoingMetadataContainer & metadata)52 CliCall::CliCall(const std::shared_ptr<grpc::Channel>& channel,
53 const grpc::string& method,
54 const OutgoingMetadataContainer& metadata)
55 : stub_(new grpc::GenericStub(channel)) {
56 gpr_mu_init(&write_mu_);
57 gpr_cv_init(&write_cv_);
58 if (!metadata.empty()) {
59 for (OutgoingMetadataContainer::const_iterator iter = metadata.begin();
60 iter != metadata.end(); ++iter) {
61 ctx_.AddMetadata(iter->first, iter->second);
62 }
63 }
64 call_ = stub_->PrepareCall(&ctx_, method, &cq_);
65 call_->StartCall(tag(1));
66 void* got_tag;
67 bool ok;
68 cq_.Next(&got_tag, &ok);
69 GPR_ASSERT(ok);
70 }
71
~CliCall()72 CliCall::~CliCall() {
73 gpr_cv_destroy(&write_cv_);
74 gpr_mu_destroy(&write_mu_);
75 }
76
Write(const grpc::string & request)77 void CliCall::Write(const grpc::string& request) {
78 void* got_tag;
79 bool ok;
80
81 gpr_slice s = gpr_slice_from_copied_buffer(request.data(), request.size());
82 grpc::Slice req_slice(s, grpc::Slice::STEAL_REF);
83 grpc::ByteBuffer send_buffer(&req_slice, 1);
84 call_->Write(send_buffer, tag(2));
85 cq_.Next(&got_tag, &ok);
86 GPR_ASSERT(ok);
87 }
88
Read(grpc::string * response,IncomingMetadataContainer * server_initial_metadata)89 bool CliCall::Read(grpc::string* response,
90 IncomingMetadataContainer* server_initial_metadata) {
91 void* got_tag;
92 bool ok;
93
94 grpc::ByteBuffer recv_buffer;
95 call_->Read(&recv_buffer, tag(3));
96
97 if (!cq_.Next(&got_tag, &ok) || !ok) {
98 return false;
99 }
100 std::vector<grpc::Slice> slices;
101 GPR_ASSERT(recv_buffer.Dump(&slices).ok());
102
103 response->clear();
104 for (size_t i = 0; i < slices.size(); i++) {
105 response->append(reinterpret_cast<const char*>(slices[i].begin()),
106 slices[i].size());
107 }
108 if (server_initial_metadata) {
109 *server_initial_metadata = ctx_.GetServerInitialMetadata();
110 }
111 return true;
112 }
113
WritesDone()114 void CliCall::WritesDone() {
115 void* got_tag;
116 bool ok;
117
118 call_->WritesDone(tag(4));
119 cq_.Next(&got_tag, &ok);
120 GPR_ASSERT(ok);
121 }
122
WriteAndWait(const grpc::string & request)123 void CliCall::WriteAndWait(const grpc::string& request) {
124 grpc::Slice req_slice(request);
125 grpc::ByteBuffer send_buffer(&req_slice, 1);
126
127 gpr_mu_lock(&write_mu_);
128 call_->Write(send_buffer, tag(2));
129 write_done_ = false;
130 while (!write_done_) {
131 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
132 }
133 gpr_mu_unlock(&write_mu_);
134 }
135
WritesDoneAndWait()136 void CliCall::WritesDoneAndWait() {
137 gpr_mu_lock(&write_mu_);
138 call_->WritesDone(tag(4));
139 write_done_ = false;
140 while (!write_done_) {
141 gpr_cv_wait(&write_cv_, &write_mu_, gpr_inf_future(GPR_CLOCK_MONOTONIC));
142 }
143 gpr_mu_unlock(&write_mu_);
144 }
145
ReadAndMaybeNotifyWrite(grpc::string * response,IncomingMetadataContainer * server_initial_metadata)146 bool CliCall::ReadAndMaybeNotifyWrite(
147 grpc::string* response,
148 IncomingMetadataContainer* server_initial_metadata) {
149 void* got_tag;
150 bool ok;
151 grpc::ByteBuffer recv_buffer;
152
153 call_->Read(&recv_buffer, tag(3));
154 bool cq_result = cq_.Next(&got_tag, &ok);
155
156 while (got_tag != tag(3)) {
157 gpr_mu_lock(&write_mu_);
158 write_done_ = true;
159 gpr_cv_signal(&write_cv_);
160 gpr_mu_unlock(&write_mu_);
161
162 cq_result = cq_.Next(&got_tag, &ok);
163 if (got_tag == tag(2)) {
164 GPR_ASSERT(ok);
165 }
166 }
167
168 if (!cq_result || !ok) {
169 // If the RPC is ended on the server side, we should still wait for the
170 // pending write on the client side to be done.
171 if (!ok) {
172 gpr_mu_lock(&write_mu_);
173 if (!write_done_) {
174 cq_.Next(&got_tag, &ok);
175 GPR_ASSERT(got_tag != tag(2));
176 write_done_ = true;
177 gpr_cv_signal(&write_cv_);
178 }
179 gpr_mu_unlock(&write_mu_);
180 }
181 return false;
182 }
183
184 std::vector<grpc::Slice> slices;
185 GPR_ASSERT(recv_buffer.Dump(&slices).ok());
186 response->clear();
187 for (size_t i = 0; i < slices.size(); i++) {
188 response->append(reinterpret_cast<const char*>(slices[i].begin()),
189 slices[i].size());
190 }
191 if (server_initial_metadata) {
192 *server_initial_metadata = ctx_.GetServerInitialMetadata();
193 }
194 return true;
195 }
196
Finish(IncomingMetadataContainer * server_trailing_metadata)197 Status CliCall::Finish(IncomingMetadataContainer* server_trailing_metadata) {
198 void* got_tag;
199 bool ok;
200 grpc::Status status;
201
202 call_->Finish(&status, tag(5));
203 cq_.Next(&got_tag, &ok);
204 GPR_ASSERT(ok);
205 if (server_trailing_metadata) {
206 *server_trailing_metadata = ctx_.GetServerTrailingMetadata();
207 }
208
209 return status;
210 }
211
212 } // namespace testing
213 } // namespace grpc
214