• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1<?php
2/*
3 *
4 * Copyright 2015 gRPC authors.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
19
20namespace Grpc;
21
/**
 * Base class for generated client stubs. Stub methods are expected to call
 * _simpleRequest, _clientStreamRequest, _serverStreamRequest or _bidiRequest
 * and return the resulting call object.
 */
class BaseStub
{
    private $hostname;
    private $hostname_override;
    private $channel;
    private $call_invoker;

    // a callback function
    private $update_metadata;

    /**
     * @param string  $hostname
     * @param array   $opts
     *  - 'update_metadata': (optional) a callback function which takes in a
     * metadata array, and returns an updated metadata array
     *  - 'grpc.primary_user_agent': (optional) a user-agent string
     *  - 'grpc.ssl_target_name_override': (optional) host name used instead
     * of $hostname when building the JWT audience URI
     *  - 'grpc_call_invoker': (optional) a call invoker object; when given it
     * creates the channel and the $channel argument is ignored
     *  - 'credentials': required whenever the stub has to create the channel
     * itself (i.e. no $channel argument is supplied)
     * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
     *
     * @throws \Exception if $channel is neither a Channel nor an
     *                    InterceptorChannel, or if 'credentials' is missing
     *                    when a channel has to be created
     */
    public function __construct($hostname, $opts, $channel = null)
    {
        // method_exists() takes a fully-qualified class name: a bare
        // 'ChannelCredentials' would be looked up in the global namespace,
        // never match, and force roots.pem to be re-read for every stub.
        if (!method_exists('Grpc\ChannelCredentials', 'isDefaultRootsPemSet') ||
            !ChannelCredentials::isDefaultRootsPemSet()) {
            $ssl_roots = file_get_contents(
                dirname(__FILE__).'/../../../../etc/roots.pem'
            );
            ChannelCredentials::setDefaultRootsPem($ssl_roots);
        }

        $this->hostname = $hostname;
        $this->update_metadata = null;
        if (isset($opts['update_metadata'])) {
            if (is_callable($opts['update_metadata'])) {
                $this->update_metadata = $opts['update_metadata'];
            }
            // Never forward the callback to the channel constructor.
            unset($opts['update_metadata']);
        }
        if (!empty($opts['grpc.ssl_target_name_override'])) {
            $this->hostname_override = $opts['grpc.ssl_target_name_override'];
        }
        if (isset($opts['grpc_call_invoker'])) {
            $this->call_invoker = $opts['grpc_call_invoker'];
            unset($opts['grpc_call_invoker']);
            $channel_opts = self::updateOpts($opts);
            // If the grpc_call_invoker is defined, use the channel created by the call invoker.
            $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
            return;
        }
        $this->call_invoker = new DefaultCallInvoker();
        if ($channel) {
            if (!($channel instanceof Channel) &&
                !($channel instanceof Internal\InterceptorChannel)) {
                throw new \Exception('The channel argument is not a Channel object '.
                    'or an InterceptorChannel object created by '.
                    'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
            }
            $this->channel = $channel;
            return;
        }

        $this->channel = static::getDefaultChannel($hostname, $opts);
    }

    /**
     * Appends the "grpc-php/<version>" token (version read from the package's
     * composer.json) to the 'grpc.primary_user_agent' option and checks that
     * channel credentials were supplied.
     *
     * @param array $opts Channel options as passed to the stub
     *
     * @return array The options with the user agent updated
     * @throws \Exception if the 'credentials' key is absent
     */
    private static function updateOpts($opts) {
        if (!file_exists($composerFile = __DIR__.'/../../composer.json')) {
            // for grpc/grpc-php subpackage
            $composerFile = __DIR__.'/../composer.json';
        }
        $package_config = json_decode(file_get_contents($composerFile), true);
        if (!empty($opts['grpc.primary_user_agent'])) {
            // Keep the caller's user agent and separate it from ours.
            $opts['grpc.primary_user_agent'] .= ' ';
        } else {
            $opts['grpc.primary_user_agent'] = '';
        }
        $opts['grpc.primary_user_agent'] .=
            'grpc-php/'.$package_config['version'];
        if (!array_key_exists('credentials', $opts)) {
            throw new \Exception("The opts['credentials'] key is now ".
                'required. Please see one of the '.
                'ChannelCredentials::create methods');
        }
        return $opts;
    }

    /**
     * Creates and returns the default Channel
     *
     * @param string $hostname Target passed to the Channel constructor
     * @param array  $opts     Channel constructor options
     *
     * @return Channel The channel
     * @throws \Exception if $opts has no 'credentials' key
     */
    public static function getDefaultChannel($hostname, array $opts)
    {
        // Pass the *updated* options so the grpc-php user agent is actually
        // applied to the channel.
        $channel_opts = self::updateOpts($opts);
        return new Channel($hostname, $channel_opts);
    }

    /**
     * @return string The URI of the endpoint
     */
    public function getTarget()
    {
        return $this->channel->getTarget();
    }

    /**
     * @param bool $try_to_connect (optional)
     *
     * @return int The grpc connectivity state
     */
    public function getConnectivityState($try_to_connect = false)
    {
        return $this->channel->getConnectivityState($try_to_connect);
    }

    /**
     * Waits until the channel is ready or the timeout elapses.
     *
     * @param int $timeout in microseconds
     *
     * @return bool true if channel is ready
     * @throws \Exception if channel is in FATAL_ERROR state
     */
    public function waitForReady($timeout)
    {
        // Passing true asks the channel to start connecting.
        $new_state = $this->getConnectivityState(true);
        if ($this->_checkConnectivityState($new_state)) {
            return true;
        }

        $now = Timeval::now();
        $delta = new Timeval($timeout);
        $deadline = $now->add($delta);

        while ($this->channel->watchConnectivityState($new_state, $deadline)) {
            // state has changed before deadline
            $new_state = $this->getConnectivityState();
            if ($this->_checkConnectivityState($new_state)) {
                return true;
            }
        }
        // deadline has passed
        $new_state = $this->getConnectivityState();

        return $this->_checkConnectivityState($new_state);
    }

    /**
     * Close the communication channel associated with this stub.
     */
    public function close()
    {
        $this->channel->close();
    }

    /**
     * @param int $new_state Connect state
     *
     * @return bool true if state is CHANNEL_READY
     * @throws \Exception if state is CHANNEL_FATAL_FAILURE
     */
    private function _checkConnectivityState($new_state)
    {
        if ($new_state == \Grpc\CHANNEL_READY) {
            return true;
        }
        if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
            throw new \Exception('Failed to connect to server');
        }

        return false;
    }

