• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1<?php
2/*
3 *
4 * Copyright 2015 gRPC authors.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
19
20namespace Grpc;
21
/**
 * Base class for generated client stubs. Stub methods are expected to call
 * _simpleRequest, _clientStreamRequest, _serverStreamRequest or _bidiRequest
 * and return the result.
 */
class BaseStub
{
    /** @var string Target given to the constructor; also used for the JWT audience URI */
    private $hostname;
    /** @var string|null Value of the 'grpc.ssl_target_name_override' option, when supplied */
    private $hostname_override;
    /** @var Channel|InterceptorChannel|mixed Channel used by every call made through this stub */
    private $channel;
    /** @var object Builds the call objects; a DefaultCallInvoker unless 'grpc_call_invoker' is given */
    private $call_invoker;

    /** @var callable|null Callback applied to the metadata of every call before it is validated */
    private $update_metadata;

    /**
     * @param string  $hostname
     * @param array   $opts
     *  - 'update_metadata': (optional) a callback function which takes in a
     * metadata array (and the JWT audience URI as second argument), and
     * returns an updated metadata array
     *  - 'grpc.primary_user_agent': (optional) a user-agent string
     *  - 'grpc.ssl_target_name_override': (optional) host name used instead of
     * $hostname when building the JWT audience URI
     *  - 'grpc_call_invoker': (optional) call invoker object; when present it
     * also creates the channel and the $channel argument is ignored
     *  - 'credentials': required whenever the stub has to create a channel
     * @param Channel|InterceptorChannel $channel An already created Channel or InterceptorChannel object (optional)
     *
     * @throws \Exception if $channel is neither a Channel nor an
     *                    InterceptorChannel, or if 'credentials' is missing
     *                    when a channel has to be created
     */
    public function __construct($hostname, $opts, $channel = null)
    {
        // Register the bundled roots.pem as the default root certificates.
        $ssl_roots = file_get_contents(
            dirname(__FILE__).'/../../../../etc/roots.pem'
        );
        ChannelCredentials::setDefaultRootsPem($ssl_roots);

        $this->hostname = $hostname;
        $this->update_metadata = null;
        if (isset($opts['update_metadata'])) {
            // A non-callable value is dropped silently rather than rejected.
            if (is_callable($opts['update_metadata'])) {
                $this->update_metadata = $opts['update_metadata'];
            }
            // Stub-level option: it must not be forwarded to the channel.
            unset($opts['update_metadata']);
        }
        if (!empty($opts['grpc.ssl_target_name_override'])) {
            // Kept in $opts on purpose: only a copy is remembered here.
            $this->hostname_override = $opts['grpc.ssl_target_name_override'];
        }

        if (isset($opts['grpc_call_invoker'])) {
            $this->call_invoker = $opts['grpc_call_invoker'];
            unset($opts['grpc_call_invoker']);
            $channel_opts = self::updateOpts($opts);
            // If the grpc_call_invoker is defined, use the channel created by the call invoker.
            $this->channel = $this->call_invoker->createChannelFactory($hostname, $channel_opts);
            return;
        }

        $this->call_invoker = new DefaultCallInvoker();
        if ($channel) {
            $is_supported = $channel instanceof \Grpc\Channel
                || $channel instanceof \Grpc\Internal\InterceptorChannel;
            if (!$is_supported) {
                throw new \Exception('The channel argument is not a Channel object '.
                    'or an InterceptorChannel object created by '.
                    'Interceptor::intercept($channel, Interceptor|Interceptor[] $interceptors)');
            }
            $this->channel = $channel;
            return;
        }

        // Late static binding so a subclass can supply its own default channel.
        $this->channel = static::getDefaultChannel($hostname, $opts);
    }

