# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

include GRPC::Core::StatusCodes

# Specs for GRPC::ActiveCall. Every example talks to a real core server that
# is started in the top-level before(:each) hook and torn down in the
# top-level after(:each) hook.
describe GRPC::ActiveCall do
  ActiveCall = GRPC::ActiveCall
  Call = GRPC::Core::Call
  CallOps = GRPC::Core::CallOps
  WriteFlags = GRPC::Core::WriteFlags

  # Builds the status struct the server side sends when finishing a call:
  # a Struct::Status constructed from the OK code and the string 'OK'.
  def ok_status
    Struct::Status.new(OK, 'OK')
  end

  # Finishes a call on the raw core call objects by running three batches in
  # order: the client sends its close, the server receives the close and
  # sends ok_status, and the client receives the status.
  #
  # @param client_call the client-side core call (responds to #run_batch)
  # @param server_call the server-side core call (responds to #run_batch)
  def send_and_receive_close_and_status(client_call, server_call)
    client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil,
                          CallOps::SEND_STATUS_FROM_SERVER => ok_status)
    client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)
  end

  # Returns the object stored in the @call instance variable of
  # +active_call+, so examples can run batches on it directly.
  def inner_call_of_active_call(active_call)
    active_call.instance_variable_get(:@call)
  end

  # Starts a server bound to port 0, spawns a thread that waits for one
  # incoming call and pushes it (or the exception raised while waiting) onto
  # @received_rpcs_queue, and opens a channel to the port the server reported.
  before(:each) do
    @pass_through = proc { |x| x }
    host = '0.0.0.0:0'
    @server = new_core_server_for_testing(nil)
    server_port = @server.add_http2_port(host, :this_port_is_insecure)
    @server.start
    @received_rpcs_queue = Queue.new
    @server_thread = Thread.new do
      begin
        received_rpc = @server.request_call
      rescue GRPC::Core::CallError, StandardError => e
        # enqueue the exception in this case as a way to indicate the error
        received_rpc = e
      end
      @received_rpcs_queue.push(received_rpc)
    end
    @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
                                  :this_channel_is_insecure)
  end

  # Shuts the server down and waits for whichever thread @server_thread
  # currently references.
  after(:each) do
    @server.shutdown_and_notify(deadline)
    @server.close
    @server_thread.join
  end

  # NOTE(review): the three examples below currently assert nothing:
  #   * expect(...) is called without .to/.not_to, so no matcher ever runs;
  #   * the %w(...) entries keep their trailing commas ("cancelled?,");
  #   * Object#methods returns Symbols, so include?(a_string) is always false.
  # Tightening them needs the authoritative list of methods each view
  # exposes, which is not visible from this file -- confirm against the view
  # definitions before turning these into real expectations.
  describe 'restricted view methods' do
    before(:each) do
      call = make_test_call
      ActiveCall.client_invoke(call)
      @client_call = ActiveCall.new(call, @pass_through,
                                    @pass_through, deadline)
    end

    describe '#multi_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %w(cancelled?, deadline, each_remote_read, metadata, \
                  shutdown, peer, peer_cert, send_initial_metadata, \
                  initial_metadata_sent)
        v = @client_call.multi_req_view
        want.each do |w|
          expect(v.methods.include?(w))
        end
      end
    end

    describe '#single_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %w(cancelled?, deadline, metadata, shutdown, \
                  send_initial_metadata, metadata_to_send, \
                  merge_metadata_to_send, initial_metadata_sent)
        v = @client_call.single_req_view
        want.each do |w|
          expect(v.methods.include?(w))
        end
      end
    end

    describe '#interceptable' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %w(deadline)
        v = @client_call.interceptable
        want.each do |w|
          expect(v.methods.include?(w))
        end
      end
    end
  end

  describe '#remote_send' do
    it 'allows a client to send a payload to the server', test: true do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # check that the new rpc was received by the server
      recvd_rpc = @received_rpcs_queue.pop
      expect(recvd_rpc).to_not be_nil
      recvd_call = recvd_rpc.call

      # Accept the call, and verify that the server reads the request ok.
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true,
                                   started: false)
      expect(server_call.remote_read).to eq(msg)
      # finish the call
      server_call.send_initial_metadata
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end

    it 'marshals the payload using the marshal func' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      marshal = proc { |x| 'marshalled:' + x }
      client_call = ActiveCall.new(call, marshal, @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # confirm that the message was marshalled
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      server_ops = {
        CallOps::SEND_INITIAL_METADATA => nil
      }
      recvd_call.run_batch(server_ops)
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true)
      expect(server_call.remote_read).to eq('marshalled:' + msg)
      # finish the call
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end

    # One example is generated per write flag.
    TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS].freeze
    TEST_WRITE_FLAGS.each do |f|
      it "successfully makes calls with write_flag set to #{f}" do
        call = make_test_call
        ActiveCall.client_invoke(call)
        marshal = proc { |x| 'marshalled:' + x }
        client_call = ActiveCall.new(call, marshal,
                                     @pass_through, deadline)
        msg = 'message is a string'
        client_call.write_flag = f
        client_call.remote_send(msg)
        # flush the message in case writes are set to buffered; compare
        # against the named flag rather than its numeric value
        if f == WriteFlags::BUFFER_HINT
          call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
        end

        # confirm that the message was marshalled
        recvd_rpc = @received_rpcs_queue.pop
        recvd_call = recvd_rpc.call
        server_ops = {
          CallOps::SEND_INITIAL_METADATA => nil
        }
        recvd_call.run_batch(server_ops)
        server_call = ActiveCall.new(recvd_call, @pass_through,
                                     @pass_through, deadline,
                                     metadata_received: true)
        expect(server_call.remote_read).to eq('marshalled:' + msg)
        # finish the call
        server_call.send_status(OK, '', true)
        client_call.receive_and_check_status
      end
    end
  end

  describe 'sending initial metadata', send_initial_metadata: true do
    it 'sends metadata before sending a message if it hasnt been sent yet' do
      call = make_test_call
      @client_call = ActiveCall.new(
        call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      metadata = { key: 'dummy_val', other: 'other_val' }
      expect(@client_call.metadata_sent).to eq(false)
      @client_call.merge_metadata_to_send(metadata)

      message = 'dummy message'

      # The expectations below replace run_batch on the core call, so
      # nothing is actually sent over the channel in this example.
      expect(call).to(
        receive(:run_batch)
          .with(
            hash_including(
              CallOps::SEND_INITIAL_METADATA => metadata)).once)

      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_MESSAGE => message)).once)
      @client_call.remote_send(message)

      expect(@client_call.metadata_sent).to eq(true)
    end

    it 'doesnt send metadata if it thinks its already been sent' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)
      expect(@client_call.metadata_sent).to eql(true)
      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA)).never)

      @client_call.remote_send('test message')
    end

    it 'sends metadata if it is explicitly sent and ok to do so' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline,
                                    started: false)

      expect(@client_call.metadata_sent).to eql(false)

      metadata = { test_key: 'val' }
      @client_call.merge_metadata_to_send(metadata)
      expect(@client_call.metadata_to_send).to eq(metadata)

      expect(call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA =>
                                     metadata)).once)
      @client_call.send_initial_metadata
    end

    it 'explicit sending does nothing if metadata has already been sent' do
      call = make_test_call

      @client_call = ActiveCall.new(call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)

      expect(@client_call.metadata_sent).to eql(true)

      blk = proc do
        @client_call.send_initial_metadata
      end

      expect { blk.call }.to_not raise_error
    end
  end

  describe '#merge_metadata_to_send', merge_metadata_to_send: true do
    it 'adds to existing metadata when there is existing metadata to send' do
      call = make_test_call
      starting_metadata = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_val'
      }

      @client_call = ActiveCall.new(
        call,
        @pass_through, @pass_through,
        deadline,
        started: false,
        metadata_to_send: starting_metadata)

      expect(@client_call.metadata_to_send).to eq(starting_metadata)

      @client_call.merge_metadata_to_send(
        k3: 'key3_new_val',
        k4: 'key4_val')

      expected_md_to_send = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_new_val',
        k4: 'key4_val' }

      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)

      @client_call.merge_metadata_to_send(k5: 'key5_val')
      expected_md_to_send.merge!(k5: 'key5_val')
      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
    end

    it 'fails when initial metadata has already been sent' do
      call = make_test_call
      @client_call = ActiveCall.new(
        call,
        @pass_through,
        @pass_through,
        deadline,
        started: true)

      expect(@client_call.metadata_sent).to eq(true)

      blk = proc do
        @client_call.merge_metadata_to_send(k1: 'key1_val')
      end

      # NOTE(review): a bare raise_error matches any exception; pin the
      # expected error class once confirmed against the implementation of
      # ActiveCall#merge_metadata_to_send.
      expect { blk.call }.to raise_error
    end
  end

  describe '#client_invoke' do
    it 'sends metadata to the server when present' do
      call = make_test_call
      metadata = { k1: 'v1', k2: 'v2' }
      ActiveCall.client_invoke(call, metadata)
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      expect(recvd_call).to_not be_nil
      expect(recvd_rpc.metadata).to_not be_nil
      expect(recvd_rpc.metadata['k1']).to eq('v1')
      expect(recvd_rpc.metadata['k2']).to eq('v2')
      # finish the call
      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
      call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(call, recvd_call)
    end
  end

  describe '#send_status', send_status: true do
    it 'works when no metadata or messages have been sent yet' do
      call = make_test_call
      ActiveCall.client_invoke(call)

      recvd_rpc = @received_rpcs_queue.pop
      server_call = ActiveCall.new(
        recvd_rpc.call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      expect(server_call.metadata_sent).to eq(false)
      blk = proc { server_call.send_status(OK) }
      expect { blk.call }.to_not raise_error
    end
  end

  describe '#remote_read', remote_read: true do
    it 'reads the response sent by a server' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'saves no metadata when the server adds no metadata' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('ignore me')
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      expect(client_call.metadata).to eq({})
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'saves metadata add by the server' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
      server_call.remote_send('ignore me')
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      expected = { 'k1' => 'v1', 'k2' => 'v2' }
      expect(client_call.metadata).to eq(expected)
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'get a status from server when nothing else sent from server' do
      client_call = make_test_call
      ActiveCall.client_invoke(client_call)

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call

      server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      server_call.send_status(OK, 'OK')

      # Check that we can receive initial metadata and a status
      client_call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      batch_result = client_call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)

      expect(batch_result.status.code).to eq(OK)
    end

    it 'get a nil msg before a status when an OK status is sent' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'OK')
      expect(client_call.remote_read).to eq('server_response')
      res = client_call.remote_read
      expect(res).to be_nil
    end

    it 'unmarshals the response using the unmarshal func' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      unmarshal = proc { |x| 'unmarshalled:' + x }
      client_call = ActiveCall.new(call, @pass_through,
                                   unmarshal, deadline)

      # confirm the client receives the unmarshalled message
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('unmarshalled:server_response')
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end
  end

  describe '#each_remote_read' do
    it 'creates an Enumerator' do
      call = make_test_call
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      expect(client_call.each_remote_read).to be_a(Enumerator)
      # finish the call
      client_call.cancel
    end

    it 'the returned enumerator can read n responses' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      send_and_receive_close_and_status(
        call, inner_call_of_active_call(server_call))
    end

    it 'the returns an enumerator that stops after an OK Status' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      server_call.send_status(OK, 'OK', true)
      expect { e.next }.to raise_error(StopIteration)
    end
  end

  describe '#closing the call from the client' do
    it 'finishes ok if the server sends a status response' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      expect do
        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if the server sends an early status response' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
      call = make_test_call
      ActiveCall.client_invoke(call)
      client_call = ActiveCall.new(call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        call.run_batch(
          CallOps::SEND_CLOSE_FROM_CLIENT => nil,
          CallOps::RECV_STATUS_ON_CLIENT => nil)
      end.to_not raise_error
    end
  end

  # Test sending of the initial metadata in #run_server_bidi
  # from the server handler both implicitly and explicitly.
  #
  # The examples only start the server handler thread (reassigning
  # @server_thread); the client traffic and every expectation live in the
  # after(:each) hook below.
  describe '#run_server_bidi metadata sending tests', run_server_bidi: true do
    before(:each) do
      @requests = ['first message', 'second message']
      @server_to_client_metadata = { 'test_key' => 'test_val' }
      @server_status = OK

      @client_call = make_test_call
      @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      @server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        metadata_received: true,
        started: false,
        metadata_to_send: @server_to_client_metadata)
    end

    after(:each) do
      # Send the requests and send a close so the server can send a status
      @requests.each do |message|
        @client_call.run_batch(CallOps::SEND_MESSAGE => message)
      end
      @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)

      @server_thread.join

      # Expect that initial metadata was sent,
      # the requests were echoed, and a status was sent
      batch_result = @client_call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      expect(batch_result.metadata).to eq(@server_to_client_metadata)

      @requests.each do |message|
        batch_result = @client_call.run_batch(
          CallOps::RECV_MESSAGE => nil)
        expect(batch_result.message).to eq(message)
      end

      batch_result = @client_call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)
      expect(batch_result.status.code).to eq(@server_status)
    end

    it 'sends the initial metadata implicitly if not already sent' do
      # Server handler that doesn't have access to a "call"
      # It echoes the requests
      fake_gen_each_reply_with_no_call_param = proc do |msgs|
        msgs
      end

      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_no_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end

    it 'sends the metadata when sent explicitly and not already sent' do
      # Fake server handler that has access to a "call" object and
      # uses it to explicitly update and send the initial metadata
      fake_gen_each_reply_with_call_param = proc do |msgs, call_param|
        call_param.merge_metadata_to_send(@server_to_client_metadata)
        call_param.send_initial_metadata
        msgs
      end
      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end
  end

  # Accepts the pending call via expect_server_to_be_invoked and expects the
  # first message read on it to equal +sent_text+.
  #
  # @param sent_text the message the client is expected to have sent
  # @param kw keyword arguments forwarded to expect_server_to_be_invoked
  # @return the server-side ActiveCall
  def expect_server_to_receive(sent_text, **kw)
    c = expect_server_to_be_invoked(**kw)
    expect(c.remote_read).to eq(sent_text)
    c
  end

  # Pops the received rpc off the queue, sends +kw+ as the server's initial
  # metadata on its call, and wraps that call in a server-side ActiveCall
  # created with metadata_received: true and started: true.
  #
  # @param kw the initial metadata to send from the server
  # @return the server-side ActiveCall
  def expect_server_to_be_invoked(**kw)
    recvd_rpc = @received_rpcs_queue.pop
    expect(recvd_rpc).to_not be_nil
    recvd_call = recvd_rpc.call
    recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
    ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
                   metadata_received: true, started: true)
  end

  # Creates a new client call for '/method' on the test channel, using a
  # fresh deadline.
  def make_test_call
    @ch.create_call(nil, nil, '/method', nil, deadline)
  end

  # Returns a deadline two seconds from the moment it is called.
  def deadline
    Time.now + 2 # in 2 seconds; arbitrary
  end
end