<?php
/*
 *
 * Copyright 2020 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/**
 * This is the PHP xDS Interop test client. This script is meant to be run by
 * the main xDS Interop test runner "run_xds_tests.py", not to be run
 * by itself standalone.
 */
$autoload_path = realpath(dirname(__FILE__).'/../../vendor/autoload.php');
// realpath() returns false when the file does not exist; fail with a clear
// message instead of letting require_once choke on an empty path.
if ($autoload_path === false) {
    fwrite(STDERR, "php xds: unable to resolve "
           . dirname(__FILE__) . "/../../vendor/autoload.php\n");
    exit(1);
}
require_once $autoload_path;

// The main xds interop test runner will ping this service to ask for
// the stats of the distribution of the backends, for the next X rpcs.
class LoadBalancerStatsService
    extends \Grpc\Testing\LoadBalancerStatsServiceStub
{
    /**
     * Reports how the RPCs sent by the global $client_thread were distributed
     * across backends.
     *
     * Blocks until the client thread's result counter has advanced past the
     * requested window of `num_rpcs` rounds, or until `timeout_sec` seconds
     * have elapsed, whichever comes first. It then tallies the hostnames
     * recorded in that window, both overall and per RPC method. An empty
     * hostname entry is counted as a failure.
     *
     * @param \Grpc\Testing\LoadBalancerStatsRequest $request carries the
     *        number of RPC rounds to observe and the timeout in seconds
     * @return \Grpc\Testing\LoadBalancerStatsResponse the per-peer and
     *         per-method histograms plus the failure count
     */
    public function getClientStats(\Grpc\Testing\LoadBalancerStatsRequest $request) {
        $num_rpcs = $request->getNumRpcs();
        $timeout_sec = $request->getTimeoutSec();
        $rpcs_by_method = [];
        $rpcs_by_peer = [];
        $num_failures = 0;

        // Heavy limitation now: the server is blocking, until all
        // the necessary num_rpcs are finished, or timeout is reached
        global $client_thread;
        // The observed window is [$start_id, $end_id) and starts one past
        // the current result count.
        $start_id = $client_thread->num_results + 1;
        $end_id = $start_id + $num_rpcs;
        // hrtime(true) returns a single nanosecond count (not a
        // [seconds, nanoseconds] pair), so convert it to seconds before
        // adding the timeout.
        $deadline_sec = (hrtime(true) / 1e9) + $timeout_sec;
        while (true) {
            $curr_time_sec = hrtime(true) / 1e9;
            if ($curr_time_sec > $deadline_sec) {
                break;
            }
            // Thread variable seems to be read-only
            $curr_id = $client_thread->num_results;
            if ($curr_id >= $end_id) {
                break;
            }
            // poll every 50ms
            usleep(50000);
        }

        // Tally up results. On timeout the window is truncated to what
        // has actually been recorded so far.
        $end_id = min($end_id, $client_thread->num_results);
        // "$client_thread->results" will be in the form of
        // [
        //   'rpc1' => [
        //     'hostname1', '', 'hostname2', 'hostname1', '', ...
        //   ],
        //   'rpc2' => [
        //     '', 'hostname1', 'hostname2', '', 'hostname2', ...
        //   ],
        // ]
        foreach ($client_thread->results as $rpc => $results) {
            // initialize, can always start from scratch here
            $rpcs_by_method[$rpc] = [];
            for ($i = $start_id; $i < $end_id; $i++) {
                $hostname = $results[$i];
                if ($hostname) {
                    // initialize in case we haven't seen this hostname
                    // before
                    if (!array_key_exists($hostname, $rpcs_by_method[$rpc])) {
                        $rpcs_by_method[$rpc][$hostname] = 0;
                    }
                    if (!array_key_exists($hostname, $rpcs_by_peer)) {
                        $rpcs_by_peer[$hostname] = 0;
                    }
                    // increment the remote hostname distribution histogram
                    // both by overall, and broken down per RPC
                    $rpcs_by_method[$rpc][$hostname] += 1;
                    $rpcs_by_peer[$hostname] += 1;
                } else {
                    // $num_failures here are counted per individual RPC
                    $num_failures += 1;
                }
            }
        }

        // Convert our hashmaps above into protobuf objects
        $response = new Grpc\Testing\LoadBalancerStatsResponse();
        $rpcs_by_method_map = [];
        foreach ($rpcs_by_method as $rpc => $rpcs_by_peer_per_method) {
            $rpcs_by_peer_proto_obj
                = new Grpc\Testing\LoadBalancerStatsResponse\RpcsByPeer();
            $rpcs_by_peer_proto_obj->setRpcsByPeer($rpcs_by_peer_per_method);
            $rpcs_by_method_map[$rpc] = $rpcs_by_peer_proto_obj;
        }
        $response->setRpcsByPeer($rpcs_by_peer);
        $response->setRpcsByMethod($rpcs_by_method_map);
        $response->setNumFailures($num_failures);
        return $response;
    }
}