    /**
     * Validates the channel options and appends the "grpc-php/<version>"
     * token to 'grpc.primary_user_agent'.
     *
     * The version is read from the package's composer.json. When that file
     * cannot be read or has no 'version' entry the token is emitted with an
     * empty version instead of raising PHP warnings.
     *
     * @param array $opts Channel options supplied by the caller
     *
     * @return array The options with the extended user-agent string
     * @throws \Exception if the 'credentials' key is absent
     */
    private static function updateOpts($opts) {
        // Fail fast: no point touching the disk for options that are rejected.
        if (!array_key_exists('credentials', $opts)) {
            throw new \Exception("The opts['credentials'] key is now ".
                'required. Please see one of the '.
                'ChannelCredentials::create methods');
        }

        if (!file_exists($composerFile = __DIR__.'/../../composer.json')) {
            // for grpc/grpc-php subpackage
            $composerFile = __DIR__.'/../composer.json';
        }
        $raw_config = is_readable($composerFile)
            ? file_get_contents($composerFile)
            : false;
        $package_config = ($raw_config === false)
            ? null
            : json_decode($raw_config, true);
        $version = (is_array($package_config) && isset($package_config['version']))
            ? $package_config['version']
            : '';

        // A caller-supplied agent comes first, separated by a single space.
        $user_agent = empty($opts['grpc.primary_user_agent'])
            ? ''
            : $opts['grpc.primary_user_agent'].' ';
        $opts['grpc.primary_user_agent'] = $user_agent.'grpc-php/'.$version;

        return $opts;
    }

    /**
     * Creates and returns the default Channel
     *
     * @param string $hostname Target the channel connects to
     * @param array  $opts     Channel constructor options; must contain the
     *                         'credentials' key
     *
     * @return Channel The channel
     * @throws \Exception if the 'credentials' key is absent
     */
    public static function getDefaultChannel($hostname, array $opts)
    {
        $channel_opts = self::updateOpts($opts);
        // Pass the updated options: using $opts here would silently drop the
        // "grpc-php/<version>" user-agent token added by updateOpts().
        return new Channel($hostname, $channel_opts);
    }

    /**
     * @return string The URI of the endpoint
     */
    public function getTarget()
    {
        return $this->channel->getTarget();
    }

    /**
     * @param bool $try_to_connect (optional)
     *
     * @return int The grpc connectivity state
     */
    public function getConnectivityState($try_to_connect = false)
    {
        return $this->channel->getConnectivityState($try_to_connect);
    }

    /**
     * Waits until the channel is ready or the timeout elapses.
     *
     * @param int $timeout in microseconds
     *
     * @return bool true if channel is ready
     * @throws \Exception if the channel reports CHANNEL_FATAL_FAILURE
     */
    public function waitForReady($timeout)
    {
        // Passing true asks the channel to start connecting if it is idle.
        $new_state = $this->getConnectivityState(true);
        if ($this->_checkConnectivityState($new_state)) {
            return true;
        }

        $now = Timeval::now();
        $delta = new Timeval($timeout);
        $deadline = $now->add($delta);

        while ($this->channel->watchConnectivityState($new_state, $deadline)) {
            // state has changed before deadline
            $new_state = $this->getConnectivityState();
            if ($this->_checkConnectivityState($new_state)) {
                return true;
            }
        }
        // deadline has passed
        $new_state = $this->getConnectivityState();

        return $this->_checkConnectivityState($new_state);
    }

    /**
     * Close the communication channel associated with this stub.
     */
    public function close()
    {
        $this->channel->close();
    }

    /**
     * @param int $new_state Connectivity state to inspect
     *
     * @return bool true if state is CHANNEL_READY
     * @throws \Exception if state is CHANNEL_FATAL_FAILURE
     */
    private function _checkConnectivityState($new_state)
    {
        if ($new_state == \Grpc\CHANNEL_READY) {
            return true;
        }
        if ($new_state == \Grpc\CHANNEL_FATAL_FAILURE) {
            throw new \Exception('Failed to connect to server');
        }

        return false;
    }

