<?php
/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/**
 * This is the PHP xDS Interop test client. This script is meant to be run by
 * the main xDS Interop test runner "run_xds_tests.py", not to be run
 * standalone.
 */
// Resolve the Composer autoloader relative to this script. realpath()
// returns false when the file does not exist, so fail with a clear message
// instead of handing false to require_once.
$autoload_path = realpath(__DIR__.'/../../vendor/autoload.php');
if ($autoload_path === false) {
    fwrite(STDERR, "php xds: cannot find vendor/autoload.php\n");
    exit(1);
}
require_once $autoload_path;

/**
 * Stats service that the main xds interop test runner pings to ask for
 * the distribution of the backends hit by the next X rpcs.
 */
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Waits for upcoming RPC rounds and reports which peers served them.
     *
     * Blocks until the global $client_thread has recorded enough additional
     * rounds to cover the request, or until the requested timeout elapses,
     * then tallies the recorded hostnames overall and per RPC method.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request provides
     *        getNumRpcs() (rounds to tally) and getTimeoutSec()
     * @return \Grpc\Testing\LoadBalancerStatsResponse counts by peer,
     *         counts by method and peer, and the number of failed RPCs
     */
    function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_method = [];
        $rpcs_by_peer = [];
        $num_failures = 0;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        $start_id = $client_thread->num_results + 1;
        $end_id = $start_id + $num_rpcs;
        // hrtime(true) yields a single nanosecond count, not a
        // [seconds, nanoseconds] pair, so convert it to seconds directly.
        $deadline_sec = hrtime(true) / 1e9 + $timeout_sec;
        while (true) {
            if (hrtime(true) / 1e9 > $deadline_sec) {
                break;
            }
            // Only read the counter here; the client thread owns the writes.
            if ($client_thread->num_results >= $end_id) {
                break;
            }
            // Poll every 50ms
            usleep(50000);
        }

        // Tally up results. If we timed out, only look at the rounds
        // that have actually completed.
        $end_id = min($end_id, $client_thread->num_results);
        // "$client_thread->results" will be in the form of
        // [
        //   'rpc1' => [
        //     'hostname1', '', 'hostname2', 'hostname1', '', ...
        //   ],
        //   'rpc2' => [
        //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
        //   ],
        // ]
        foreach ($client_thread->results as $rpc => $results) {
            // initialize, can always start from scratch here
            $rpcs_by_method[$rpc] = [];
            for ($i = $start_id; $i < $end_id; $i++) {
                $hostname = $results[$i];
                if (!$hostname) {
                    // An empty hostname marks a failed RPC;
                    // $num_failures is counted per individual RPC
                    $num_failures += 1;
                    continue;
                }
                // initialize in case we haven't seen this hostname before
                if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
                    $rpcs_by_method[$rpc][$hostname] = 0;
                }
                if (!array_key_exists($hostname, $rpcs_by_peer)) {
                    $rpcs_by_peer[$hostname] = 0;
                }
                // increment the remote hostname distribution histogram
                // both by overall, and broken down per RPC
                $rpcs_by_method[$rpc][$hostname] += 1;
                $rpcs_by_peer[$hostname] += 1;
            }
        }

        // Convert our hashmaps above into protobuf objects
        $response = new Grpc\Testing\LoadBalancerStatsResponse();
        $rpcs_by_method_map = [];
        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
            $rpcs_by_peer_proto_obj
                = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
        }
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setRpcsByMethod($rpcs_by_method_map);
        $response->setNumFailures($num_failures);
        return $response;
    }
}

/**
 * This client thread blindly sends each configured RPC to the server once
 * every 1 / qps seconds and records the hostname that answered each one.
 */
class ClientThread extends Thread {
    private $server_address_;
    private $target_seconds_between_rpcs_;
    private $fail_on_failed_rpcs_;
    private $autoload_path_;
    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
    // Number of completed rounds; one round is a pass over all rpcs_to_send.
    public $num_results = 0;
    // rpc name => list with one entry appended per round: the responding
    // hostname, or '' when that RPC failed.
    public $results;

    /**
     * @param string $server_address target passed to the TestServiceClient
     * @param int|float|string $qps rounds per second; must be positive
     * @param bool|string|null $fail_on_failed_rpcs when true (or a string
     *        such as "true"/"1"), run() throws on the first failed RPC
     * @param string $rpcs_to_send comma-separated rpc names
     *        ('UnaryCall' and/or 'EmptyCall')
     * @param string|null $metadata_to_send comma-separated
     *        rpc:key:value triples; may be empty
     * @param string $autoload_path autoloader to require inside the thread
     * @throws InvalidArgumentException if $qps is not a positive number
     */
    public function __construct($server_address, $qps, $fail_on_failed_rpcs,
                                $rpcs_to_send, $metadata_to_send,
                                $autoload_path) {
        if (!is_numeric($qps) || $qps <= 0) {
            throw new InvalidArgumentException(
                'qps must be a positive number, got '
                . var_export($qps, true));
        }
        $this->server_address_ = $server_address;
        $this->target_seconds_between_rpcs_ = 1.0 / $qps;
        // The flag arrives from the command line as a string, and the
        // string "false" is truthy in PHP; parse it as a real boolean.
        $this->fail_on_failed_rpcs_ = filter_var(
            $fail_on_failed_rpcs, FILTER_VALIDATE_BOOLEAN);
        $this->rpcs_to_send = explode(',', $rpcs_to_send);
        // Convert input in the form of
        //   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
        // into
        //   [
        //     'rpc1' => [
        //       'k1' => 'v1',
        //       'k3' => 'v3',
        //     ],
        //     'rpc2' => [
        //       'k2' => 'v2'
        //     ],
        //   ]
        $this->metadata_to_send = [];
        $all_metadata = explode(',', (string) $metadata_to_send);
        foreach ($all_metadata as $one_metadata_pair) {
            // Limit the split to 3 pieces so a value may itself contain ':'
            $pieces = explode(':', $one_metadata_pair, 3);
            if (count($pieces) !== 3) {
                // Empty input or a malformed entry: nothing to record
                continue;
            }
            list($rpc, $metadata_key, $metadata_value) = $pieces;
            // initialize in case we haven't seen this rpc before
            if (!array_key_exists($rpc, $this->metadata_to_send)) {
                $this->metadata_to_send[$rpc] = [];
            }
            $this->metadata_to_send[$rpc][$metadata_key]
                = $metadata_value;
        }
        $this->autoload_path_ = $autoload_path;
        $this->simple_request = new Grpc\Testing\SimpleRequest();
        $this->empty_request = new Grpc\Testing\EmptyMessage();
        $this->results = [];
        foreach ($this->rpcs_to_send as $rpc) {
            $this->results[$rpc] = [];
        }
    }

