• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "test/cpp/util/grpc_tool.h"
20 
21 #include <grpc/grpc.h>
22 #include <grpc/support/port_platform.h>
23 #include <grpcpp/channel.h>
24 #include <grpcpp/create_channel.h>
25 #include <grpcpp/grpcpp.h>
26 #include <grpcpp/security/credentials.h>
27 #include <grpcpp/support/string_ref.h>
28 
29 #include <cstdio>
30 #include <fstream>
31 #include <iostream>
32 #include <memory>
33 #include <sstream>
34 #include <string>
35 #include <thread>
36 #include <utility>
37 
38 #include "absl/flags/flag.h"
39 #include "absl/strings/numbers.h"
40 #include "absl/strings/str_split.h"
41 #include "absl/strings/string_view.h"
42 #include "test/cpp/util/cli_call.h"
43 #include "test/cpp/util/proto_file_parser.h"
44 #include "test/cpp/util/proto_reflection_descriptor_database.h"
45 #include "test/cpp/util/service_describer.h"
46 
47 #if GPR_WINDOWS
48 #include <io.h>
49 #else
50 #include <unistd.h>
51 #endif
52 
53 ABSL_FLAG(bool, l, false, "Use a long listing format");
54 ABSL_FLAG(bool, remotedb, true,
55           "Use server types to parse and format messages");
56 ABSL_FLAG(std::string, metadata, "",
57           "Metadata to send to server, in the form of key1:val1:key2:val2");
58 ABSL_FLAG(std::string, proto_path, ".",
59           "Path to look for the proto file. "
60           "Multiple paths can be separated by " GRPC_CLI_PATH_SEPARATOR);
61 ABSL_FLAG(std::string, protofiles, "", "Name of the proto file.");
62 ABSL_FLAG(bool, binary_input, false, "Input in binary format");
63 ABSL_FLAG(bool, binary_output, false, "Output in binary format");
64 ABSL_FLAG(std::string, default_service_config, "",
65           "Default service config to use on the channel, if non-empty. Note "
66           "that this will be ignored if the name resolver returns a service "
67           "config.");
68 ABSL_FLAG(bool, display_peer_address, false,
69           "Log the peer socket address of the connection that each RPC is made "
70           "on to stderr.");
71 ABSL_FLAG(bool, json_input, false, "Input in json format");
72 ABSL_FLAG(bool, json_output, false, "Output in json format");
73 ABSL_FLAG(std::string, infile, "", "Input file (default is stdin)");
74 ABSL_FLAG(bool, batch, false,
75           "Input contains multiple requests. Please do not use this to send "
76           "more than a few RPCs. gRPC CLI has very different performance "
77           "characteristics compared with normal RPC calls which make it "
78           "unsuitable for loadtesting or significant production traffic.");
79 // TODO(Capstan): Consider using absl::Duration
80 ABSL_FLAG(double, timeout, -1,
81           "Specify timeout in seconds, used to set the deadline for all "
82           "RPCs. The default value of -1 means no deadline has been set.");
83 ABSL_FLAG(
84     int, max_recv_msg_size, 0,
85     "Specify the max receive message size in bytes for all RPCs. -1 indicates "
86     "unlimited. The default value of 0 means to use the gRPC default.");
87 ABSL_FLAG(std::string, channel_args, "",
88           "Comma-separated list of key=value gRPC ChannelArgs to apply "
89           "(a=b,c=d,...). Values may be integers or strings.");
90 
91 namespace grpc {
92 namespace testing {
93 namespace {
94 
95 class GrpcTool {
96  public:
97   explicit GrpcTool();
~GrpcTool()98   virtual ~GrpcTool() {}
99 
100   bool Help(int argc, const char** argv, const CliCredentials& cred,
101             const GrpcToolOutputCallback& callback);
102   bool CallMethod(int argc, const char** argv, const CliCredentials& cred,
103                   const GrpcToolOutputCallback& callback);
104   bool ListServices(int argc, const char** argv, const CliCredentials& cred,
105                     const GrpcToolOutputCallback& callback);
106   bool PrintType(int argc, const char** argv, const CliCredentials& cred,
107                  const GrpcToolOutputCallback& callback);
108   // TODO(zyc): implement the following methods
109   // bool ListServices(int argc, const char** argv, GrpcToolOutputCallback
110   // callback);
111   // bool PrintTypeId(int argc, const char** argv, GrpcToolOutputCallback
112   // callback);
113   bool ParseMessage(int argc, const char** argv, const CliCredentials& cred,
114                     const GrpcToolOutputCallback& callback);
115   bool ToText(int argc, const char** argv, const CliCredentials& cred,
116               const GrpcToolOutputCallback& callback);
117   bool ToJson(int argc, const char** argv, const CliCredentials& cred,
118               const GrpcToolOutputCallback& callback);
119   bool ToBinary(int argc, const char** argv, const CliCredentials& cred,
120                 const GrpcToolOutputCallback& callback);
121 
SetPrintCommandMode(int exit_status)122   void SetPrintCommandMode(int exit_status) {
123     print_command_usage_ = true;
124     usage_exit_status_ = exit_status;
125   }
126 
127  private:
128   void CommandUsage(const std::string& usage) const;
129   bool print_command_usage_;
130   int usage_exit_status_;
131   const std::string cred_usage_;
132 };
133 
134 template <typename T>
135 std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
136                    GrpcToolOutputCallback)>
BindWith5Args(T && func)137 BindWith5Args(T&& func) {
138   return std::bind(std::forward<T>(func), std::placeholders::_1,
139                    std::placeholders::_2, std::placeholders::_3,
140                    std::placeholders::_4, std::placeholders::_5);
141 }
142 
// Returns the number of elements of a built-in array.
//
// The parameter is a reference to an array, so the element count is deduced
// by the compiler. Passing a pointer or any other non-array object is a
// compile-time error instead of silently yielding a meaningless value (or a
// runtime division by zero) as the sizeof-based formulation could.
template <typename T, size_t N>
constexpr size_t ArraySize(T (&)[N]) {
  return N;
}
148 
ParseMetadataFlag(std::multimap<std::string,std::string> * client_metadata)149 void ParseMetadataFlag(
150     std::multimap<std::string, std::string>* client_metadata) {
151   if (absl::GetFlag(FLAGS_metadata).empty()) {
152     return;
153   }
154   std::vector<std::string> fields;
155   const char delim = ':';
156   const char escape = '\\';
157   size_t cur = -1;
158   std::stringstream ss;
159   while (++cur < absl::GetFlag(FLAGS_metadata).length()) {
160     switch (absl::GetFlag(FLAGS_metadata).at(cur)) {
161       case escape:
162         if (cur < absl::GetFlag(FLAGS_metadata).length() - 1) {
163           char c = absl::GetFlag(FLAGS_metadata).at(++cur);
164           if (c == delim || c == escape) {
165             ss << c;
166             continue;
167           }
168         }
169         fprintf(stderr, "Failed to parse metadata flag.\n");
170         exit(1);
171       case delim:
172         fields.push_back(ss.str());
173         ss.str("");
174         ss.clear();
175         break;
176       default:
177         ss << absl::GetFlag(FLAGS_metadata).at(cur);
178     }
179   }
180   fields.push_back(ss.str());
181   if (fields.size() % 2) {
182     fprintf(stderr, "Failed to parse metadata flag.\n");
183     exit(1);
184   }
185   for (size_t i = 0; i < fields.size(); i += 2) {
186     client_metadata->insert(
187         std::pair<std::string, std::string>(fields[i], fields[i + 1]));
188   }
189 }
190 
// Writes `message` followed by one "key : value" line per entry of `m` to
// stderr. Nothing is printed when `m` is empty. Keys and values only need to
// expose data() and size().
template <typename T>
void PrintMetadata(const T& m, const std::string& message) {
  if (m.empty()) {
    return;
  }
  fprintf(stderr, "%s\n", message.c_str());
  for (const auto& entry : m) {
    std::string line(entry.first.data(), entry.first.size());
    line.append(" : ");
    line.append(entry.second.data(), entry.second.size());
    fprintf(stderr, "%s\n", line.c_str());
  }
}
206 
ReadResponse(CliCall * call,const std::string & method_name,const GrpcToolOutputCallback & callback,ProtoFileParser * parser,gpr_mu * parser_mu,bool print_mode)207 void ReadResponse(CliCall* call, const std::string& method_name,
208                   const GrpcToolOutputCallback& callback,
209                   ProtoFileParser* parser, gpr_mu* parser_mu, bool print_mode) {
210   std::string serialized_response_proto;
211   std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata;
212 
213   for (bool receive_initial_metadata = true; call->ReadAndMaybeNotifyWrite(
214            &serialized_response_proto,
215            receive_initial_metadata ? &server_initial_metadata : nullptr);
216        receive_initial_metadata = false) {
217     fprintf(stderr, "got response.\n");
218     if (!absl::GetFlag(FLAGS_binary_output)) {
219       gpr_mu_lock(parser_mu);
220       serialized_response_proto = parser->GetFormattedStringFromMethod(
221           method_name, serialized_response_proto, false /* is_request */,
222           absl::GetFlag(FLAGS_json_output));
223       if (parser->HasError() && print_mode) {
224         fprintf(stderr, "Failed to parse response.\n");
225       }
226       gpr_mu_unlock(parser_mu);
227     }
228     if (receive_initial_metadata) {
229       PrintMetadata(server_initial_metadata,
230                     "Received initial metadata from server:");
231     }
232     if (!callback(serialized_response_proto) && print_mode) {
233       fprintf(stderr, "Failed to output response.\n");
234     }
235   }
236 }
237 
CreateCliChannel(const std::string & server_address,const CliCredentials & cred,const grpc::ChannelArguments & extra_args)238 std::shared_ptr<grpc::Channel> CreateCliChannel(
239     const std::string& server_address, const CliCredentials& cred,
240     const grpc::ChannelArguments& extra_args) {
241   grpc::ChannelArguments args(extra_args);
242   if (!cred.GetSslTargetNameOverride().empty()) {
243     args.SetSslTargetNameOverride(cred.GetSslTargetNameOverride());
244   }
245   if (!absl::GetFlag(FLAGS_default_service_config).empty()) {
246     args.SetString(GRPC_ARG_SERVICE_CONFIG,
247                    absl::GetFlag(FLAGS_default_service_config));
248   }
249   // See |GRPC_ARG_MAX_METADATA_SIZE| in |grpc_types.h|.
250   // Set to large enough size (10M) that should work for most use cases.
251   args.SetInt(GRPC_ARG_MAX_METADATA_SIZE, 10 * 1024 * 1024);
252 
253   // Extend channel args according to flag --channel_args.
254   const auto flag = absl::GetFlag(FLAGS_channel_args);
255   for (absl::string_view arg :
256        absl::StrSplit(flag, ',', absl::SkipWhitespace())) {
257     std::pair<std::string, std::string> kv =
258         absl::StrSplit(arg, absl::MaxSplits('=', 1), absl::SkipWhitespace());
259     int ival;
260     if (absl::SimpleAtoi(kv.second, &ival)) {
261       args.SetInt(kv.first, ival);
262     } else if (!kv.second.empty()) {
263       args.SetString(kv.first, kv.second);
264     }
265   }
266   return grpc::CreateCustomChannel(server_address, cred.GetCredentials(), args);
267 }
268 
269 struct Command {
270   const char* command;
271   std::function<bool(GrpcTool*, int, const char**, const CliCredentials&,
272                      GrpcToolOutputCallback)>
273       function;
274   int min_args;
275   int max_args;
276 };
277 
278 const Command ops[] = {
279     {"help", BindWith5Args(&GrpcTool::Help), 0, INT_MAX},
280     {"ls", BindWith5Args(&GrpcTool::ListServices), 1, 3},
281     {"list", BindWith5Args(&GrpcTool::ListServices), 1, 3},
282     {"call", BindWith5Args(&GrpcTool::CallMethod), 2, 3},
283     {"type", BindWith5Args(&GrpcTool::PrintType), 2, 2},
284     {"parse", BindWith5Args(&GrpcTool::ParseMessage), 2, 3},
285     {"totext", BindWith5Args(&GrpcTool::ToText), 2, 3},
286     {"tobinary", BindWith5Args(&GrpcTool::ToBinary), 2, 3},
287     {"tojson", BindWith5Args(&GrpcTool::ToJson), 2, 3},
288 };
289 
// Prints `msg` followed by the one-line summary of every subcommand to
// stderr, then terminates the process with exit status 1. Never returns.
void Usage(const std::string& msg) {
  const char* const heading = msg.c_str();
  fprintf(
      stderr,
      "%s\n"
      "  grpc_cli ls ...         ; List services\n"
      "  grpc_cli call ...       ; Call method\n"
      "  grpc_cli type ...       ; Print type\n"
      "  grpc_cli parse ...      ; Parse message\n"
      "  grpc_cli totext ...     ; Convert binary message to text\n"
      "  grpc_cli tojson ...     ; Convert binary message to json\n"
      "  grpc_cli tobinary ...   ; Convert text message to binary\n"
      "  grpc_cli help ...       ; Print this message, or per-command usage\n"
      "\n",
      heading);

  exit(1);
}
307 
FindCommand(const std::string & name)308 const Command* FindCommand(const std::string& name) {
309   for (int i = 0; i < static_cast<int>(ArraySize(ops)); i++) {
310     if (name == ops[i].command) {
311       return &ops[i];
312     }
313   }
314   return nullptr;
315 }
316 }  // namespace
317 
GrpcToolMainLib(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)318 int GrpcToolMainLib(int argc, const char** argv, const CliCredentials& cred,
319                     const GrpcToolOutputCallback& callback) {
320   if (argc < 2) {
321     Usage("No command specified");
322   }
323 
324   std::string command = argv[1];
325   argc -= 2;
326   argv += 2;
327 
328   const Command* cmd = FindCommand(command);
329   if (cmd != nullptr) {
330     GrpcTool grpc_tool;
331     if (argc < cmd->min_args || argc > cmd->max_args) {
332       // Force the command to print its usage message
333       fprintf(stderr, "\nWrong number of arguments for %s\n", command.c_str());
334       grpc_tool.SetPrintCommandMode(1);
335       return cmd->function(&grpc_tool, -1, nullptr, cred, callback);
336     }
337     const bool ok = cmd->function(&grpc_tool, argc, argv, cred, callback);
338     return ok ? 0 : 1;
339   } else {
340     Usage("Invalid command '" + command + "'");
341   }
342   return 1;
343 }
344 
GrpcTool()345 GrpcTool::GrpcTool() : print_command_usage_(false), usage_exit_status_(0) {}
346 
CommandUsage(const std::string & usage) const347 void GrpcTool::CommandUsage(const std::string& usage) const {
348   if (print_command_usage_) {
349     fprintf(stderr, "\n%s%s\n", usage.c_str(),
350             (usage.empty() || usage[usage.size() - 1] != '\n') ? "\n" : "");
351     exit(usage_exit_status_);
352   }
353 }
354 
Help(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)355 bool GrpcTool::Help(int argc, const char** argv, const CliCredentials& cred,
356                     const GrpcToolOutputCallback& callback) {
357   CommandUsage(
358       "Print help\n"
359       "  grpc_cli help [subcommand]\n");
360 
361   if (argc == 0) {
362     Usage("");
363   } else {
364     const Command* cmd = FindCommand(argv[0]);
365     if (cmd == nullptr) {
366       Usage("Unknown command '" + std::string(argv[0]) + "'");
367     }
368     SetPrintCommandMode(0);
369     cmd->function(this, -1, nullptr, cred, callback);
370   }
371   return true;
372 }
373 
ListServices(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)374 bool GrpcTool::ListServices(int argc, const char** argv,
375                             const CliCredentials& cred,
376                             const GrpcToolOutputCallback& callback) {
377   CommandUsage(
378       "List services\n"
379       "  grpc_cli ls <address> [<service>[/<method>]]\n"
380       "    <address>                ; host:port\n"
381       "    <service>                ; Exported service name\n"
382       "    <method>                 ; Method name\n"
383       "    --l                      ; Use a long listing format\n"
384       "    --outfile                ; Output filename (defaults to stdout)\n" +
385       cred.GetCredentialUsage());
386 
387   std::string server_address(argv[0]);
388   std::shared_ptr<grpc::Channel> channel =
389       CreateCliChannel(server_address, cred, grpc::ChannelArguments());
390   grpc::ProtoReflectionDescriptorDatabase desc_db(channel);
391   grpc::protobuf::DescriptorPool desc_pool(&desc_db);
392 
393   std::vector<std::string> service_list;
394   if (!desc_db.GetServices(&service_list)) {
395     fprintf(stderr, "Received an error when querying services endpoint.\n");
396     return false;
397   }
398 
399   // If no service is specified, dump the list of services.
400   std::string output;
401   if (argc < 2) {
402     // List all services, if --l is passed, then include full description,
403     // otherwise include a summarized list only.
404     if (absl::GetFlag(FLAGS_l)) {
405       output = DescribeServiceList(service_list, desc_pool);
406     } else {
407       for (auto it = service_list.begin(); it != service_list.end(); it++) {
408         auto const& service = *it;
409         output.append(service);
410         output.append("\n");
411       }
412     }
413   } else {
414     std::string service_name;
415     std::string method_name;
416     std::stringstream ss(argv[1]);
417 
418     // Remove leading slashes.
419     while (ss.peek() == '/') {
420       ss.get();
421     }
422 
423     // Parse service and method names. Support the following patterns:
424     //   Service
425     //   Service Method
426     //   Service.Method
427     //   Service/Method
428     if (argc == 3) {
429       std::getline(ss, service_name, '/');
430       method_name = argv[2];
431     } else {
432       if (std::getline(ss, service_name, '/')) {
433         std::getline(ss, method_name);
434       }
435     }
436 
437     const grpc::protobuf::ServiceDescriptor* service =
438         desc_pool.FindServiceByName(service_name);
439     if (service != nullptr) {
440       if (method_name.empty()) {
441         output = absl::GetFlag(FLAGS_l) ? DescribeService(service)
442                                         : SummarizeService(service);
443       } else {
444         method_name.insert(0, 1, '.');
445         method_name.insert(0, service_name);
446         const grpc::protobuf::MethodDescriptor* method =
447             desc_pool.FindMethodByName(method_name);
448         if (method != nullptr) {
449           output = absl::GetFlag(FLAGS_l) ? DescribeMethod(method)
450                                           : SummarizeMethod(method);
451         } else {
452           fprintf(stderr, "Method %s not found in service %s.\n",
453                   method_name.c_str(), service_name.c_str());
454           return false;
455         }
456       }
457     } else {
458       if (!method_name.empty()) {
459         fprintf(stderr, "Service %s not found.\n", service_name.c_str());
460         return false;
461       } else {
462         const grpc::protobuf::MethodDescriptor* method =
463             desc_pool.FindMethodByName(service_name);
464         if (method != nullptr) {
465           output = absl::GetFlag(FLAGS_l) ? DescribeMethod(method)
466                                           : SummarizeMethod(method);
467         } else {
468           fprintf(stderr, "Service or method %s not found.\n",
469                   service_name.c_str());
470           return false;
471         }
472       }
473     }
474   }
475   return callback(output);
476 }
477 
PrintType(int,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)478 bool GrpcTool::PrintType(int /*argc*/, const char** argv,
479                          const CliCredentials& cred,
480                          const GrpcToolOutputCallback& callback) {
481   CommandUsage(
482       "Print type\n"
483       "  grpc_cli type <address> <type>\n"
484       "    <address>                ; host:port\n"
485       "    <type>                   ; Protocol buffer type name\n" +
486       cred.GetCredentialUsage());
487 
488   std::string server_address(argv[0]);
489   std::shared_ptr<grpc::Channel> channel =
490       CreateCliChannel(server_address, cred, grpc::ChannelArguments());
491   grpc::ProtoReflectionDescriptorDatabase desc_db(channel);
492   grpc::protobuf::DescriptorPool desc_pool(&desc_db);
493 
494   std::string output;
495   const grpc::protobuf::Descriptor* descriptor =
496       desc_pool.FindMessageTypeByName(argv[1]);
497   if (descriptor != nullptr) {
498     output = descriptor->DebugString();
499   } else {
500     fprintf(stderr, "Type %s not found.\n", argv[1]);
501     return false;
502   }
503   return callback(output);
504 }
505 
CallMethod(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)506 bool GrpcTool::CallMethod(int argc, const char** argv,
507                           const CliCredentials& cred,
508                           const GrpcToolOutputCallback& callback) {
509   CommandUsage(
510       "Call method\n"
511       "  grpc_cli call <address> <service>[.<method>] <request>\n"
512       "    <address>                ; host:port\n"
513       "    <service>                ; Exported service name\n"
514       "    <method>                 ; Method name\n"
515       "    <request>                ; Text protobuffer (overrides infile)\n"
516       "    --protofiles             ; Comma separated proto files used as a"
517       " fallback when parsing request/response\n"
518       "    --proto_path             ; The search paths of proto files"
519       " (" GRPC_CLI_PATH_SEPARATOR
520       " separated), valid only when --protofiles is given\n"
521       "    --noremotedb             ; Don't attempt to use reflection service"
522       " at all\n"
523       "    --metadata               ; The metadata to be sent to the server\n"
524       "    --infile                 ; Input filename (defaults to stdin)\n"
525       "    --outfile                ; Output filename (defaults to stdout)\n"
526       "    --binary_input           ; Input in binary format\n"
527       "    --binary_output          ; Output in binary format\n"
528       "    --json_input             ; Input in json format\n"
529       "    --json_output            ; Output in json format\n"
530       "    --max_recv_msg_size      ; Specify max receive message size in "
531       "bytes. -1 indicates unlimited. The default value of 0 means to use the "
532       "gRPC default.\n"
533       "    --timeout                ; Specify timeout (in seconds), used to "
534       "set the deadline for RPCs. The default value of -1 means no "
535       "deadline has been set.\n" +
536       cred.GetCredentialUsage());
537 
538   std::stringstream output_ss;
539   std::string request_text;
540   std::string server_address(argv[0]);
541   std::string method_name(argv[1]);
542   std::string formatted_method_name;
543   std::unique_ptr<ProtoFileParser> parser;
544   std::string serialized_request_proto;
545   CliArgs cli_args;
546   cli_args.timeout = absl::GetFlag(FLAGS_timeout);
547   bool print_mode = false;
548 
549   grpc::ChannelArguments args;
550   if (absl::GetFlag(FLAGS_max_recv_msg_size) != 0) {
551     args.SetMaxReceiveMessageSize(absl::GetFlag(FLAGS_max_recv_msg_size));
552   }
553   std::shared_ptr<grpc::Channel> channel =
554       CreateCliChannel(server_address, cred, args);
555 
556   if (!absl::GetFlag(FLAGS_binary_input) ||
557       !absl::GetFlag(FLAGS_binary_output)) {
558     parser = std::make_unique<grpc::testing::ProtoFileParser>(
559         absl::GetFlag(FLAGS_remotedb) ? channel : nullptr,
560         absl::GetFlag(FLAGS_proto_path), absl::GetFlag(FLAGS_protofiles));
561     if (parser->HasError()) {
562       fprintf(
563           stderr,
564           "Failed to find remote reflection service and local proto files.\n");
565       return false;
566     }
567   }
568 
569   if (absl::GetFlag(FLAGS_binary_input)) {
570     formatted_method_name = method_name;
571   } else {
572     formatted_method_name = parser->GetFormattedMethodName(method_name);
573     if (parser->HasError()) {
574       fprintf(stderr, "Failed to find method %s in proto files.\n",
575               method_name.c_str());
576     }
577   }
578 
579   if (argc == 3) {
580     request_text = argv[2];
581   }
582 
583   if (parser != nullptr &&
584       parser->IsStreaming(method_name, true /* is_request */)) {
585     std::istream* input_stream;
586     std::ifstream input_file;
587 
588     if (absl::GetFlag(FLAGS_batch)) {
589       fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
590       return false;
591     }
592 
593     std::multimap<std::string, std::string> client_metadata;
594     ParseMetadataFlag(&client_metadata);
595     PrintMetadata(client_metadata, "Sending client initial metadata:");
596 
597     CliCall call(channel, formatted_method_name, client_metadata, cli_args);
598     if (absl::GetFlag(FLAGS_display_peer_address)) {
599       fprintf(stderr, "New call for method_name:%s has peer address:|%s|\n",
600               formatted_method_name.c_str(), call.peer().c_str());
601     }
602 
603     if (absl::GetFlag(FLAGS_infile).empty()) {
604       if (isatty(fileno(stdin))) {
605         print_mode = true;
606         fprintf(stderr, "reading streaming request message from stdin...\n");
607       }
608       input_stream = &std::cin;
609     } else {
610       input_file.open(absl::GetFlag(FLAGS_infile),
611                       std::ios::in | std::ios::binary);
612       if (!input_file) {
613         fprintf(stderr, "Failed to open infile %s.\n",
614                 absl::GetFlag(FLAGS_infile).c_str());
615         return false;
616       }
617 
618       input_stream = &input_file;
619     }
620 
621     gpr_mu parser_mu;
622     gpr_mu_init(&parser_mu);
623     std::thread read_thread(ReadResponse, &call, method_name, callback,
624                             parser.get(), &parser_mu, print_mode);
625 
626     std::stringstream request_ss;
627     std::string line;
628     while (!request_text.empty() ||
629            (!input_stream->eof() && getline(*input_stream, line))) {
630       if (!request_text.empty()) {
631         if (absl::GetFlag(FLAGS_binary_input)) {
632           serialized_request_proto = request_text;
633           request_text.clear();
634         } else {
635           gpr_mu_lock(&parser_mu);
636           serialized_request_proto = parser->GetSerializedProtoFromMethod(
637               method_name, request_text, true /* is_request */,
638               absl::GetFlag(FLAGS_json_input));
639           request_text.clear();
640           if (parser->HasError()) {
641             if (print_mode) {
642               fprintf(stderr, "Failed to parse request.\n");
643             }
644             gpr_mu_unlock(&parser_mu);
645             continue;
646           }
647           gpr_mu_unlock(&parser_mu);
648         }
649 
650         call.WriteAndWait(serialized_request_proto);
651         if (print_mode) {
652           fprintf(stderr, "Request sent.\n");
653         }
654       } else {
655         if (line.empty()) {
656           request_text = request_ss.str();
657           request_ss.str(std::string());
658           request_ss.clear();
659         } else {
660           request_ss << line << ' ';
661         }
662       }
663     }
664     if (input_file.is_open()) {
665       input_file.close();
666     }
667 
668     call.WritesDoneAndWait();
669     read_thread.join();
670     gpr_mu_destroy(&parser_mu);
671 
672     std::multimap<grpc::string_ref, grpc::string_ref> server_trailing_metadata;
673     Status status = call.Finish(&server_trailing_metadata);
674     PrintMetadata(server_trailing_metadata,
675                   "Received trailing metadata from server:");
676 
677     if (status.ok()) {
678       fprintf(stderr, "Stream RPC succeeded with OK status\n");
679       return true;
680     } else {
681       fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
682               status.error_code(), status.error_message().c_str());
683       return false;
684     }
685 
686   } else {  // parser->IsStreaming(method_name, true /* is_request */)
687     if (absl::GetFlag(FLAGS_batch)) {
688       if (parser != nullptr &&
689           parser->IsStreaming(method_name, false /* is_request */)) {
690         fprintf(stderr, "Batch mode for streaming RPC is not supported.\n");
691         return false;
692       }
693 
694       std::istream* input_stream;
695       std::ifstream input_file;
696 
697       if (absl::GetFlag(FLAGS_infile).empty()) {
698         if (isatty(fileno(stdin))) {
699           print_mode = true;
700           fprintf(stderr, "reading request messages from stdin...\n");
701         }
702         input_stream = &std::cin;
703       } else {
704         input_file.open(absl::GetFlag(FLAGS_infile),
705                         std::ios::in | std::ios::binary);
706         input_stream = &input_file;
707       }
708 
709       std::multimap<std::string, std::string> client_metadata;
710       ParseMetadataFlag(&client_metadata);
711       if (print_mode) {
712         PrintMetadata(client_metadata, "Sending client initial metadata:");
713       }
714 
715       std::stringstream request_ss;
716       std::string line;
717       while (!request_text.empty() ||
718              (!input_stream->eof() && getline(*input_stream, line))) {
719         if (!request_text.empty()) {
720           if (absl::GetFlag(FLAGS_binary_input)) {
721             serialized_request_proto = request_text;
722             request_text.clear();
723           } else {
724             serialized_request_proto = parser->GetSerializedProtoFromMethod(
725                 method_name, request_text, true /* is_request */,
726                 absl::GetFlag(FLAGS_json_input));
727             request_text.clear();
728             if (parser->HasError()) {
729               if (print_mode) {
730                 fprintf(stderr, "Failed to parse request.\n");
731               }
732               continue;
733             }
734           }
735 
736           std::string serialized_response_proto;
737           std::multimap<grpc::string_ref, grpc::string_ref>
738               server_initial_metadata, server_trailing_metadata;
739           CliCall call(channel, formatted_method_name, client_metadata,
740                        cli_args);
741           if (absl::GetFlag(FLAGS_display_peer_address)) {
742             fprintf(stderr,
743                     "New call for method_name:%s has peer address:|%s|\n",
744                     formatted_method_name.c_str(), call.peer().c_str());
745           }
746           call.Write(serialized_request_proto);
747           call.WritesDone();
748           if (!call.Read(&serialized_response_proto,
749                          &server_initial_metadata)) {
750             fprintf(stderr, "Failed to read response.\n");
751           }
752           Status status = call.Finish(&server_trailing_metadata);
753 
754           if (status.ok()) {
755             if (print_mode) {
756               fprintf(stderr, "Rpc succeeded with OK status.\n");
757               PrintMetadata(server_initial_metadata,
758                             "Received initial metadata from server:");
759               PrintMetadata(server_trailing_metadata,
760                             "Received trailing metadata from server:");
761             }
762 
763             if (absl::GetFlag(FLAGS_binary_output)) {
764               if (!callback(serialized_response_proto)) {
765                 break;
766               }
767             } else {
768               std::string response_text = parser->GetFormattedStringFromMethod(
769                   method_name, serialized_response_proto,
770                   false /* is_request */, absl::GetFlag(FLAGS_json_output));
771 
772               if (parser->HasError() && print_mode) {
773                 fprintf(stderr, "Failed to parse response.\n");
774               } else {
775                 if (!callback(response_text)) {
776                   break;
777                 }
778               }
779             }
780           } else {
781             if (print_mode) {
782               fprintf(stderr,
783                       "Rpc failed with status code %d, error message: %s\n",
784                       status.error_code(), status.error_message().c_str());
785             }
786           }
787         } else {
788           if (line.empty()) {
789             request_text = request_ss.str();
790             request_ss.str(std::string());
791             request_ss.clear();
792           } else {
793             request_ss << line << ' ';
794           }
795         }
796       }
797 
798       if (input_file.is_open()) {
799         input_file.close();
800       }
801 
802       return true;
803     }
804 
805     if (argc == 3) {
806       if (!absl::GetFlag(FLAGS_infile).empty()) {
807         fprintf(stderr, "warning: request given in argv, ignoring --infile\n");
808       }
809     } else {
810       std::stringstream input_stream;
811       if (absl::GetFlag(FLAGS_infile).empty()) {
812         if (isatty(fileno(stdin))) {
813           fprintf(stderr, "reading request message from stdin...\n");
814         }
815         input_stream << std::cin.rdbuf();
816       } else {
817         std::ifstream input_file(absl::GetFlag(FLAGS_infile),
818                                  std::ios::in | std::ios::binary);
819         input_stream << input_file.rdbuf();
820         input_file.close();
821       }
822       request_text = input_stream.str();
823     }
824 
825     if (absl::GetFlag(FLAGS_binary_input)) {
826       serialized_request_proto = request_text;
827     } else {
828       serialized_request_proto = parser->GetSerializedProtoFromMethod(
829           method_name, request_text, true /* is_request */,
830           absl::GetFlag(FLAGS_json_input));
831       if (parser->HasError()) {
832         fprintf(stderr, "Failed to parse request.\n");
833         return false;
834       }
835     }
836     fprintf(stderr, "connecting to %s\n", server_address.c_str());
837 
838     std::string serialized_response_proto;
839     std::multimap<std::string, std::string> client_metadata;
840     std::multimap<grpc::string_ref, grpc::string_ref> server_initial_metadata,
841         server_trailing_metadata;
842     ParseMetadataFlag(&client_metadata);
843     PrintMetadata(client_metadata, "Sending client initial metadata:");
844 
845     CliCall call(channel, formatted_method_name, client_metadata, cli_args);
846     if (absl::GetFlag(FLAGS_display_peer_address)) {
847       fprintf(stderr, "New call for method_name:%s has peer address:|%s|\n",
848               formatted_method_name.c_str(), call.peer().c_str());
849     }
850     call.Write(serialized_request_proto);
851     call.WritesDone();
852 
853     for (bool receive_initial_metadata = true; call.Read(
854              &serialized_response_proto,
855              receive_initial_metadata ? &server_initial_metadata : nullptr);
856          receive_initial_metadata = false) {
857       if (!absl::GetFlag(FLAGS_binary_output)) {
858         serialized_response_proto = parser->GetFormattedStringFromMethod(
859             method_name, serialized_response_proto, false /* is_request */,
860             absl::GetFlag(FLAGS_json_output));
861         if (parser->HasError()) {
862           fprintf(stderr, "Failed to parse response.\n");
863           return false;
864         }
865       }
866 
867       if (receive_initial_metadata) {
868         PrintMetadata(server_initial_metadata,
869                       "Received initial metadata from server:");
870       }
871       if (!callback(serialized_response_proto)) {
872         return false;
873       }
874     }
875     Status status = call.Finish(&server_trailing_metadata);
876     PrintMetadata(server_trailing_metadata,
877                   "Received trailing metadata from server:");
878     if (status.ok()) {
879       fprintf(stderr, "Rpc succeeded with OK status\n");
880       return true;
881     } else {
882       fprintf(stderr, "Rpc failed with status code %d, error message: %s\n",
883               status.error_code(), status.error_message().c_str());
884       return false;
885     }
886   }
887   GPR_UNREACHABLE_CODE(return false);
888 }
889 
ParseMessage(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)890 bool GrpcTool::ParseMessage(int argc, const char** argv,
891                             const CliCredentials& cred,
892                             const GrpcToolOutputCallback& callback) {
893   CommandUsage(
894       "Parse message\n"
895       "  grpc_cli parse <address> <type> [<message>]\n"
896       "    <address>                ; host:port\n"
897       "    <type>                   ; Protocol buffer type name\n"
898       "    <message>                ; Text protobuffer (overrides --infile)\n"
899       "    --protofiles             ; Comma separated proto files used as a"
900       " fallback when parsing request/response\n"
901       "    --proto_path             ; The search paths of proto files"
902       " (" GRPC_CLI_PATH_SEPARATOR
903       " separated), valid  only when --protofiles is given\n"
904       "    --noremotedb             ; Don't attempt to use reflection service"
905       " at all\n"
906       "    --infile                 ; Input filename (defaults to stdin)\n"
907       "    --outfile                ; Output filename (defaults to stdout)\n"
908       "    --binary_input           ; Input in binary format\n"
909       "    --binary_output          ; Output in binary format\n"
910       "    --json_input             ; Input in json format\n"
911       "    --json_output            ; Output in json format\n" +
912       cred.GetCredentialUsage());
913 
914   std::stringstream output_ss;
915   std::string message_text;
916   std::string server_address(argv[0]);
917   std::string type_name(argv[1]);
918   std::unique_ptr<grpc::testing::ProtoFileParser> parser;
919   std::string serialized_request_proto;
920 
921   if (argc == 3) {
922     message_text = argv[2];
923     if (!absl::GetFlag(FLAGS_infile).empty()) {
924       fprintf(stderr, "warning: message given in argv, ignoring --infile.\n");
925     }
926   } else {
927     std::stringstream input_stream;
928     if (absl::GetFlag(FLAGS_infile).empty()) {
929       if (isatty(fileno(stdin))) {
930         fprintf(stderr, "reading request message from stdin...\n");
931       }
932       input_stream << std::cin.rdbuf();
933     } else {
934       std::ifstream input_file(absl::GetFlag(FLAGS_infile),
935                                std::ios::in | std::ios::binary);
936       input_stream << input_file.rdbuf();
937       input_file.close();
938     }
939     message_text = input_stream.str();
940   }
941 
942   if (!absl::GetFlag(FLAGS_binary_input) ||
943       !absl::GetFlag(FLAGS_binary_output)) {
944     std::shared_ptr<grpc::Channel> channel =
945         CreateCliChannel(server_address, cred, grpc::ChannelArguments());
946     parser = std::make_unique<grpc::testing::ProtoFileParser>(
947         absl::GetFlag(FLAGS_remotedb) ? channel : nullptr,
948         absl::GetFlag(FLAGS_proto_path), absl::GetFlag(FLAGS_protofiles));
949     if (parser->HasError()) {
950       fprintf(
951           stderr,
952           "Failed to find remote reflection service and local proto files.\n");
953       return false;
954     }
955   }
956 
957   if (absl::GetFlag(FLAGS_binary_input)) {
958     serialized_request_proto = message_text;
959   } else {
960     serialized_request_proto = parser->GetSerializedProtoFromMessageType(
961         type_name, message_text, absl::GetFlag(FLAGS_json_input));
962     if (parser->HasError()) {
963       fprintf(stderr, "Failed to serialize the message.\n");
964       return false;
965     }
966   }
967 
968   if (absl::GetFlag(FLAGS_binary_output)) {
969     output_ss << serialized_request_proto;
970   } else {
971     std::string output_text;
972     output_text = parser->GetFormattedStringFromMessageType(
973         type_name, serialized_request_proto, absl::GetFlag(FLAGS_json_output));
974     if (parser->HasError()) {
975       fprintf(stderr, "Failed to deserialize the message.\n");
976       return false;
977     }
978 
979     output_ss << output_text << std::endl;
980   }
981 
982   return callback(output_ss.str());
983 }
984 
ToText(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)985 bool GrpcTool::ToText(int argc, const char** argv, const CliCredentials& cred,
986                       const GrpcToolOutputCallback& callback) {
987   CommandUsage(
988       "Convert binary message to text\n"
989       "  grpc_cli totext <protofiles> <type>\n"
990       "    <protofiles>             ; Comma separated list of proto files\n"
991       "    <type>                   ; Protocol buffer type name\n"
992       "    --proto_path             ; The search paths of proto files"
993       " (" GRPC_CLI_PATH_SEPARATOR
994       " separated)\n"
995       "    --infile                 ; Input filename (defaults to stdin)\n"
996       "    --outfile                ; Output filename (defaults to stdout)\n");
997 
998   absl::SetFlag(&FLAGS_protofiles, argv[0]);
999   absl::SetFlag(&FLAGS_remotedb, false);
1000   absl::SetFlag(&FLAGS_binary_input, true);
1001   absl::SetFlag(&FLAGS_binary_output, false);
1002   return ParseMessage(argc, argv, cred, callback);
1003 }
1004 
ToJson(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)1005 bool GrpcTool::ToJson(int argc, const char** argv, const CliCredentials& cred,
1006                       const GrpcToolOutputCallback& callback) {
1007   CommandUsage(
1008       "Convert binary message to json\n"
1009       "  grpc_cli tojson <protofiles> <type>\n"
1010       "    <protofiles>             ; Comma separated list of proto files\n"
1011       "    <type>                   ; Protocol buffer type name\n"
1012       "    --proto_path             ; The search paths of proto files"
1013       " (" GRPC_CLI_PATH_SEPARATOR
1014       " separated)\n"
1015       "    --infile                 ; Input filename (defaults to stdin)\n"
1016       "    --outfile                ; Output filename (defaults to stdout)\n");
1017 
1018   absl::SetFlag(&FLAGS_protofiles, argv[0]);
1019   absl::SetFlag(&FLAGS_remotedb, false);
1020   absl::SetFlag(&FLAGS_binary_input, true);
1021   absl::SetFlag(&FLAGS_binary_output, false);
1022   absl::SetFlag(&FLAGS_json_output, true);
1023   return ParseMessage(argc, argv, cred, callback);
1024 }
1025 
ToBinary(int argc,const char ** argv,const CliCredentials & cred,const GrpcToolOutputCallback & callback)1026 bool GrpcTool::ToBinary(int argc, const char** argv, const CliCredentials& cred,
1027                         const GrpcToolOutputCallback& callback) {
1028   CommandUsage(
1029       "Convert text message to binary\n"
1030       "  grpc_cli tobinary <protofiles> <type> [<message>]\n"
1031       "    <protofiles>             ; Comma separated list of proto files\n"
1032       "    <type>                   ; Protocol buffer type name\n"
1033       "    --proto_path             ; The search paths of proto files"
1034       " (" GRPC_CLI_PATH_SEPARATOR
1035       " separated)\n"
1036       "    --infile                 ; Input filename (defaults to stdin)\n"
1037       "    --outfile                ; Output filename (defaults to stdout)\n");
1038 
1039   absl::SetFlag(&FLAGS_protofiles, argv[0]);
1040   absl::SetFlag(&FLAGS_remotedb, false);
1041   absl::SetFlag(&FLAGS_binary_input, false);
1042   absl::SetFlag(&FLAGS_binary_output, true);
1043   return ParseMessage(argc, argv, cred, callback);
1044 }
1045 
1046 }  // namespace testing
1047 }  // namespace grpc
1048