<?php
/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/**
 * This is the PHP xDS Interop test client. This script is meant to be run by
 * the main xDS Interop test runner "run_xds_tests.py", not to be run
 * by itself standalone.
 */
$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
require_once $autoload_path;

// The main xds interop test runner will ping this service to ask for
// the stats of the distribution of the backends, for the next X rpcs.
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Reports which backends answered the RPCs that the client thread
     * completes after this request arrives.
     *
     * The call blocks, polling the global $client_thread->results list
     * every 50ms, until enough results have been recorded or until the
     * request's timeout (in seconds) has elapsed, whichever comes first.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request carries the
     *        number of RPCs to observe and the timeout in seconds.
     * @return \Grpc\Testing\LoadBalancerStatsResponse per-hostname success
     *         counts plus the number of failures. RPCs that did not finish
     *         before the timeout are counted as failures.
     */
    public function getClientStats(
        \Grpc\Testing\LoadBalancerStatsRequest $request
    ) {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_peer = [];
        // Every requested RPC is a failure until a hostname proves otherwise.
        $num_failures = $num_rpcs;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        // NOTE(review): starting at count + 1 leaves the result stored at
        // index count out of the tally; presumably that is the RPC already
        // in flight when this request arrived -- confirm this is intended.
        $start_id = count($client_thread->results) + 1;
        $end_id = $start_id + $num_rpcs;

        // hrtime(true) yields a single nanosecond count (not a
        // [seconds, nanoseconds] pair), so keep the deadline in nanoseconds.
        $deadline_ns = hrtime(true) + $timeout_sec * 1000000000;
        while (hrtime(true) <= $deadline_ns) {
            // Thread variable seems to be read-only
            if (count($client_thread->results) >= $end_id) {
                break;
            }
            usleep(50000);
        }

        // Tally up results. On timeout only the results that actually
        // exist are inspected; the missing ones stay counted as failures.
        $end_id = min($end_id, count($client_thread->results));
        for ($i = $start_id; $i < $end_id; $i++) {
            $hostname = $client_thread->results[$i];
            // The client thread records an empty string for a failed RPC.
            if ((string) $hostname === '') {
                continue;
            }
            $num_failures -= 1;
            $rpcs_by_peer[$hostname] = ($rpcs_by_peer[$hostname] ?? 0) + 1;
        }

        $response = new Grpc\Testing\LoadBalancerStatsResponse();
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setNumFailures($num_failures);
        return $response;
    }
}

// This client thread blindly sends a unary RPC to the server once
// every 1 / qps seconds.
/**
 * Background sender for the xDS interop client.
 *
 * Once started it issues UnaryCall RPCs at the configured rate and appends
 * one entry to $results per finished RPC: the hostname reported by the
 * response on success, or an empty string on failure.
 */
class ClientThread extends Thread {
    private $server_address_;
    private $target_seconds_between_rpcs_;
    private $fail_on_failed_rpcs_;
    private $autoload_path_;
    // One entry per finished RPC, in completion order.
    public $results;

    /**
     * @param string $server_address target passed to the TestService stub.
     * @param int|float|string $qps RPCs to start per second; must be a
     *        positive number (numeric strings from getopt are accepted).
     * @param bool|string|null $fail_on_failed_rpcs whether a non-OK RPC
     *        should raise an exception. Strings such as "true"/"false"
     *        are interpreted as booleans.
     * @param string $autoload_path path of the Composer autoloader to load
     *        inside the thread.
     * @throws InvalidArgumentException if $qps is not a positive number.
     */
    public function __construct($server_address, $qps, $fail_on_failed_rpcs,
                                $autoload_path) {
        if (!is_numeric($qps) || $qps <= 0) {
            throw new InvalidArgumentException(
                'qps must be a positive number, got '
                . var_export($qps, true));
        }
        $this->server_address_ = $server_address;
        $this->target_seconds_between_rpcs_ = 1.0 / $qps;
        // Command-line values are strings; a plain truthiness test would
        // treat "false" as enabled, so parse the flag explicitly.
        $this->fail_on_failed_rpcs_ =
            filter_var($fail_on_failed_rpcs, FILTER_VALIDATE_BOOLEAN);
        $this->autoload_path_ = $autoload_path;
        $this->results = [];
    }

    /**
     * Thread body: sends UnaryCall RPCs forever, pacing them so that one
     * starts every 1 / qps seconds, and records each outcome in $results.
     *
     * @throws Exception if an RPC fails while fail_on_failed_rpcs is set.
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);
        $timeout_us = 30 * 1000 * 1000; // 30 seconds
        $interval_us = $this->target_seconds_between_rpcs_ * 1e6;

        $stub = new Grpc\Testing\TestServiceClient($this->server_address_, [
            'credentials' => Grpc\ChannelCredentials::createInsecure()
        ]);
        $request = new Grpc\Testing\SimpleRequest();
        // hrtime(true) returns nanoseconds; the schedule is kept in
        // microseconds to match usleep().
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Behind schedule: restart the schedule from now instead
                // of trying to catch up.
                $target_next_start_us = $now_us + $interval_us;
                echo sprintf(
                    "php xds: warning, rpc takes too long to finish. "
                    . "Deficit %.1fms. "
                    . "If you consistently see this, the qps is too high.\n",
                    abs($sleep_us / 1000));
            } else {
                $target_next_start_us += $interval_us;
                // usleep() takes whole microseconds.
                usleep((int) $sleep_us);
            }
            list($response, $status)
                = $stub->UnaryCall($request, [],
                                   ['timeout' => $timeout_us])->wait();
            if ($status->code === Grpc\STATUS_OK) {
                $this->results[] = $response->getHostname();
                continue;
            }
            if ($this->fail_on_failed_rpcs_) {
                throw new Exception('UnaryCall failed with status '
                                    . $status->code);
            }
            // An empty string marks a failed RPC.
            $this->results[] = "";
        }
    }

    // This is needed for loading autoload_path in the child thread:
    // whatever $options the caller passes, the thread is always started
    // with PTHREADS_INHERIT_NONE.
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}


// Note: num_channels are currently ignored for now
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'server:', 'stats_port:', 'qps:']);

// Fail early with a readable message instead of undefined-index notices.
foreach (['server', 'stats_port', 'qps'] as $required_flag) {
    if (!isset($args[$required_flag])) {
        fwrite(STDERR,
               "php xds: missing required flag --$required_flag\n");
        exit(1);
    }
}

$client_thread = new ClientThread($args['server'], $args['qps'],
                                  $args['fail_on_failed_rpcs'] ?? false,
                                  $autoload_path);
$client_thread->start();

$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->run();