• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1<?php
2/*
3 *
4 * Copyright 2020 gRPC authors.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 */
19
20/**
21 * This is the PHP xDS Interop test client. This script is meant to be run by
22 * the main xDS Interop test runner "run_xds_tests.py", not to be run
23 * by itself standalone.
24 */
25$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
26require_once $autoload_path;
27
// The main xds interop test runner will ping this service to ask for
// the stats of the distribution of the backends, for the next X rpcs.
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Reports which backends answered the next batch of client RPCs.
     *
     * Blocks, polling every 50ms, until the global client thread has
     * recorded enough results to cover the requested number of RPCs or
     * the requested timeout has elapsed, then tallies the recorded
     * hostnames.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request supplies the
     *        number of RPCs to observe and the timeout in seconds
     * @return \Grpc\Testing\LoadBalancerStatsResponse per-hostname RPC
     *         counts, plus the number of requested RPCs for which no
     *         hostname was recorded (failed, or not finished in time)
     */
    public function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_peer = [];
        // Every requested RPC counts as failed until a hostname is seen.
        $num_failures = $num_rpcs;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        // NOTE(review): starting at count + 1 skips the result slot at
        // index count; presumably deliberate, to ignore an RPC that was
        // already in flight when this request arrived -- confirm.
        $start_id = count($client_thread->results) + 1;
        $end_id = $start_id + $num_rpcs;

        // hrtime(true) returns one nanosecond count (not a [sec, nsec]
        // pair), so the deadline is computed in nanoseconds.
        $deadline_ns = hrtime(true) + ($timeout_sec * 1e9);
        while (true) {
            if (hrtime(true) > $deadline_ns) {
                break;
            }
            // Thread variable seems to be read-only
            if (count($client_thread->results) >= $end_id) {
                break;
            }
            usleep(50000);
        }

        // Tally up results. On timeout fewer results than requested may
        // exist, so clamp the upper bound to what was actually recorded.
        $end_id = min($end_id, count($client_thread->results));
        for ($i = $start_id; $i < $end_id; $i++) {
            $hostname = $client_thread->results[$i];
            // The client thread records an empty string for a failed RPC.
            if ($hostname) {
                $num_failures -= 1;
                $rpcs_by_peer[$hostname] = ($rpcs_by_peer[$hostname] ?? 0) + 1;
            }
        }

        $response = new \Grpc\Testing\LoadBalancerStatsResponse();
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setNumFailures($num_failures);
        return $response;
    }
}
78
// This client thread blindly sends a unary RPC to the server once
// every 1 / qps seconds.
class ClientThread extends Thread {
    private $server_address_;
    private $target_seconds_between_rpcs_;
    private $fail_on_failed_rpcs_;
    private $autoload_path_;
    // One entry per finished RPC, in completion order: the hostname
    // reported by the server on success, or "" when the RPC failed.
    public $results;

    /**
     * @param string $server_address target passed to the TestService stub
     * @param int|float|string $qps RPCs per second; must be a positive
     *        number (numeric strings from the command line are accepted)
     * @param bool|string $fail_on_failed_rpcs whether a failed RPC aborts
     *        the thread; boolean-like strings such as "true" / "false"
     *        are accepted
     * @param string $autoload_path autoloader file to load in the thread
     * @throws InvalidArgumentException when $qps is not a positive number
     */
    public function __construct($server_address, $qps, $fail_on_failed_rpcs,
                                $autoload_path) {
        if (!is_numeric($qps) || $qps <= 0) {
            throw new InvalidArgumentException(
                'qps must be a positive number, got '
                . var_export($qps, true));
        }
        $this->server_address_ = $server_address;
        $this->target_seconds_between_rpcs_ = 1.0 / $qps;
        // A command-line value arrives as a string and the string "false"
        // is truthy in PHP, so it has to be parsed rather than tested.
        $this->fail_on_failed_rpcs_ =
            filter_var($fail_on_failed_rpcs, FILTER_VALIDATE_BOOLEAN);
        $this->autoload_path_ = $autoload_path;
        $this->results = [];
    }

    /**
     * Thread body: sends UnaryCall RPCs forever at the configured rate and
     * appends one entry to $results per finished RPC.
     *
     * @throws Exception when an RPC fails and fail_on_failed_rpcs is set
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);
        $TIMEOUT_US = 30 * 1e6; // 30 seconds
        $interval_us = $this->target_seconds_between_rpcs_ * 1e6;

        $stub = new Grpc\Testing\TestServiceClient($this->server_address_, [
            'credentials' => Grpc\ChannelCredentials::createInsecure()
        ]);
        $request = new Grpc\Testing\SimpleRequest();
        $target_next_start_us = hrtime(true) / 1000; # hrtime returns nanoseconds
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Behind schedule: do not try to catch up, restart the
                // schedule from the current time instead.
                $target_next_start_us = $now_us + $interval_us;
                echo sprintf(
                    "php xds: warning, rpc takes too long to finish. "
                    . "Deficit %.1fms."
                    . "If you consistently see this, the qps is too high.\n",
                    round(abs($sleep_us / 1000), 1));
            } else {
                $target_next_start_us += $interval_us;
                // usleep() takes whole microseconds; drop the fraction
                // explicitly instead of relying on implicit conversion.
                usleep((int) $sleep_us);
            }
            list($response, $status)
                = $stub->UnaryCall($request, [],
                                   ['timeout' => $TIMEOUT_US])->wait();
            if ($status->code == Grpc\STATUS_OK) {
                $this->results[] = $response->getHostname();
            } else {
                if ($this->fail_on_failed_rpcs_) {
                    throw new Exception('UnaryCall failed with status '
                                        . $status->code);
                }
                // Keep one entry per RPC so result indices stay aligned.
                $this->results[] = "";
            }
        }
    }

    // This is needed for loading autoload_path in the child thread
    // (the $options argument is deliberately ignored: the thread is
    // always started with PTHREADS_INHERIT_NONE).
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}
144
145
146// Note: num_channels are currently ignored for now
// Parse the command line. getopt() returns false on failure, which is
// treated the same as "no options given".
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'server:', 'stats_port:', 'qps:']) ?: [];

// Fail fast with a readable message instead of passing nulls along.
foreach (['server', 'stats_port', 'qps'] as $required_option) {
    if (!isset($args[$required_option])) {
        fwrite(STDERR,
               "php xds: missing required option --{$required_option}\n");
        exit(1);
    }
}

// getopt() yields strings and the string "false" is truthy in PHP, so
// the flag is converted to a real boolean; it defaults to false.
$fail_on_failed_rpcs = filter_var($args['fail_on_failed_rpcs'] ?? false,
                                  FILTER_VALIDATE_BOOLEAN);

// The stats service reads this thread through the global $client_thread.
$client_thread = new ClientThread($args['server'], $args['qps'],
                                  $fail_on_failed_rpcs,
                                  $autoload_path);
$client_thread->start();

// Serve LoadBalancerStatsService on the requested port; run() blocks.
$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->run();
159