# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

include GRPC::Core::StatusCodes

# Exercises GRPC::ActiveCall against a core server that is started for each
# example. Every example gets a fresh server, channel and client call
# (@call); a background thread accepts exactly one RPC and publishes it (or
# the exception raised while accepting) on @received_rpcs_queue.
describe GRPC::ActiveCall do
  ActiveCall = GRPC::ActiveCall
  Call = GRPC::Core::Call
  CallOps = GRPC::Core::CallOps
  WriteFlags = GRPC::Core::WriteFlags

  # Builds the OK status struct that the server side sends to finish a call.
  def ok_status
    Struct::Status.new(OK, 'OK')
  end

  # Finishes an RPC at the core-call level: the client half-closes, the
  # server receives the close and replies with an OK status, and the client
  # then receives that status.
  def send_and_receive_close_and_status(client_call, server_call)
    client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
    server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil,
                          CallOps::SEND_STATUS_FROM_SERVER => ok_status)
    client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil)
  end

  # Returns the object stored in the ActiveCall's @call instance variable.
  def inner_call_of_active_call(active_call)
    active_call.instance_variable_get(:@call)
  end

  before(:each) do
    @pass_through = proc { |x| x }
    host = '0.0.0.0:0'
    @server = new_core_server_for_testing(nil)
    server_port = @server.add_http2_port(host, :this_port_is_insecure)
    @server.start
    @received_rpcs_queue = Queue.new
    @server_thread = Thread.new do
      begin
        received_rpc = @server.request_call
      rescue GRPC::Core::CallError, StandardError => e
        # enqueue the exception in this case as a way to indicate the error
        received_rpc = e
      end
      @received_rpcs_queue.push(received_rpc)
    end
    @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil,
                                  :this_channel_is_insecure)
    @call = make_test_call
  end

  after(:each) do
    @server.shutdown_and_notify(deadline)
    @server.close
    @server_thread.join
    # Don't rely on GC to unref the call, since that can prevent
    # the channel connectivity state polling thread from shutting down.
    @call.close
  end

  describe 'restricted view methods' do
    before(:each) do
      ActiveCall.client_invoke(@call)
      @client_call = ActiveCall.new(@call, @pass_through,
                                    @pass_through, deadline)
    end

    after(:each) do
      # terminate the RPC that was started in before(:each)
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => nil)
      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(@call, recvd_call)
    end

    describe '#multi_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(cancelled? deadline each_remote_read metadata shutdown
                  peer peer_cert send_initial_metadata
                  initial_metadata_sent)
        v = @client_call.multi_req_view
        want.each do |w|
          # NOTE(review): `expect` is called without a matcher, so this never
          # fails. Before making it a real assertion, e.g.
          # `expect(v).to respond_to(w)`, confirm `want` against the methods
          # the view class actually exposes.
          expect(v.methods.include?(w))
        end
      end
    end

    describe '#single_req_view' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(cancelled? deadline metadata shutdown
                  send_initial_metadata metadata_to_send
                  merge_metadata_to_send initial_metadata_sent)
        v = @client_call.single_req_view
        want.each do |w|
          # NOTE(review): `expect` is called without a matcher, so this never
          # fails. Before making it a real assertion, e.g.
          # `expect(v).to respond_to(w)`, confirm `want` against the methods
          # the view class actually exposes.
          expect(v.methods.include?(w))
        end
      end
    end

    describe '#interceptable' do
      it 'exposes a fixed subset of the ActiveCall.methods' do
        want = %i(deadline)
        v = @client_call.interceptable
        want.each do |w|
          # NOTE(review): `expect` is called without a matcher, so this never
          # fails. Before making it a real assertion, e.g.
          # `expect(v).to respond_to(w)`, confirm `want` against the methods
          # the view class actually exposes.
          expect(v.methods.include?(w))
        end
      end
    end
  end

  describe '#remote_send' do
    it 'allows a client to send a payload to the server', test: true do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # check that server rpc new was received
      recvd_rpc = @received_rpcs_queue.pop
      expect(recvd_rpc).to_not be_nil
      recvd_call = recvd_rpc.call

      # Accept the call, and verify that the server reads the response ok.
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true,
                                   started: false)
      expect(server_call.remote_read).to eq(msg)
      # finish the call
      server_call.send_initial_metadata
      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(@call, recvd_call)
    end

    it 'marshals the payload using the marshal func' do
      ActiveCall.client_invoke(@call)
      marshal = proc { |x| 'marshalled:' + x }
      client_call = ActiveCall.new(@call, marshal, @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)

      # confirm that the message was marshalled
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      server_ops = {
        CallOps::SEND_INITIAL_METADATA => nil
      }
      recvd_call.run_batch(server_ops)
      server_call = ActiveCall.new(recvd_call, @pass_through,
                                   @pass_through, deadline,
                                   metadata_received: true)
      expect(server_call.remote_read).to eq('marshalled:' + msg)
      # finish the call
      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(@call, recvd_call)
    end

    TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS].freeze
    TEST_WRITE_FLAGS.each do |f|
      it "successfully makes calls with write_flag set to #{f}" do
        ActiveCall.client_invoke(@call)
        marshal = proc { |x| 'marshalled:' + x }
        client_call = ActiveCall.new(@call, marshal,
                                     @pass_through, deadline)
        msg = 'message is a string'
        client_call.write_flag = f
        client_call.remote_send(msg)
        # flush the message in case writes are set to buffered
        if f == WriteFlags::BUFFER_HINT
          @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
        end

        # confirm that the message was marshalled
        recvd_rpc = @received_rpcs_queue.pop
        recvd_call = recvd_rpc.call
        server_ops = {
          CallOps::SEND_INITIAL_METADATA => nil
        }
        recvd_call.run_batch(server_ops)
        server_call = ActiveCall.new(recvd_call, @pass_through,
                                     @pass_through, deadline,
                                     metadata_received: true)
        expect(server_call.remote_read).to eq('marshalled:' + msg)
        # finish the call
        server_call.send_status(OK, '', true)
        client_call.receive_and_check_status
      end
    end
  end

  describe 'sending initial metadata', send_initial_metadata: true do
    it 'sends metadata before sending a message if it hasnt been sent yet' do
      @client_call = ActiveCall.new(
        @call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      metadata = { key: 'phony_val', other: 'other_val' }
      expect(@client_call.metadata_sent).to eq(false)
      @client_call.merge_metadata_to_send(metadata)

      message = 'phony message'

      expect(@call).to(
        receive(:run_batch)
          .with(
            hash_including(
              CallOps::SEND_INITIAL_METADATA => metadata)).once)

      expect(@call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_MESSAGE => message)).once)
      @client_call.remote_send(message)

      expect(@client_call.metadata_sent).to eq(true)
    end

    it 'doesnt send metadata if it thinks its already been sent' do
      @client_call = ActiveCall.new(@call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)
      expect(@client_call.metadata_sent).to eql(true)
      expect(@call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA)).never)

      @client_call.remote_send('test message')
    end

    it 'sends metadata if it is explicitly sent and ok to do so' do
      @client_call = ActiveCall.new(@call,
                                    @pass_through,
                                    @pass_through,
                                    deadline,
                                    started: false)

      expect(@client_call.metadata_sent).to eql(false)

      metadata = { test_key: 'val' }
      @client_call.merge_metadata_to_send(metadata)
      expect(@client_call.metadata_to_send).to eq(metadata)

      expect(@call).to(
        receive(:run_batch).with(hash_including(
                                   CallOps::SEND_INITIAL_METADATA =>
                                     metadata)).once)
      @client_call.send_initial_metadata
    end

    it 'explicit sending does nothing if metadata has already been sent' do
      @client_call = ActiveCall.new(@call,
                                    @pass_through,
                                    @pass_through,
                                    deadline)

      expect(@client_call.metadata_sent).to eql(true)

      expect { @client_call.send_initial_metadata }.to_not raise_error
    end
  end

  describe '#merge_metadata_to_send', merge_metadata_to_send: true do
    it 'adds to existing metadata when there is existing metadata to send' do
      starting_metadata = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_val'
      }

      @client_call = ActiveCall.new(
        @call,
        @pass_through, @pass_through,
        deadline,
        started: false,
        metadata_to_send: starting_metadata)

      expect(@client_call.metadata_to_send).to eq(starting_metadata)

      @client_call.merge_metadata_to_send(
        k3: 'key3_new_val',
        k4: 'key4_val')

      expected_md_to_send = {
        k1: 'key1_val',
        k2: 'key2_val',
        k3: 'key3_new_val',
        k4: 'key4_val' }

      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)

      @client_call.merge_metadata_to_send(k5: 'key5_val')
      expected_md_to_send.merge!(k5: 'key5_val')
      expect(@client_call.metadata_to_send).to eq(expected_md_to_send)
    end

    it 'fails when initial metadata has already been sent' do
      @client_call = ActiveCall.new(
        @call,
        @pass_through,
        @pass_through,
        deadline,
        started: true)

      expect(@client_call.metadata_sent).to eq(true)

      # A bare `raise_error` matches any Exception and makes RSpec warn about
      # false positives; StandardError narrows the match.
      # NOTE(review): assumes the failure is a StandardError subclass --
      # confirm against ActiveCall#merge_metadata_to_send and tighten further.
      expect do
        @client_call.merge_metadata_to_send(k1: 'key1_val')
      end.to raise_error(StandardError)
    end
  end

  describe '#client_invoke' do
    it 'sends metadata to the server when present' do
      metadata = { k1: 'v1', k2: 'v2' }
      ActiveCall.client_invoke(@call, metadata)
      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      expect(recvd_call).to_not be_nil
      expect(recvd_rpc.metadata).to_not be_nil
      expect(recvd_rpc.metadata['k1']).to eq('v1')
      expect(recvd_rpc.metadata['k2']).to eq('v2')
      # finish the call
      recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})
      @call.run_batch(CallOps::RECV_INITIAL_METADATA => nil)
      send_and_receive_close_and_status(@call, recvd_call)
    end
  end

  describe '#send_status', send_status: true do
    it 'works when no metadata or messages have been sent yet' do
      ActiveCall.client_invoke(@call)

      recvd_rpc = @received_rpcs_queue.pop
      server_call = ActiveCall.new(
        recvd_rpc.call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      expect(server_call.metadata_sent).to eq(false)
      expect { server_call.send_status(OK) }.to_not raise_error
    end
  end

  describe '#remote_read', remote_read: true do
    it 'reads the response sent by a server' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      send_and_receive_close_and_status(
        @call, inner_call_of_active_call(server_call))
    end

    it 'saves no metadata when the server adds no metadata' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('ignore me')
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      expect(client_call.metadata).to eq({})
      send_and_receive_close_and_status(
        @call, inner_call_of_active_call(server_call))
    end

    it 'saves metadata add by the server' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2')
      server_call.remote_send('ignore me')
      expect(client_call.metadata).to be_nil
      client_call.remote_read
      expected = { 'k1' => 'v1', 'k2' => 'v2' }
      expect(client_call.metadata).to eq(expected)
      send_and_receive_close_and_status(
        @call, inner_call_of_active_call(server_call))
    end

    it 'get a status from server when nothing else sent from server' do
      ActiveCall.client_invoke(@call)

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call

      server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        started: false)

      server_call.send_status(OK, 'OK')

      # Check that we can receive initial metadata and a status
      @call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      batch_result = @call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)

      expect(batch_result.status.code).to eq(OK)
    end

    it 'get a nil msg before a status when an OK status is sent' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'OK')
      expect(client_call.remote_read).to eq('server_response')
      res = client_call.remote_read
      expect(res).to be_nil
    end

    it 'unmarshals the response using the unmarshal func' do
      ActiveCall.client_invoke(@call)
      unmarshal = proc { |x| 'unmarshalled:' + x }
      client_call = ActiveCall.new(@call, @pass_through,
                                   unmarshal, deadline)

      # confirm the client receives the unmarshalled message
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('unmarshalled:server_response')
      send_and_receive_close_and_status(
        @call, inner_call_of_active_call(server_call))
    end
  end

  describe '#each_remote_read' do
    it 'creates an Enumerator' do
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      expect(client_call.each_remote_read).to be_a(Enumerator)
      # finish the call
      client_call.cancel
    end

    it 'the returned enumerator can read n responses' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      send_and_receive_close_and_status(
        @call, inner_call_of_active_call(server_call))
    end

    it 'the returns an enumerator that stops after an OK Status' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      reply = 'server_response'
      client_call.remote_send(msg)
      @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      server_call = expect_server_to_receive(msg)
      e = client_call.each_remote_read
      n = 3 # arbitrary value > 1
      n.times do
        server_call.remote_send(reply)
        expect(e.next).to eq(reply)
      end
      server_call.send_status(OK, 'OK', true)
      expect { e.next }.to raise_error(StopIteration)
    end
  end

  describe '#closing the call from the client' do
    it 'finishes ok if the server sends a status response' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      expect do
        @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      expect(client_call.remote_read).to eq('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if the server sends an early status response' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        @call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)
      end.to_not raise_error
      expect { client_call.receive_and_check_status }.to_not raise_error
    end

    it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do
      ActiveCall.client_invoke(@call)
      client_call = ActiveCall.new(@call, @pass_through,
                                   @pass_through, deadline)
      msg = 'message is a string'
      client_call.remote_send(msg)
      server_call = expect_server_to_receive(msg)
      server_call.remote_send('server_response')
      server_call.send_status(OK, 'status code is OK')
      expect(client_call.remote_read).to eq('server_response')
      expect do
        @call.run_batch(
          CallOps::SEND_CLOSE_FROM_CLIENT => nil,
          CallOps::RECV_STATUS_ON_CLIENT => nil)
      end.to_not raise_error
    end
  end

  # Test sending of the initial metadata in #run_server_bidi
  # from the server handler both implicitly and explicitly.
  describe '#run_server_bidi metadata sending tests', run_server_bidi: true do
    before(:each) do
      @requests = ['first message', 'second message']
      @server_to_client_metadata = { 'test_key' => 'test_val' }
      @server_status = OK

      @client_call = make_test_call
      @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {})

      recvd_rpc = @received_rpcs_queue.pop
      recvd_call = recvd_rpc.call
      @server_call = ActiveCall.new(
        recvd_call,
        @pass_through,
        @pass_through,
        deadline,
        metadata_received: true,
        started: false,
        metadata_to_send: @server_to_client_metadata)
    end

    after(:each) do
      # Send the requests and send a close so the server can send a status
      @requests.each do |message|
        @client_call.run_batch(CallOps::SEND_MESSAGE => message)
      end
      @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil)

      @server_thread.join

      # Expect that initial metadata was sent,
      # the requests were echoed, and a status was sent
      batch_result = @client_call.run_batch(
        CallOps::RECV_INITIAL_METADATA => nil)
      expect(batch_result.metadata).to eq(@server_to_client_metadata)

      @requests.each do |message|
        batch_result = @client_call.run_batch(
          CallOps::RECV_MESSAGE => nil)
        expect(batch_result.message).to eq(message)
      end

      batch_result = @client_call.run_batch(
        CallOps::RECV_STATUS_ON_CLIENT => nil)
      expect(batch_result.status.code).to eq(@server_status)
      @client_call.close
    end

    it 'sends the initial metadata implicitly if not already sent' do
      # Server handler that doesn't have access to a "call"
      # It echoes the requests
      fake_gen_each_reply_with_no_call_param = proc do |msgs|
        msgs
      end

      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_no_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end

    it 'sends the metadata when sent explicitly and not already sent' do
      # Fake server handler that has access to a "call" object and
      # uses it to explicitly update and send the initial metadata
      fake_gen_each_reply_with_call_param = proc do |msgs, call_param|
        call_param.merge_metadata_to_send(@server_to_client_metadata)
        call_param.send_initial_metadata
        msgs
      end
      int_ctx = GRPC::InterceptionContext.new

      @server_thread = Thread.new do
        @server_call.run_server_bidi(
          fake_gen_each_reply_with_call_param, int_ctx)
        @server_call.send_status(@server_status)
      end
    end
  end

  # Accepts the pending RPC (see #expect_server_to_be_invoked), expects the
  # first message read on the server side to equal sent_text, and returns the
  # server-side ActiveCall. Keyword arguments are passed through as the
  # server's initial metadata.
  def expect_server_to_receive(sent_text, **kw)
    c = expect_server_to_be_invoked(**kw)
    expect(c.remote_read).to eq(sent_text)
    c
  end

  # Pops the RPC accepted by the server thread, sends the keyword arguments
  # as initial metadata on it, and wraps it in a server-side ActiveCall.
  def expect_server_to_be_invoked(**kw)
    recvd_rpc = @received_rpcs_queue.pop
    expect(recvd_rpc).to_not be_nil
    recvd_call = recvd_rpc.call
    recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw)
    ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline,
                   metadata_received: true, started: true)
  end

  # Creates a new call to '/method' on the per-example channel.
  def make_test_call
    @ch.create_call(nil, nil, '/method', nil, deadline)
  end

  # Deadline used for calls and server shutdown in these examples.
  def deadline
    Time.now + 2 # in 2 seconds; arbitrary
  end
end