    /**
     * Starts a UnaryCall on $stub with the given metadata and the
     * class-wide timeout; returns the call object without waiting on it.
     */
    public function sendUnaryCall($stub, $metadata) {
        return $stub->UnaryCall($this->simple_request,
                                $metadata,
                                ['timeout' => $this->TIMEOUT_US]);
    }

    /**
     * Starts an EmptyCall on $stub with the given metadata and the
     * class-wide timeout; returns the call object without waiting on it.
     */
    public function sendEmptyCall($stub, $metadata) {
        return $stub->EmptyCall($this->empty_request,
                                $metadata,
                                ['timeout' => $this->TIMEOUT_US]);
    }

    /**
     * Thread body: loops forever, pacing rounds at the target interval,
     * sending every configured RPC each round and appending the outcome
     * to $this->results.
     *
     * @throws Exception on an unknown rpc name, or on a failed RPC when
     *         fail_on_failed_rpcs is set
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);

        $stub = new Grpc\Testing\TestServiceClient($this->server_address_, [
            'credentials' => Grpc\ChannelCredentials::createInsecure()
        ]);
        // hrtime(true) returns nanoseconds; the schedule is kept in
        // microseconds
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // We are behind schedule: restart the schedule from now
                $target_next_start_us =
                        $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
                echo sprintf(
                    "php xds: warning, rpc takes too long to finish. "
                    . "Deficit %.1fms."
                    . "If you consistently see this, the qps is too high.\n",
                    round(abs($sleep_us / 1000), 1));
            } else {
                $target_next_start_us +=
                        ($this->target_seconds_between_rpcs_ * 1e6);
                // usleep() takes a whole number of microseconds
                usleep((int) $sleep_us);
            }
            foreach ($this->rpcs_to_send as $rpc) {
                $metadata = array_key_exists(
                    $rpc, $this->metadata_to_send) ?
                          $this->metadata_to_send[$rpc] : [];
                // This copy is somehow necessary because
                // $this->metadata_to_send[$rpc] somehow becomes a
                // Volatile object, instead of an associative array.
                $metadata_array = [];
                foreach ($metadata as $key => $value) {
                    $metadata_array[$key] = [$value];
                }
                $call = null;
                if ($rpc === 'UnaryCall') {
                    $call = $this->sendUnaryCall($stub, $metadata_array);
                } elseif ($rpc === 'EmptyCall') {
                    $call = $this->sendEmptyCall($stub, $metadata_array);
                } else {
                    throw new Exception("Unhandled rpc $rpc");
                }
                // the remote peer is being returned as part of the
                // initial metadata, according to the test spec
                $initial_metadata = $call->getMetadata();
                list($response, $status) = $call->wait();
                if ($status->code == Grpc\STATUS_OK &&
                    array_key_exists('hostname', $initial_metadata)) {
                    $this->results[$rpc][] = $initial_metadata['hostname'][0];
                } else {
                    if ($this->fail_on_failed_rpcs_) {
                        throw new Exception("$rpc failed with status "
                                            . $status->code);
                    }
                    // An empty string marks this RPC as failed
                    $this->results[$rpc][] = "";
                }
            }
            // $num_results here is only incremented when the group of
            // all $rpcs_to_send are done.
            $this->num_results++;
        }
    }

    /**
     * Starts the thread. The $options argument is ignored: the thread is
     * always started with PTHREADS_INHERIT_NONE, which is needed for
     * loading autoload_path in the child thread.
     */
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}


// Parse the flags handed over by the test runner.
// Note: num_channels is accepted but currently ignored.
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'rpc:', 'metadata:',
                    'server:', 'stats_port:', 'qps:']);
if ($args === false) {
    $args = [];
}

// These options have no usable default; stop early with a clear message
// rather than failing later on an undefined index.
foreach (['server', 'stats_port', 'qps'] as $required_option) {
    if (empty($args[$required_option])) {
        fwrite(STDERR,
               "php xds: missing required option --$required_option\n");
        exit(1);
    }
}

// The stats service reaches this thread through "global $client_thread",
// so the variable name must stay as is.
$client_thread = new ClientThread($args['server'], $args['qps'],
                                  $args['fail_on_failed_rpcs'] ?? false,
                                  (empty($args['rpc']) ? 'UnaryCall'
                                   : $args['rpc']),
                                  $args['metadata'] ?? '',
                                  $autoload_path);
$client_thread->start();

// Serve the stats service on the requested port; run() blocks.
$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->run();