    /**
     * constructs the auth uri for the jwt.
     *
     * @param string $method The method string
     *
     * @return string The URL string
     * @throws \InvalidArgumentException if $method contains no slash
     */
    private function _get_jwt_aud_uri($method)
    {
        // TODO(jtattermusch): This is not the correct implementation
        // of extracting JWT "aud" claim. We should rely on
        // grpc_metadata_credentials_plugin which
        // also provides the correct value of "aud" claim
        // in the grpc_auth_metadata_context.service_url field.
        // Trying to do the construction of "aud" field ourselves
        // is bad.
        $last_slash_idx = strrpos($method, '/');
        if ($last_slash_idx === false) {
            throw new \InvalidArgumentException(
                'service name must have a slash'
            );
        }
        // Everything before the last slash, e.g. "/pkg.Service".
        $service_name = substr($method, 0, $last_slash_idx);

        if ($this->hostname_override) {
            $hostname = $this->hostname_override;
        } else {
            $hostname = $this->hostname;
        }

        // Remove the port if it is 443
        // See https://github.com/grpc/grpc/blob/07c9f7a36b2a0d34fcffebc85649cf3b8c339b5d/src/core/lib/security/transport/client_auth_filter.cc#L205
        if ((strlen($hostname) > 4) && (substr($hostname, -4) === ":443")) {
            $hostname = substr($hostname, 0, -4);
        }

        return 'https://'.$hostname.$service_name;
    }

    /**
     * validate and normalize the metadata array.
     *
     * @param array $metadata The metadata map
     *
     * @return array $metadata Validated and key-normalized metadata map
     * @throws \InvalidArgumentException if key contains invalid characters
     */
    private function _validate_and_normalize_metadata($metadata)
    {
        $metadata_copy = [];
        foreach ($metadata as $key => $value) {
            // The D modifier makes "$" match only at the very end of the key;
            // without it a key with a trailing newline would be accepted.
            if (!preg_match('/^[.A-Za-z\d_-]+$/D', $key)) {
                throw new \InvalidArgumentException(
                    'Metadata keys must be nonempty strings containing only '.
                    'alphanumeric characters, hyphens, underscores and dots'
                );
            }
            $metadata_copy[strtolower($key)] = $value;
        }

        return $metadata_copy;
    }

