1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14require 'spec_helper' 15 16def load_test_certs 17 test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata') 18 files = ['ca.pem', 'server1.key', 'server1.pem'] 19 files.map { |f| File.open(File.join(test_root, f)).read } 20end 21 22def check_md(wanted_md, received_md) 23 wanted_md.zip(received_md).each do |w, r| 24 w.each do |key, value| 25 expect(r[key]).to eq(value) 26 end 27 end 28end 29 30# A test service with no methods. 31class EmptyService 32 include GRPC::GenericService 33end 34 35# A test service without an implementation. 36class NoRpcImplementation 37 include GRPC::GenericService 38 rpc :an_rpc, EchoMsg, EchoMsg 39end 40 41# A test service with an implementation that fails with BadStatus 42class FailingService 43 include GRPC::GenericService 44 rpc :an_rpc, EchoMsg, EchoMsg 45 attr_reader :details, :code, :md 46 47 def initialize(_default_var = 'ignored') 48 @details = 'app error' 49 @code = 101 50 @md = { 'failed_method' => 'an_rpc' } 51 end 52 53 def an_rpc(_req, _call) 54 fail GRPC::BadStatus.new(@code, @details, @md) 55 end 56end 57 58FailingStub = FailingService.rpc_stub_class 59 60# A slow test service. 61class SlowService 62 include GRPC::GenericService 63 rpc :an_rpc, EchoMsg, EchoMsg 64 attr_reader :received_md, :delay 65 66 def initialize(_default_var = 'ignored') 67 @delay = 0.25 68 @received_md = [] 69 end 70 71 def an_rpc(req, call) 72 GRPC.logger.info("starting a slow #{@delay} rpc") 73 sleep @delay 74 @received_md << call.metadata unless call.metadata.nil? 75 req # send back the req as the response 76 end 77end 78 79SlowStub = SlowService.rpc_stub_class 80 81# A test service that allows a synchronized RPC cancellation 82class SynchronizedCancellationService 83 include GRPC::GenericService 84 rpc :an_rpc, EchoMsg, EchoMsg 85 attr_reader :received_md, :delay 86 87 # notify_request_received and wait_until_rpc_cancelled are 88 # callbacks to synchronously allow the client to proceed with 89 # cancellation (after the unary request has been received), 90 # and to synchronously wait until the client has cancelled the 91 # current RPC. 92 def initialize(notify_request_received, wait_until_rpc_cancelled) 93 @notify_request_received = notify_request_received 94 @wait_until_rpc_cancelled = wait_until_rpc_cancelled 95 end 96 97 def an_rpc(req, _call) 98 GRPC.logger.info('starting a synchronusly cancelled rpc') 99 @notify_request_received.call(req) 100 @wait_until_rpc_cancelled.call 101 req # send back the req as the response 102 end 103end 104 105SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class 106 107# a test service that hangs onto call objects 108# and uses them after the server-side call has been 109# finished 110class CheckCallAfterFinishedService 111 include GRPC::GenericService 112 rpc :an_rpc, EchoMsg, EchoMsg 113 rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg 114 rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg) 115 rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg) 116 attr_reader :server_side_call 117 118 def an_rpc(req, call) 119 fail 'shouldnt reuse service' unless @server_side_call.nil? 120 @server_side_call = call 121 req 122 end 123 124 def a_client_streaming_rpc(call) 125 fail 'shouldnt reuse service' unless @server_side_call.nil? 126 @server_side_call = call 127 # iterate through requests so call can complete 128 call.each_remote_read.each { |r| GRPC.logger.info(r) } 129 EchoMsg.new 130 end 131 132 def a_server_streaming_rpc(_, call) 133 fail 'shouldnt reuse service' unless @server_side_call.nil? 134 @server_side_call = call 135 [EchoMsg.new, EchoMsg.new] 136 end 137 138 def a_bidi_rpc(requests, call) 139 fail 'shouldnt reuse service' unless @server_side_call.nil? 140 @server_side_call = call 141 requests.each { |r| GRPC.logger.info(r) } 142 [EchoMsg.new, EchoMsg.new] 143 end 144end 145 146CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class 147 148# A service with a bidi streaming method. 149class BidiService 150 include GRPC::GenericService 151 rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg) 152 153 def server_sends_bad_input(_, _) 154 'bad response. (not an enumerable, client sees an error)' 155 end 156end 157 158BidiStub = BidiService.rpc_stub_class 159 160describe GRPC::RpcServer do 161 RpcServer = GRPC::RpcServer 162 StatusCodes = GRPC::Core::StatusCodes 163 164 before(:each) do 165 @method = 'an_rpc_method' 166 @pass = 0 167 @fail = 1 168 @noop = proc { |x| x } 169 end 170 171 describe '#new' do 172 it 'can be created with just some args' do 173 opts = { server_args: { a_channel_arg: 'an_arg' } } 174 blk = proc do 175 new_rpc_server_for_testing(**opts) 176 end 177 expect(&blk).not_to raise_error 178 end 179 180 it 'cannot be created with invalid ServerCredentials' do 181 blk = proc do 182 opts = { 183 server_args: { a_channel_arg: 'an_arg' }, 184 creds: Object.new 185 } 186 new_rpc_server_for_testing(**opts) 187 end 188 expect(&blk).to raise_error 189 end 190 end 191 192 describe '#stopped?' do 193 before(:each) do 194 opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 } 195 @srv = new_rpc_server_for_testing(**opts) 196 @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 197 end 198 199 it 'starts out false' do 200 expect(@srv.stopped?).to be(false) 201 end 202 203 it 'stays false after the server starts running', server: true do 204 @srv.handle(EchoService) 205 t = Thread.new { @srv.run } 206 @srv.wait_till_running 207 expect(@srv.stopped?).to be(false) 208 @srv.stop 209 t.join 210 end 211 212 it 'is true after a running server is stopped', server: true do 213 @srv.handle(EchoService) 214 t = Thread.new { @srv.run } 215 @srv.wait_till_running 216 @srv.stop 217 t.join 218 expect(@srv.stopped?).to be(true) 219 end 220 end 221 222 describe '#running?' do 223 it 'starts out false' do 224 opts = { 225 server_args: { a_channel_arg: 'an_arg' } 226 } 227 r = new_rpc_server_for_testing(**opts) 228 expect(r.running?).to be(false) 229 end 230 231 it 'is false if run is called with no services registered', server: true do 232 opts = { 233 server_args: { a_channel_arg: 'an_arg' }, 234 poll_period: 2 235 } 236 r = new_rpc_server_for_testing(**opts) 237 r.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 238 expect { r.run }.to raise_error(RuntimeError) 239 end 240 241 it 'is true after run is called with a registered service' do 242 opts = { 243 server_args: { a_channel_arg: 'an_arg' }, 244 poll_period: 2.5 245 } 246 r = new_rpc_server_for_testing(**opts) 247 r.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 248 r.handle(EchoService) 249 t = Thread.new { r.run } 250 r.wait_till_running 251 expect(r.running?).to be(true) 252 r.stop 253 t.join 254 end 255 end 256 257 describe '#handle' do 258 before(:each) do 259 @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 } 260 @srv = new_rpc_server_for_testing(**@opts) 261 @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 262 end 263 264 it 'raises if #run has already been called' do 265 @srv.handle(EchoService) 266 t = Thread.new { @srv.run } 267 @srv.wait_till_running 268 expect { @srv.handle(EchoService) }.to raise_error 269 @srv.stop 270 t.join 271 end 272 273 it 'raises if the server has been run and stopped' do 274 @srv.handle(EchoService) 275 t = Thread.new { @srv.run } 276 @srv.wait_till_running 277 @srv.stop 278 t.join 279 expect { @srv.handle(EchoService) }.to raise_error 280 end 281 282 it 'raises if the service does not include GenericService ' do 283 expect { @srv.handle(Object) }.to raise_error 284 end 285 286 it 'raises if the service does not declare any rpc methods' do 287 expect { @srv.handle(EmptyService) }.to raise_error 288 end 289 290 it 'raises if a handler method is already registered' do 291 @srv.handle(EchoService) 292 expect { r.handle(EchoService) }.to raise_error 293 end 294 end 295 296 describe '#run' do 297 let(:client_opts) { { channel_override: @ch } } 298 let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc } 299 let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) } 300 301 context 'with no connect_metadata' do 302 before(:each) do 303 server_opts = { 304 poll_period: 1 305 } 306 @srv = new_rpc_server_for_testing(**server_opts) 307 server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 308 @host = "localhost:#{server_port}" 309 @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure) 310 end 311 312 it 'should return NOT_FOUND status on unknown methods', server: true do 313 @srv.handle(EchoService) 314 t = Thread.new { @srv.run } 315 @srv.wait_till_running 316 req = EchoMsg.new 317 blk = proc do 318 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure, 319 **client_opts) 320 stub.request_response('/unknown', req, marshal, unmarshal) 321 end 322 expect(&blk).to raise_error GRPC::BadStatus 323 @srv.stop 324 t.join 325 end 326 327 it 'should return UNIMPLEMENTED on unimplemented methods', server: true do 328 @srv.handle(NoRpcImplementation) 329 t = Thread.new { @srv.run } 330 @srv.wait_till_running 331 req = EchoMsg.new 332 blk = proc do 333 stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure, 334 **client_opts) 335 stub.request_response('/an_rpc', req, marshal, unmarshal) 336 end 337 expect(&blk).to raise_error do |error| 338 expect(error).to be_a(GRPC::BadStatus) 339 expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED) 340 end 341 @srv.stop 342 t.join 343 end 344 345 it 'should handle multiple sequential requests', server: true do 346 @srv.handle(EchoService) 347 t = Thread.new { @srv.run } 348 @srv.wait_till_running 349 req = EchoMsg.new 350 n = 5 # arbitrary 351 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) 352 n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) } 353 @srv.stop 354 t.join 355 end 356 357 it 'should receive metadata sent as rpc keyword args', server: true do 358 service = EchoService.new 359 @srv.handle(service) 360 t = Thread.new { @srv.run } 361 @srv.wait_till_running 362 req = EchoMsg.new 363 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) 364 expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' })) 365 .to be_a(EchoMsg) 366 wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] 367 check_md(wanted_md, service.received_md) 368 @srv.stop 369 t.join 370 end 371 372 it 'should receive metadata if a deadline is specified', server: true do 373 service = SlowService.new 374 @srv.handle(service) 375 t = Thread.new { @srv.run } 376 @srv.wait_till_running 377 req = EchoMsg.new 378 stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts) 379 timeout = service.delay + 1.0 380 deadline = GRPC::Core::TimeConsts.from_relative_time(timeout) 381 resp = stub.an_rpc(req, 382 deadline: deadline, 383 metadata: { k1: 'v1', k2: 'v2' }) 384 expect(resp).to be_a(EchoMsg) 385 wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }] 386 check_md(wanted_md, service.received_md) 387 @srv.stop 388 t.join 389 end 390 391 it 'should handle cancellation correctly', server: true do 392 request_received = false 393 request_received_mu = Mutex.new 394 request_received_cv = ConditionVariable.new 395 notify_request_received = proc do |req| 396 request_received_mu.synchronize do 397 fail 'req is nil' if req.nil? 398 expect(req.is_a?(EchoMsg)).to be true 399 fail 'test bug - already set' if request_received 400 request_received = true 401 request_received_cv.signal 402 end 403 end 404 405 rpc_cancelled = false 406 rpc_cancelled_mu = Mutex.new 407 rpc_cancelled_cv = ConditionVariable.new 408 wait_until_rpc_cancelled = proc do 409 rpc_cancelled_mu.synchronize do 410 loop do 411 break if rpc_cancelled 412 rpc_cancelled_cv.wait(rpc_cancelled_mu) 413 end 414 end 415 end 416 417 service = SynchronizedCancellationService.new(notify_request_received, 418 wait_until_rpc_cancelled) 419 @srv.handle(service) 420 srv_thd = Thread.new { @srv.run } 421 @srv.wait_till_running 422 req = EchoMsg.new 423 stub = SynchronizedCancellationStub.new(@host, 424 :this_channel_is_insecure, 425 **client_opts) 426 op = stub.an_rpc(req, return_op: true) 427 428 client_thd = Thread.new do 429 expect { op.execute }.to raise_error GRPC::Cancelled 430 end 431 432 request_received_mu.synchronize do 433 loop do 434 break if request_received 435 request_received_cv.wait(request_received_mu) 436 end 437 end 438 439 op.cancel 440 441 rpc_cancelled_mu.synchronize do 442 fail 'test bug - already set' if rpc_cancelled 443 rpc_cancelled = true 444 rpc_cancelled_cv.signal 445 end 446 447 client_thd.join 448 @srv.stop 449 srv_thd.join 450 end 451 452 it 'should handle multiple parallel requests', server: true do 453 @srv.handle(EchoService) 454 t = Thread.new { @srv.run } 455 @srv.wait_till_running 456 req, q = EchoMsg.new, Queue.new 457 n = 5 # arbitrary 458 threads = [t] 459 n.times do 460 threads << Thread.new do 461 stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts) 462 q << stub.an_rpc(req) 463 end 464 end 465 n.times { expect(q.pop).to be_a(EchoMsg) } 466 @srv.stop 467 threads.each(&:join) 468 end 469 470 it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do 471 opts = { 472 server_args: { a_channel_arg: 'an_arg' }, 473 pool_size: 2, 474 poll_period: 1, 475 max_waiting_requests: 1 476 } 477 alt_srv = new_rpc_server_for_testing(**opts) 478 alt_srv.handle(SlowService) 479 alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 480 alt_host = "0.0.0.0:#{alt_port}" 481 t = Thread.new { alt_srv.run } 482 alt_srv.wait_till_running 483 req = EchoMsg.new 484 n = 20 # arbitrary, use as many to ensure the server pool is exceeded 485 threads = [] 486 one_failed_as_unavailable = false 487 n.times do 488 threads << Thread.new do 489 stub = SlowStub.new(alt_host, :this_channel_is_insecure) 490 begin 491 stub.an_rpc(req) 492 rescue GRPC::ResourceExhausted 493 one_failed_as_unavailable = true 494 end 495 end 496 end 497 threads.each(&:join) 498 alt_srv.stop 499 t.join 500 expect(one_failed_as_unavailable).to be(true) 501 end 502 503 it 'should send a status UNKNOWN with a relevant message when the' \ 504 'servers response stream is not an enumerable' do 505 @srv.handle(BidiService) 506 t = Thread.new { @srv.run } 507 @srv.wait_till_running 508 stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts) 509 responses = stub.server_sends_bad_input([]) 510 exception = nil 511 begin 512 responses.each { |r| r } 513 rescue GRPC::Unknown => e 514 exception = e 515 end 516 # Erroneous responses sent from the server handler should cause an 517 # exception on the client with relevant info. 518 expected_details = 'NoMethodError: undefined method `each\' for '\ 519 '"bad response. (not an enumerable, client sees an error)"' 520 521 expect(exception.inspect.include?(expected_details)).to be true 522 @srv.stop 523 t.join 524 end 525 end 526 527 context 'with connect metadata' do 528 let(:test_md_proc) do 529 proc do |mth, md| 530 res = md.clone 531 res['method'] = mth 532 res['connect_k1'] = 'connect_v1' 533 res 534 end 535 end 536 before(:each) do 537 server_opts = { 538 poll_period: 1, 539 connect_md_proc: test_md_proc 540 } 541 @srv = new_rpc_server_for_testing(**server_opts) 542 alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 543 @alt_host = "0.0.0.0:#{alt_port}" 544 end 545 546 it 'should send connect metadata to the client', server: true do 547 service = EchoService.new 548 @srv.handle(service) 549 t = Thread.new { @srv.run } 550 @srv.wait_till_running 551 req = EchoMsg.new 552 stub = EchoStub.new(@alt_host, :this_channel_is_insecure) 553 op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true) 554 expect(op.metadata).to be nil 555 expect(op.execute).to be_a(EchoMsg) 556 wanted_md = { 557 'k1' => 'v1', 558 'k2' => 'v2', 559 'method' => '/EchoService/an_rpc', 560 'connect_k1' => 'connect_v1' 561 } 562 wanted_md.each do |key, value| 563 GRPC.logger.info("key: #{key}") 564 expect(op.metadata[key]).to eq(value) 565 end 566 @srv.stop 567 t.join 568 end 569 end 570 571 context 'with trailing metadata' do 572 before(:each) do 573 server_opts = { 574 poll_period: 1 575 } 576 @srv = new_rpc_server_for_testing(**server_opts) 577 alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 578 @alt_host = "0.0.0.0:#{alt_port}" 579 end 580 581 it 'should be added to BadStatus when requests fail', server: true do 582 service = FailingService.new 583 @srv.handle(service) 584 t = Thread.new { @srv.run } 585 @srv.wait_till_running 586 req = EchoMsg.new 587 stub = FailingStub.new(@alt_host, :this_channel_is_insecure) 588 blk = proc { stub.an_rpc(req) } 589 590 # confirm it raise the expected error 591 expect(&blk).to raise_error GRPC::BadStatus 592 593 # call again and confirm exception contained the trailing metadata. 594 begin 595 blk.call 596 rescue GRPC::BadStatus => e 597 expect(e.code).to eq(service.code) 598 expect(e.details).to eq(service.details) 599 expect(e.metadata).to eq(service.md) 600 end 601 @srv.stop 602 t.join 603 end 604 605 it 'should be received by the client', server: true do 606 wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' } 607 service = EchoService.new(k1: 'out_v1', k2: 'out_v2') 608 @srv.handle(service) 609 t = Thread.new { @srv.run } 610 @srv.wait_till_running 611 req = EchoMsg.new 612 stub = EchoStub.new(@alt_host, :this_channel_is_insecure) 613 op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' }) 614 expect(op.metadata).to be nil 615 expect(op.execute).to be_a(EchoMsg) 616 expect(op.trailing_metadata).to eq(wanted_trailers) 617 @srv.stop 618 t.join 619 end 620 end 621 622 context 'when call objects are used after calls have completed' do 623 before(:each) do 624 server_opts = { 625 poll_period: 1 626 } 627 @srv = new_rpc_server_for_testing(**server_opts) 628 alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure) 629 @alt_host = "0.0.0.0:#{alt_port}" 630 631 @service = CheckCallAfterFinishedService.new 632 @srv.handle(@service) 633 @srv_thd = Thread.new { @srv.run } 634 @srv.wait_till_running 635 end 636 637 # check that the server-side call is still in a usable state even 638 # after it has finished 639 def check_single_req_view_of_finished_call(call) 640 common_check_of_finished_server_call(call) 641 642 expect(call.peer).to be_a(String) 643 expect(call.peer_cert).to be(nil) 644 end 645 646 def check_multi_req_view_of_finished_call(call) 647 common_check_of_finished_server_call(call) 648 649 expect do 650 call.each_remote_read.each { |r| p r } 651 end.to raise_error(GRPC::Core::CallError) 652 end 653 654 def common_check_of_finished_server_call(call) 655 expect do 656 call.merge_metadata_to_send({}) 657 end.to raise_error(RuntimeError) 658 659 expect do 660 call.send_initial_metadata 661 end.to_not raise_error 662 663 expect(call.cancelled?).to be(false) 664 expect(call.metadata).to be_a(Hash) 665 expect(call.metadata['user-agent']).to be_a(String) 666 667 expect(call.metadata_sent).to be(true) 668 expect(call.output_metadata).to eq({}) 669 expect(call.metadata_to_send).to eq({}) 670 expect(call.deadline.is_a?(Time)).to be(true) 671 end 672 673 it 'should not crash when call used after an unary call is finished' do 674 req = EchoMsg.new 675 stub = CheckCallAfterFinishedServiceStub.new(@alt_host, 676 :this_channel_is_insecure) 677 resp = stub.an_rpc(req) 678 expect(resp).to be_a(EchoMsg) 679 @srv.stop 680 @srv_thd.join 681 682 check_single_req_view_of_finished_call(@service.server_side_call) 683 end 684 685 it 'should not crash when call used after client streaming finished' do 686 requests = [EchoMsg.new, EchoMsg.new] 687 stub = CheckCallAfterFinishedServiceStub.new(@alt_host, 688 :this_channel_is_insecure) 689 resp = stub.a_client_streaming_rpc(requests) 690 expect(resp).to be_a(EchoMsg) 691 @srv.stop 692 @srv_thd.join 693 694 check_multi_req_view_of_finished_call(@service.server_side_call) 695 end 696 697 it 'should not crash when call used after server streaming finished' do 698 req = EchoMsg.new 699 stub = CheckCallAfterFinishedServiceStub.new(@alt_host, 700 :this_channel_is_insecure) 701 responses = stub.a_server_streaming_rpc(req) 702 responses.each do |r| 703 expect(r).to be_a(EchoMsg) 704 end 705 @srv.stop 706 @srv_thd.join 707 708 check_single_req_view_of_finished_call(@service.server_side_call) 709 end 710 711 it 'should not crash when call used after a bidi call is finished' do 712 requests = [EchoMsg.new, EchoMsg.new] 713 stub = CheckCallAfterFinishedServiceStub.new(@alt_host, 714 :this_channel_is_insecure) 715 responses = stub.a_bidi_rpc(requests) 716 responses.each do |r| 717 expect(r).to be_a(EchoMsg) 718 end 719 @srv.stop 720 @srv_thd.join 721 722 check_multi_req_view_of_finished_call(@service.server_side_call) 723 end 724 end 725 end 726end 727