// This client thread blindly sends a unary RPC to the server once
// every 1 / qps seconds.
class ClientThread extends Thread {
    private $server_address_;
    private $target_seconds_between_rpcs_;
    private $fail_on_failed_rpcs_;
    private $autoload_path_;
    private $TIMEOUT_US = 30 * 1e6; // 30 seconds
    // Number of completed rounds; one round sends every configured RPC once.
    public $num_results = 0;
    // Map of rpc name => list of results, one entry per attempt: the remote
    // hostname on success, or '' on failure.
    public $results;

    /**
     * @param string $server_address      target passed to TestServiceClient
     * @param mixed  $qps                 rounds per second; must be a
     *                                    positive number
     * @param mixed  $fail_on_failed_rpcs when truthy, run() throws on the
     *                                    first failed RPC
     * @param string $rpcs_to_send        comma-separated RPC names
     *                                    ('UnaryCall' and/or 'EmptyCall')
     * @param string $metadata_to_send    comma-separated 'rpc:key:value'
     *                                    triples; may be empty
     * @param string $autoload_path       autoloader re-required inside the
     *                                    thread
     * @throws InvalidArgumentException if $qps is not a positive number
     */
    public function __construct($server_address, $qps, $fail_on_failed_rpcs,
                                $rpcs_to_send, $metadata_to_send,
                                $autoload_path) {
        // Guard the division below against a missing or zero qps.
        if (!is_numeric($qps) || $qps <= 0) {
            throw new InvalidArgumentException(
                'qps must be a positive number, got '
                . var_export($qps, true));
        }
        $this->server_address_ = $server_address;
        $this->target_seconds_between_rpcs_ = 1.0 / $qps;
        $this->fail_on_failed_rpcs_ = $fail_on_failed_rpcs;
        $this->rpcs_to_send = explode(',', $rpcs_to_send);
        // Convert input in the form of
        //   rpc1:k1:v1,rpc2:k2:v2,rpc1:k3:v3
        // into
        //   [
        //     'rpc1' => [
        //       'k1' => 'v1',
        //       'k3' => 'v3',
        //     ],
        //     'rpc2' => [
        //       'k2' => 'v2'
        //     ],
        //   ]
        $this->metadata_to_send = [];
        foreach (explode(',', (string) $metadata_to_send)
                 as $one_metadata_pair) {
            // Limit the split to 3 pieces so a value may itself contain ':'.
            $parts = explode(':', $one_metadata_pair, 3);
            if (count($parts) !== 3) {
                // explode() on an empty string yields [''], so an empty
                // --metadata (or a malformed entry) lands here; skip it
                // rather than recording a bogus '' rpc.
                continue;
            }
            list($rpc, $metadata_key, $metadata_value) = $parts;
            // initialize in case we haven't seen this rpc before
            if (!array_key_exists($rpc, $this->metadata_to_send)) {
                $this->metadata_to_send[$rpc] = [];
            }
            $this->metadata_to_send[$rpc][$metadata_key]
                = $metadata_value;
        }
        $this->autoload_path_ = $autoload_path;
        $this->simple_request = new Grpc\Testing\SimpleRequest();
        $this->empty_request = new Grpc\Testing\EmptyMessage();
        $this->results = [];
        foreach ($this->rpcs_to_send as $rpc) {
            $this->results[$rpc] = [];
        }
    }

    // Starts a UnaryCall on $stub with the given metadata and the
    // class-wide timeout; returns the call object.
    public function sendUnaryCall($stub, $metadata) {
        return $stub->UnaryCall($this->simple_request,
                                $metadata,
                                ['timeout' => $this->TIMEOUT_US]);
    }

    // Starts an EmptyCall on $stub with the given metadata and the
    // class-wide timeout; returns the call object.
    public function sendEmptyCall($stub, $metadata) {
        return $stub->EmptyCall($this->empty_request,
                                $metadata,
                                ['timeout' => $this->TIMEOUT_US]);
    }

