1<?php 2/* 3 * 4 * Copyright 2015 gRPC authors. 5 * 6 * Licensed under the Apache License, Version 2.0 (the "License"); 7 * you may not use this file except in compliance with the License. 8 * You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 * 18 */ 19 20namespace Grpc; 21 22/** 23 * Base class for generated client stubs. Stub methods are expected to call 24 * _simpleRequest or _streamRequest and return the result. 25 */ 26class BaseStub 27{ 28 private $hostname; 29 private $hostname_override; 30 private $channel; 31 private $call_invoker; 32 33 // a callback function 34 private $update_metadata; 35 36 /** 37 * @param string $hostname 38 * @param array $opts 39 * - 'update_metadata': (optional) a callback function which takes in a 40 * metadata array, and returns an updated metadata array 41 * - 'grpc.primary_user_agent': (optional) a user-agent string 42 * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional) 43 */ 44 public function __construct($hostname, $opts, $channel = null) 45 { 46 if (!method_exists('ChannelCredentials', 'isDefaultRootsPemSet') || 47 !ChannelCredentials::isDefaultRootsPemSet()) { 48 $ssl_roots = file_get_contents( 49 dirname(__FILE__).'/../../../../etc/roots.pem' 50 ); 51 ChannelCredentials::setDefaultRootsPem($ssl_roots); 52 } 53 54 $this->hostname = $hostname; 55 $this->update_metadata = null; 56 if (isset($opts['update_metadata'])) { 57 if (is_callable($opts['update_metadata'])) { 58 $this->update_metadata = $opts['update_metadata']; 59 } 60 unset($opts['update_metadata']); 61 } 62 if (!empty($opts['grpc.ssl_target_name_override'])) { 63 $this->hostname_override = $opts['grpc.ssl_target_name_override']; 64 } 65 if (isset($opts['grpc_call_invoker'])) { 66 $this->call_invoker = $opts['grpc_call_invoker']; 67 unset($opts['grpc_call_invoker']); 68 $channel_opts = $this->updateOpts($opts); 69 // If the grpc_call_invoker is defined, use the channel created by the call invoker. 70 $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts); 71 return; 72 } 73 $this->call_invoker = new DefaultCallInvoker(); 74 if ($channel) { 75 if (!is_a($channel, 'Grpc\Channel') && 76 !is_a($channel, 'Grpc\Internal\InterceptorChannel')) { 77 throw new \Exception('The channel argument is not a Channel object '. 78 'or an InterceptorChannel object created by '. 79 'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)'); 80 } 81 $this->channel = $channel; 82 return; 83 } 84 85 $this->channel = static::getDefaultChannel($hostname, $opts); 86 } 87 88 private static function updateOpts($opts) { 89 if (!file_exists($composerFile = __DIR__.'/../../composer.json')) { 90 // for grpc/grpc-php subpackage 91 $composerFile = __DIR__.'/../composer.json'; 92 } 93 $package_config = json_decode(file_get_contents($composerFile), true); 94 if (!empty($opts['grpc.primary_user_agent'])) { 95 $opts['grpc.primary_user_agent'] .= ' '; 96 } else { 97 $opts['grpc.primary_user_agent'] = ''; 98 } 99 $opts['grpc.primary_user_agent'] .= 100 'grpc-php/'.$package_config['version']; 101 if (!array_key_exists('credentials', $opts)) { 102 throw new \Exception("The opts['credentials'] key is now ". 103 'required. Please see one of the '. 104 'ChannelCredentials::create methods'); 105 } 106 return $opts; 107 } 108 109 /** 110 * Creates and returns the default Channel 111 * 112 * @param array $opts Channel constructor options 113 * 114 * @return Channel The channel 115 */ 116 public static function getDefaultChannel($hostname, array $opts) 117 { 118 $channel_opts = self::updateOpts($opts); 119 return new Channel($hostname, $opts); 120 } 121 122 /** 123 * @return string The URI of the endpoint 124 */ 125 public function getTarget() 126 { 127 return $this->channel->getTarget(); 128 } 129 130 /** 131 * @param bool $try_to_connect (optional) 132 * 133 * @return int The grpc connectivity state 134 */ 135 public function getConnectivityState($try_to_connect = false) 136 { 137 return $this->channel->getConnectivityState($try_to_connect); 138 } 139 140 /** 141 * @param int $timeout in microseconds 142 * 143 * @return bool true if channel is ready 144 * @throw Exception if channel is in FATAL_ERROR state 145 */ 146 public function waitForReady($timeout) 147 { 148 $new_state = $this->getConnectivityState(true); 149 if ($this->_checkConnectivityState($new_state)) { 150 return true; 151 } 152 153 $now = Timeval::now(); 154 $delta = new Timeval($timeout); 155 $deadline = $now->add($delta); 156 157 while ($this->channel->watchConnectivityState($new_state, $deadline)) { 158 // state has changed before deadline 159 $new_state = $this->getConnectivityState(); 160 if ($this->_checkConnectivityState($new_state)) { 161 return true; 162 } 163 } 164 // deadline has passed 165 $new_state = $this->getConnectivityState(); 166 167 return $this->_checkConnectivityState($new_state); 168 } 169 170 /** 171 * Close the communication channel associated with this stub. 172 */ 173 public function close() 174 { 175 $this->channel->close(); 176 } 177 178 /** 179 * @param $new_state Connect state 180 * 181 * @return bool true if state is CHANNEL_READY 182 * @throw Exception if state is CHANNEL_FATAL_FAILURE 183 */ 184 private function _checkConnectivityState($new_state) 185 { 186 if ($new_state == \Grpc\CHANNEL_READY) { 187 return true; 188 } 189 if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) { 190 throw new \Exception('Failed to connect to server'); 191 } 192 193 return false; 194 } 195 196 /** 197 * constructs the auth uri for the jwt. 198 * 199 * @param string $method The method string 200 * 201 * @return string The URL string 202 */ 203 private function _get_jwt_aud_uri($method) 204 { 205 // TODO(jtattermusch): This is not the correct implementation 206 // of extracting JWT "aud" claim. We should rely on 207 // grpc_metadata_credentials_plugin which 208 // also provides the correct value of "aud" claim 209 // in the grpc_auth_metadata_context.service_url field. 210 // Trying to do the construction of "aud" field ourselves 211 // is bad. 212 $last_slash_idx = strrpos($method, '/'); 213 if ($last_slash_idx === false) { 214 throw new \InvalidArgumentException( 215 'service name must have a slash' 216 ); 217 } 218 $service_name = substr($method, 0, $last_slash_idx); 219 220 if ($this->hostname_override) { 221 $hostname = $this->hostname_override; 222 } else { 223 $hostname = $this->hostname; 224 } 225 226 // Remove the port if it is 443 227 // See https://github.com/grpc/grpc/blob/07c9f7a36b2a0d34fcffebc85649cf3b8c339b5d/src/core/lib/security/transport/client_auth_filter.cc#L205 228 if ((strlen($hostname) > 4) && (substr($hostname, -4) === ":443")) { 229 $hostname = substr($hostname, 0, -4); 230 } 231 232 return 'https://'.$hostname.$service_name; 233 } 234 235 /** 236 * validate and normalize the metadata array. 237 * 238 * @param array $metadata The metadata map 239 * 240 * @return array $metadata Validated and key-normalized metadata map 241 * @throw InvalidArgumentException if key contains invalid characters 242 */ 243 private function _validate_and_normalize_metadata($metadata) 244 { 245 $metadata_copy = []; 246 foreach ($metadata as $key => $value) { 247 if (!preg_match('/^[.A-Za-z\d_-]+$/', $key)) { 248 throw new \InvalidArgumentException( 249 'Metadata keys must be nonempty strings containing only '. 250 'alphanumeric characters, hyphens, underscores and dots' 251 ); 252 } 253 $metadata_copy[strtolower($key)] = $value; 254 } 255 256 return $metadata_copy; 257 } 258 259 /** 260 * Create a function which can be used to create UnaryCall 261 * 262 * @param Channel|InterceptorChannel $channel 263 * @param callable $deserialize A function that deserializes the response 264 * 265 * @return \Closure 266 */ 267 private function _GrpcUnaryUnary($channel) 268 { 269 return function ($method, 270 $argument, 271 $deserialize, 272 array $metadata = [], 273 array $options = []) use ($channel) { 274 $call = $this->call_invoker->UnaryCall( 275 $channel, 276 $method, 277 $deserialize, 278 $options 279 ); 280 $jwt_aud_uri = $this->_get_jwt_aud_uri($method); 281 if (is_callable($this->update_metadata)) { 282 $metadata = call_user_func( 283 $this->update_metadata, 284 $metadata, 285 $jwt_aud_uri 286 ); 287 } 288 $metadata = $this->_validate_and_normalize_metadata( 289 $metadata 290 ); 291 $call->start($argument, $metadata, $options); 292 return $call; 293 }; 294 } 295 296 /** 297 * Create a function which can be used to create ServerStreamingCall 298 * 299 * @param Channel|InterceptorChannel $channel 300 * @param callable $deserialize A function that deserializes the response 301 * 302 * @return \Closure 303 */ 304 private function _GrpcStreamUnary($channel) 305 { 306 return function ($method, 307 $deserialize, 308 array $metadata = [], 309 array $options = []) use ($channel) { 310 $call = $this->call_invoker->ClientStreamingCall( 311 $channel, 312 $method, 313 $deserialize, 314 $options 315 ); 316 $jwt_aud_uri = $this->_get_jwt_aud_uri($method); 317 if (is_callable($this->update_metadata)) { 318 $metadata = call_user_func( 319 $this->update_metadata, 320 $metadata, 321 $jwt_aud_uri 322 ); 323 } 324 $metadata = $this->_validate_and_normalize_metadata( 325 $metadata 326 ); 327 $call->start($metadata); 328 return $call; 329 }; 330 } 331 332 /** 333 * Create a function which can be used to create ClientStreamingCall 334 * 335 * @param Channel|InterceptorChannel $channel 336 * @param callable $deserialize A function that deserializes the response 337 * 338 * @return \Closure 339 */ 340 private function _GrpcUnaryStream($channel) 341 { 342 return function ($method, 343 $argument, 344 $deserialize, 345 array $metadata = [], 346 array $options = []) use ($channel) { 347 $call = $this->call_invoker->ServerStreamingCall( 348 $channel, 349 $method, 350 $deserialize, 351 $options 352 ); 353 $jwt_aud_uri = $this->_get_jwt_aud_uri($method); 354 if (is_callable($this->update_metadata)) { 355 $metadata = call_user_func( 356 $this->update_metadata, 357 $metadata, 358 $jwt_aud_uri 359 ); 360 } 361 $metadata = $this->_validate_and_normalize_metadata( 362 $metadata 363 ); 364 $call->start($argument, $metadata, $options); 365 return $call; 366 }; 367 } 368 369 /** 370 * Create a function which can be used to create BidiStreamingCall 371 * 372 * @param Channel|InterceptorChannel $channel 373 * @param callable $deserialize A function that deserializes the response 374 * 375 * @return \Closure 376 */ 377 private function _GrpcStreamStream($channel) 378 { 379 return function ($method, 380 $deserialize, 381 array $metadata = [], 382 array $options = []) use ($channel) { 383 $call = $this->call_invoker->BidiStreamingCall( 384 $channel, 385 $method, 386 $deserialize, 387 $options 388 ); 389 $jwt_aud_uri = $this->_get_jwt_aud_uri($method); 390 if (is_callable($this->update_metadata)) { 391 $metadata = call_user_func( 392 $this->update_metadata, 393 $metadata, 394 $jwt_aud_uri 395 ); 396 } 397 $metadata = $this->_validate_and_normalize_metadata( 398 $metadata 399 ); 400 $call->start($metadata); 401 402 return $call; 403 }; 404 } 405 406 /** 407 * Create a function which can be used to create UnaryCall 408 * 409 * @param Channel|InterceptorChannel $channel 410 * @param callable $deserialize A function that deserializes the response 411 * 412 * @return \Closure 413 */ 414 private function _UnaryUnaryCallFactory($channel) 415 { 416 if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) { 417 return function ($method, 418 $argument, 419 $deserialize, 420 array $metadata = [], 421 array $options = []) use ($channel) { 422 return $channel->getInterceptor()->interceptUnaryUnary( 423 $method, 424 $argument, 425 $deserialize, 426 $this->_UnaryUnaryCallFactory($channel->getNext()), 427 $metadata, 428 $options 429 ); 430 }; 431 } 432 return $this->_GrpcUnaryUnary($channel); 433 } 434 435 /** 436 * Create a function which can be used to create ServerStreamingCall 437 * 438 * @param Channel|InterceptorChannel $channel 439 * @param callable $deserialize A function that deserializes the response 440 * 441 * @return \Closure 442 */ 443 private function _UnaryStreamCallFactory($channel) 444 { 445 if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) { 446 return function ($method, 447 $argument, 448 $deserialize, 449 array $metadata = [], 450 array $options = []) use ($channel) { 451 return $channel->getInterceptor()->interceptUnaryStream( 452 $method, 453 $argument, 454 $deserialize, 455 $this->_UnaryStreamCallFactory($channel->getNext()), 456 $metadata, 457 $options 458 ); 459 }; 460 } 461 return $this->_GrpcUnaryStream($channel); 462 } 463 464 /** 465 * Create a function which can be used to create ClientStreamingCall 466 * 467 * @param Channel|InterceptorChannel $channel 468 * @param callable $deserialize A function that deserializes the response 469 * 470 * @return \Closure 471 */ 472 private function _StreamUnaryCallFactory($channel) 473 { 474 if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) { 475 return function ($method, 476 $deserialize, 477 array $metadata = [], 478 array $options = []) use ($channel) { 479 return $channel->getInterceptor()->interceptStreamUnary( 480 $method, 481 $deserialize, 482 $this->_StreamUnaryCallFactory($channel->getNext()), 483 $metadata, 484 $options 485 ); 486 }; 487 } 488 return $this->_GrpcStreamUnary($channel); 489 } 490 491 /** 492 * Create a function which can be used to create BidiStreamingCall 493 * 494 * @param Channel|InterceptorChannel $channel 495 * @param callable $deserialize A function that deserializes the response 496 * 497 * @return \Closure 498 */ 499 private function _StreamStreamCallFactory($channel) 500 { 501 if (is_a($channel, 'Grpc\Internal\InterceptorChannel')) { 502 return function ($method, 503 $deserialize, 504 array $metadata = [], 505 array $options = []) use ($channel) { 506 return $channel->getInterceptor()->interceptStreamStream( 507 $method, 508 $deserialize, 509 $this->_StreamStreamCallFactory($channel->getNext()), 510 $metadata, 511 $options 512 ); 513 }; 514 } 515 return $this->_GrpcStreamStream($channel); 516 } 517 518 /* This class is intended to be subclassed by generated code, so 519 * all functions begin with "_" to avoid name collisions. */ 520 /** 521 * Call a remote method that takes a single argument and has a 522 * single output. 523 * 524 * @param string $method The name of the method to call 525 * @param mixed $argument The argument to the method 526 * @param callable $deserialize A function that deserializes the response 527 * @param array $metadata A metadata map to send to the server 528 * (optional) 529 * @param array $options An array of options (optional) 530 * 531 * @return UnaryCall The active call object 532 */ 533 protected function _simpleRequest( 534 $method, 535 $argument, 536 $deserialize, 537 array $metadata = [], 538 array $options = [] 539 ) { 540 $call_factory = $this->_UnaryUnaryCallFactory($this->channel); 541 $call = $call_factory($method, $argument, $deserialize, $metadata, $options); 542 return $call; 543 } 544 545 /** 546 * Call a remote method that takes a stream of arguments and has a single 547 * output. 548 * 549 * @param string $method The name of the method to call 550 * @param callable $deserialize A function that deserializes the response 551 * @param array $metadata A metadata map to send to the server 552 * (optional) 553 * @param array $options An array of options (optional) 554 * 555 * @return ClientStreamingCall The active call object 556 */ 557 protected function _clientStreamRequest( 558 $method, 559 $deserialize, 560 array $metadata = [], 561 array $options = [] 562 ) { 563 $call_factory = $this->_StreamUnaryCallFactory($this->channel); 564 $call = $call_factory($method, $deserialize, $metadata, $options); 565 return $call; 566 } 567 568 /** 569 * Call a remote method that takes a single argument and returns a stream 570 * of responses. 571 * 572 * @param string $method The name of the method to call 573 * @param mixed $argument The argument to the method 574 * @param callable $deserialize A function that deserializes the responses 575 * @param array $metadata A metadata map to send to the server 576 * (optional) 577 * @param array $options An array of options (optional) 578 * 579 * @return ServerStreamingCall The active call object 580 */ 581 protected function _serverStreamRequest( 582 $method, 583 $argument, 584 $deserialize, 585 array $metadata = [], 586 array $options = [] 587 ) { 588 $call_factory = $this->_UnaryStreamCallFactory($this->channel); 589 $call = $call_factory($method, $argument, $deserialize, $metadata, $options); 590 return $call; 591 } 592 593 /** 594 * Call a remote method with messages streaming in both directions. 595 * 596 * @param string $method The name of the method to call 597 * @param callable $deserialize A function that deserializes the responses 598 * @param array $metadata A metadata map to send to the server 599 * (optional) 600 * @param array $options An array of options (optional) 601 * 602 * @return BidiStreamingCall The active call object 603 */ 604 protected function _bidiRequest( 605 $method, 606 $deserialize, 607 array $metadata = [], 608 array $options = [] 609 ) { 610 $call_factory = $this->_StreamStreamCallFactory($this->channel); 611 $call = $call_factory($method, $deserialize, $metadata, $options); 612 return $call; 613 } 614} 615