    /**
     * Builds the metadata actually sent with a call: runs the optional
     * update_metadata callback (given the metadata and the JWT audience URI
     * for $method), then validates and key-normalizes the result.
     *
     * @param string $method   The method string
     * @param array  $metadata The caller-supplied metadata map
     *
     * @return array Validated and key-normalized metadata map
     * @throws \InvalidArgumentException if $method has no slash or a key
     *                                   contains invalid characters
     */
    private function _build_call_metadata($method, $metadata)
    {
        $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
        if (is_callable($this->update_metadata)) {
            $metadata = call_user_func(
                $this->update_metadata,
                $metadata,
                $jwt_aud_uri
            );
        }

        return $this->_validate_and_normalize_metadata($metadata);
    }

    /**
     * Create a function which can be used to create UnaryCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _GrpcUnaryUnary($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->UnaryCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ClientStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _GrpcStreamUnary($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ClientStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($metadata);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ServerStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _GrpcUnaryStream($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ServerStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create BidiStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _GrpcStreamStream($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->BidiStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_build_call_metadata($method, $metadata);
            $call->start($metadata);

            return $call;
        };
    }

    /**
     * Create a function which can be used to create UnaryCall.
     * For an InterceptorChannel the returned function hands the call to the
     * channel's interceptor, passing a factory for the next channel in the
     * chain as the continuation.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _UnaryUnaryCallFactory($channel)
    {
        if ($channel instanceof Internal\InterceptorChannel) {
            return function ($method,
                             $argument,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptUnaryUnary(
                    $method,
                    $argument,
                    $deserialize,
                    $this->_UnaryUnaryCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcUnaryUnary($channel);
    }

    /**
     * Create a function which can be used to create ServerStreamingCall.
     * For an InterceptorChannel the returned function hands the call to the
     * channel's interceptor, passing a factory for the next channel in the
     * chain as the continuation.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _UnaryStreamCallFactory($channel)
    {
        if ($channel instanceof Internal\InterceptorChannel) {
            return function ($method,
                             $argument,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptUnaryStream(
                    $method,
                    $argument,
                    $deserialize,
                    $this->_UnaryStreamCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcUnaryStream($channel);
    }

    /**
     * Create a function which can be used to create ClientStreamingCall.
     * For an InterceptorChannel the returned function hands the call to the
     * channel's interceptor, passing a factory for the next channel in the
     * chain as the continuation.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _StreamUnaryCallFactory($channel)
    {
        if ($channel instanceof Internal\InterceptorChannel) {
            return function ($method,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptStreamUnary(
                    $method,
                    $deserialize,
                    $this->_StreamUnaryCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcStreamUnary($channel);
    }

    /**
     * Create a function which can be used to create BidiStreamingCall.
     * For an InterceptorChannel the returned function hands the call to the
     * channel's interceptor, passing a factory for the next channel in the
     * chain as the continuation.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _StreamStreamCallFactory($channel)
    {
        if ($channel instanceof Internal\InterceptorChannel) {
            return function ($method,
                             $deserialize,
                             array $metadata = [],
                             array $options = []) use ($channel) {
                return $channel->getInterceptor()->interceptStreamStream(
                    $method,
                    $deserialize,
                    $this->_StreamStreamCallFactory($channel->getNext()),
                    $metadata,
                    $options
                );
            };
        }
        return $this->_GrpcStreamStream($channel);
    }

    /* This class is intended to be subclassed by generated code, so
     * all functions begin with "_" to avoid name collisions. */
    /**
     * Call a remote method that takes a single argument and has a
     * single output.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return UnaryCall The active call object
     */
    protected function _simpleRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryUnaryCallFactory($this->channel);
        $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method that takes a stream of arguments and has a single
     * output.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ClientStreamingCall The active call object
     */
    protected function _clientStreamRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamUnaryCallFactory($this->channel);
        $call = $call_factory($method, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method that takes a single argument and returns a stream
     * of responses.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ServerStreamingCall The active call object
     */
    protected function _serverStreamRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryStreamCallFactory($this->channel);
        $call = $call_factory($method, $argument, $deserialize, $metadata, $options);
        return $call;
    }

    /**
     * Call a remote method with messages streaming in both directions.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return BidiStreamingCall The active call object
     */
    protected function _bidiRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamStreamCallFactory($this->channel);
        $call = $call_factory($method, $deserialize, $metadata, $options);
        return $call;
    }
}
615