    /**
     * Thread body: loops forever, sending every configured RPC once per
     * round and recording the responding hostname (or '' on failure) in
     * $this->results. Rounds are paced target_seconds_between_rpcs_ apart.
     *
     * @throws Exception on an unknown RPC name, or on a failed RPC when
     *         fail_on_failed_rpcs_ is truthy
     */
    public function run() {
        // Autoloaded classes do not get inherited in threads.
        // Hence we need to do this.
        require_once($this->autoload_path_);

        $stub = new Grpc\Testing\TestServiceClient($this->server_address_, [
            'credentials' => Grpc\ChannelCredentials::createInsecure()
        ]);
        # hrtime returns nanoseconds
        $target_next_start_us = hrtime(true) / 1000;
        while (true) {
            $now_us = hrtime(true) / 1000;
            $sleep_us = $target_next_start_us - $now_us;
            if ($sleep_us < 0) {
                // Behind schedule: re-anchor the next start on "now".
                $target_next_start_us =
                    $now_us + ($this->target_seconds_between_rpcs_ * 1e6);
                echo sprintf(
                    "php xds: warning, rpc takes too long to finish. "
                    . "Deficit %.1fms."
                    . "If you consistently see this, the qps is too high.\n",
                    round(abs($sleep_us / 1000), 1));
            } else {
                $target_next_start_us +=
                    ($this->target_seconds_between_rpcs_ * 1e6);
                // usleep() takes an int; $sleep_us is a float here.
                usleep((int) $sleep_us);
            }
            foreach ($this->rpcs_to_send as $rpc) {
                $metadata = array_key_exists(
                    $rpc, $this->metadata_to_send) ?
                          $this->metadata_to_send[$rpc] : [];
                // This copy is somehow necessary because
                // $this->metadata_to_send[$rpc] somehow becomes a
                // Volatile object, instead of an associative array.
                $metadata_array = [];
                foreach ($metadata as $key => $value) {
                    $metadata_array[$key] = [$value];
                }
                $call = null;
                if ($rpc === 'UnaryCall') {
                    $call = $this->sendUnaryCall($stub, $metadata_array);
                } else if ($rpc === 'EmptyCall') {
                    $call = $this->sendEmptyCall($stub, $metadata_array);
                } else {
                    throw new Exception("Unhandled rpc $rpc");
                }
                // the remote peer is being returned as part of the
                // initial metadata, according to the test spec
                $initial_metadata = $call->getMetadata();
                list($response, $status) = $call->wait();
                if ($status->code == Grpc\STATUS_OK &&
                    array_key_exists('hostname', $initial_metadata)) {
                    $this->results[$rpc][] = $initial_metadata['hostname'][0];
                } else {
                    if ($this->fail_on_failed_rpcs_) {
                        throw new Exception("$rpc failed with status "
                                            . $status->code);
                    }
                    $this->results[$rpc][] = "";
                }
            }
            // $num_results here is only incremented when the group of
            // all $rpcs_to_send are done.
            $this->num_results++;
        }
    }

    // This is needed for loading autoload_path in the child thread.
    // The $options argument is ignored; the thread is always started with
    // PTHREADS_INHERIT_NONE.
    public function start(int $options = PTHREADS_INHERIT_ALL) {
        return parent::start(PTHREADS_INHERIT_NONE);
    }
}


// Note: num_channels are currently ignored for now
$args = getopt('', ['fail_on_failed_rpcs:', 'num_channels:',
                    'rpc:', 'metadata:',
                    'server:', 'stats_port:', 'qps:']);

// getopt() yields strings, and the string "false" is truthy in PHP, so
// parse the flag explicitly ("true"/"1"/"yes"/"on" => true, else false).
$fail_on_failed_rpcs = filter_var($args['fail_on_failed_rpcs'] ?? false,
                                  FILTER_VALIDATE_BOOLEAN);

$client_thread = new ClientThread($args['server'], $args['qps'],
                                  $fail_on_failed_rpcs,
                                  (empty($args['rpc']) ? 'UnaryCall'
                                   : $args['rpc']),
                                  // --metadata is optional
                                  $args['metadata'] ?? '',
                                  $autoload_path);
$client_thread->start();

$server = new Grpc\RpcServer();
$server->addHttp2Port('0.0.0.0:'.$args['stats_port']);
$server->handle(new LoadBalancerStatsService());
$server->run();