    /**
     * constructs the auth uri for the jwt.
     *
     * The URI is "https://" + host + everything in $method before its last
     * slash; the host is the SSL target name override when one was given.
     *
     * @param string $method The method string
     *
     * @return string The URL string
     * @throws \InvalidArgumentException if $method contains no slash
     */
    private function _get_jwt_aud_uri($method)
    {
        $last_slash_idx = strrpos($method, '/');
        if ($last_slash_idx === false) {
            throw new \InvalidArgumentException(
                'service name must have a slash'
            );
        }
        $service_name = substr($method, 0, $last_slash_idx);
        $hostname = $this->hostname_override ?: $this->hostname;

        return 'https://'.$hostname.$service_name;
    }

    /**
     * validate and normalize the metadata array.
     *
     * @param array $metadata The metadata map
     *
     * @return array $metadata Validated and key-normalized (lower-cased)
     *                         metadata map
     * @throws \InvalidArgumentException if key contains invalid characters
     */
    private function _validate_and_normalize_metadata($metadata)
    {
        $metadata_copy = [];
        foreach ($metadata as $key => $value) {
            if (!preg_match('/^[A-Za-z\d_-]+$/', $key)) {
                throw new \InvalidArgumentException(
                    'Metadata keys must be nonempty strings containing only '.
                    'alphanumeric characters, hyphens and underscores'
                );
            }
            $metadata_copy[strtolower($key)] = $value;
        }

        return $metadata_copy;
    }

    /**
     * Runs the metadata pipeline shared by all four call types: derive the
     * JWT audience URI, let the 'update_metadata' callback rewrite the
     * metadata, then validate and normalize the result.
     *
     * @param string $method   The method string
     * @param array  $metadata The metadata map supplied by the caller
     *
     * @return array Validated and key-normalized metadata map
     * @throws \InvalidArgumentException if $method has no slash or a metadata
     *                                   key contains invalid characters
     */
    private function _prepare_metadata($method, $metadata)
    {
        // Computed even without a callback so a malformed method name is
        // rejected in every case.
        $jwt_aud_uri = $this->_get_jwt_aud_uri($method);
        if (is_callable($this->update_metadata)) {
            $metadata = call_user_func(
                $this->update_metadata,
                $metadata,
                $jwt_aud_uri
            );
        }

        return $this->_validate_and_normalize_metadata($metadata);
    }

    /**
     * Create a function which can be used to create UnaryCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure Takes ($method, $argument, $deserialize, $metadata,
     *                  $options) and returns the started call
     */
    private function _GrpcUnaryUnary($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->UnaryCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_prepare_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ClientStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure Takes ($method, $deserialize, $metadata, $options)
     *                  and returns the started call
     */
    private function _GrpcStreamUnary($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ClientStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_prepare_metadata($method, $metadata);
            $call->start($metadata);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create ServerStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure Takes ($method, $argument, $deserialize, $metadata,
     *                  $options) and returns the started call
     */
    private function _GrpcUnaryStream($channel)
    {
        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->ServerStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_prepare_metadata($method, $metadata);
            $call->start($argument, $metadata, $options);
            return $call;
        };
    }

    /**
     * Create a function which can be used to create BidiStreamingCall
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure Takes ($method, $deserialize, $metadata, $options)
     *                  and returns the started call
     */
    private function _GrpcStreamStream($channel)
    {
        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            $call = $this->call_invoker->BidiStreamingCall(
                $channel,
                $method,
                $deserialize,
                $options
            );
            $metadata = $this->_prepare_metadata($method, $metadata);
            $call->start($metadata);

            return $call;
        };
    }

    /**
     * Create a function which can be used to create UnaryCall
     *
     * For an InterceptorChannel the returned closure hands the request to the
     * channel's interceptor together with the factory built for the next
     * channel in the chain; otherwise the plain gRPC factory is returned.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _UnaryUnaryCallFactory($channel)
    {
        if (!($channel instanceof \Grpc\Internal\InterceptorChannel)) {
            return $this->_GrpcUnaryUnary($channel);
        }

        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            return $channel->getInterceptor()->interceptUnaryUnary(
                $method,
                $argument,
                $deserialize,
                $metadata,
                $options,
                $this->_UnaryUnaryCallFactory($channel->getNext())
            );
        };
    }

    /**
     * Create a function which can be used to create ServerStreamingCall
     *
     * For an InterceptorChannel the returned closure hands the request to the
     * channel's interceptor together with the factory built for the next
     * channel in the chain; otherwise the plain gRPC factory is returned.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _UnaryStreamCallFactory($channel)
    {
        if (!($channel instanceof \Grpc\Internal\InterceptorChannel)) {
            return $this->_GrpcUnaryStream($channel);
        }

        return function ($method,
                         $argument,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            return $channel->getInterceptor()->interceptUnaryStream(
                $method,
                $argument,
                $deserialize,
                $metadata,
                $options,
                $this->_UnaryStreamCallFactory($channel->getNext())
            );
        };
    }

    /**
     * Create a function which can be used to create ClientStreamingCall
     *
     * For an InterceptorChannel the returned closure hands the request to the
     * channel's interceptor together with the factory built for the next
     * channel in the chain; otherwise the plain gRPC factory is returned.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _StreamUnaryCallFactory($channel)
    {
        if (!($channel instanceof \Grpc\Internal\InterceptorChannel)) {
            return $this->_GrpcStreamUnary($channel);
        }

        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            return $channel->getInterceptor()->interceptStreamUnary(
                $method,
                $deserialize,
                $metadata,
                $options,
                $this->_StreamUnaryCallFactory($channel->getNext())
            );
        };
    }

    /**
     * Create a function which can be used to create BidiStreamingCall
     *
     * For an InterceptorChannel the returned closure hands the request to the
     * channel's interceptor together with the factory built for the next
     * channel in the chain; otherwise the plain gRPC factory is returned.
     *
     * @param Channel|InterceptorChannel   $channel
     *
     * @return \Closure
     */
    private function _StreamStreamCallFactory($channel)
    {
        if (!($channel instanceof \Grpc\Internal\InterceptorChannel)) {
            return $this->_GrpcStreamStream($channel);
        }

        return function ($method,
                         $deserialize,
                         array $metadata = [],
                         array $options = []) use ($channel) {
            return $channel->getInterceptor()->interceptStreamStream(
                $method,
                $deserialize,
                $metadata,
                $options,
                $this->_StreamStreamCallFactory($channel->getNext())
            );
        };
    }

    /* This class is intended to be subclassed by generated code, so
     * all functions begin with "_" to avoid name collisions. */
    /**
     * Call a remote method that takes a single argument and has a
     * single output.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return UnaryCall The active call object
     */
    protected function _simpleRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryUnaryCallFactory($this->channel);

        return $call_factory($method, $argument, $deserialize, $metadata, $options);
    }

    /**
     * Call a remote method that takes a stream of arguments and has a single
     * output.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the response
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ClientStreamingCall The active call object
     */
    protected function _clientStreamRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamUnaryCallFactory($this->channel);

        return $call_factory($method, $deserialize, $metadata, $options);
    }

    /**
     * Call a remote method that takes a single argument and returns a stream
     * of responses.
     *
     * @param string   $method      The name of the method to call
     * @param mixed    $argument    The argument to the method
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return ServerStreamingCall The active call object
     */
    protected function _serverStreamRequest(
        $method,
        $argument,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_UnaryStreamCallFactory($this->channel);

        return $call_factory($method, $argument, $deserialize, $metadata, $options);
    }

    /**
     * Call a remote method with messages streaming in both directions.
     *
     * @param string   $method      The name of the method to call
     * @param callable $deserialize A function that deserializes the responses
     * @param array    $metadata    A metadata map to send to the server
     *                              (optional)
     * @param array    $options     An array of options (optional)
     *
     * @return BidiStreamingCall The active call object
     */
    protected function _bidiRequest(
        $method,
        $deserialize,
        array $metadata = [],
        array $options = []
    ) {
        $call_factory = $this->_StreamStreamCallFactory($this->channel);

        return $call_factory($method, $deserialize, $metadata, $options);
    }
}
599