1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15require 'spec_helper' 16 17include GRPC::Core::StatusCodes 18 19describe GRPC::ActiveCall do 20 ActiveCall = GRPC::ActiveCall 21 Call = GRPC::Core::Call 22 CallOps = GRPC::Core::CallOps 23 WriteFlags = GRPC::Core::WriteFlags 24 25 def ok_status 26 Struct::Status.new(OK, 'OK') 27 end 28 29 def send_and_receive_close_and_status(client_call, server_call) 30 client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 31 server_call.run_batch(CallOps::RECV_CLOSE_ON_SERVER => nil, 32 CallOps::SEND_STATUS_FROM_SERVER => ok_status) 33 client_call.run_batch(CallOps::RECV_STATUS_ON_CLIENT => nil) 34 end 35 36 def inner_call_of_active_call(active_call) 37 active_call.instance_variable_get(:@call) 38 end 39 40 before(:each) do 41 @pass_through = proc { |x| x } 42 host = '0.0.0.0:0' 43 @server = new_core_server_for_testing(nil) 44 server_port = @server.add_http2_port(host, :this_port_is_insecure) 45 @server.start 46 @ch = GRPC::Core::Channel.new("0.0.0.0:#{server_port}", nil, 47 :this_channel_is_insecure) 48 end 49 50 after(:each) do 51 @server.shutdown_and_notify(deadline) 52 @server.close 53 end 54 55 describe 'restricted view methods' do 56 before(:each) do 57 call = make_test_call 58 ActiveCall.client_invoke(call) 59 @client_call = ActiveCall.new(call, @pass_through, 60 @pass_through, deadline) 61 end 62 63 describe '#multi_req_view' do 64 it 'exposes a fixed subset of the ActiveCall.methods' do 65 want = %w(cancelled?, deadline, each_remote_read, metadata, \ 66 shutdown, peer, peer_cert, send_initial_metadata, \ 67 initial_metadata_sent) 68 v = @client_call.multi_req_view 69 want.each do |w| 70 expect(v.methods.include?(w)) 71 end 72 end 73 end 74 75 describe '#single_req_view' do 76 it 'exposes a fixed subset of the ActiveCall.methods' do 77 want = %w(cancelled?, deadline, metadata, shutdown, \ 78 send_initial_metadata, metadata_to_send, \ 79 merge_metadata_to_send, initial_metadata_sent) 80 v = @client_call.single_req_view 81 want.each do |w| 82 expect(v.methods.include?(w)) 83 end 84 end 85 end 86 87 describe '#interceptable' do 88 it 'exposes a fixed subset of the ActiveCall.methods' do 89 want = %w(deadline) 90 v = @client_call.interceptable 91 want.each do |w| 92 expect(v.methods.include?(w)) 93 end 94 end 95 end 96 end 97 98 describe '#remote_send' do 99 it 'allows a client to send a payload to the server', test: true do 100 call = make_test_call 101 ActiveCall.client_invoke(call) 102 client_call = ActiveCall.new(call, @pass_through, 103 @pass_through, deadline) 104 msg = 'message is a string' 105 client_call.remote_send(msg) 106 107 # check that server rpc new was received 108 recvd_rpc = @server.request_call 109 expect(recvd_rpc).to_not eq nil 110 recvd_call = recvd_rpc.call 111 112 # Accept the call, and verify that the server reads the response ok. 113 server_call = ActiveCall.new(recvd_call, @pass_through, 114 @pass_through, deadline, 115 metadata_received: true, 116 started: false) 117 expect(server_call.remote_read).to eq(msg) 118 # finish the call 119 server_call.send_initial_metadata 120 call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) 121 send_and_receive_close_and_status(call, recvd_call) 122 end 123 124 it 'marshals the payload using the marshal func' do 125 call = make_test_call 126 ActiveCall.client_invoke(call) 127 marshal = proc { |x| 'marshalled:' + x } 128 client_call = ActiveCall.new(call, marshal, @pass_through, deadline) 129 msg = 'message is a string' 130 client_call.remote_send(msg) 131 132 # confirm that the message was marshalled 133 recvd_rpc = @server.request_call 134 recvd_call = recvd_rpc.call 135 server_ops = { 136 CallOps::SEND_INITIAL_METADATA => nil 137 } 138 recvd_call.run_batch(server_ops) 139 server_call = ActiveCall.new(recvd_call, @pass_through, 140 @pass_through, deadline, 141 metadata_received: true) 142 expect(server_call.remote_read).to eq('marshalled:' + msg) 143 # finish the call 144 call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) 145 send_and_receive_close_and_status(call, recvd_call) 146 end 147 148 TEST_WRITE_FLAGS = [WriteFlags::BUFFER_HINT, WriteFlags::NO_COMPRESS] 149 TEST_WRITE_FLAGS.each do |f| 150 it "successfully makes calls with write_flag set to #{f}" do 151 call = make_test_call 152 ActiveCall.client_invoke(call) 153 marshal = proc { |x| 'marshalled:' + x } 154 client_call = ActiveCall.new(call, marshal, 155 @pass_through, deadline) 156 msg = 'message is a string' 157 client_call.write_flag = f 158 client_call.remote_send(msg) 159 # flush the message in case writes are set to buffered 160 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) if f == 1 161 162 # confirm that the message was marshalled 163 recvd_rpc = @server.request_call 164 recvd_call = recvd_rpc.call 165 server_ops = { 166 CallOps::SEND_INITIAL_METADATA => nil 167 } 168 recvd_call.run_batch(server_ops) 169 server_call = ActiveCall.new(recvd_call, @pass_through, 170 @pass_through, deadline, 171 metadata_received: true) 172 expect(server_call.remote_read).to eq('marshalled:' + msg) 173 # finish the call 174 server_call.send_status(OK, '', true) 175 client_call.receive_and_check_status 176 end 177 end 178 end 179 180 describe 'sending initial metadata', send_initial_metadata: true do 181 it 'sends metadata before sending a message if it hasnt been sent yet' do 182 call = make_test_call 183 @client_call = ActiveCall.new( 184 call, 185 @pass_through, 186 @pass_through, 187 deadline, 188 started: false) 189 190 metadata = { key: 'dummy_val', other: 'other_val' } 191 expect(@client_call.metadata_sent).to eq(false) 192 @client_call.merge_metadata_to_send(metadata) 193 194 message = 'dummy message' 195 196 expect(call).to( 197 receive(:run_batch) 198 .with( 199 hash_including( 200 CallOps::SEND_INITIAL_METADATA => metadata)).once) 201 202 expect(call).to( 203 receive(:run_batch).with(hash_including( 204 CallOps::SEND_MESSAGE => message)).once) 205 @client_call.remote_send(message) 206 207 expect(@client_call.metadata_sent).to eq(true) 208 end 209 210 it 'doesnt send metadata if it thinks its already been sent' do 211 call = make_test_call 212 213 @client_call = ActiveCall.new(call, 214 @pass_through, 215 @pass_through, 216 deadline) 217 expect(@client_call.metadata_sent).to eql(true) 218 expect(call).to( 219 receive(:run_batch).with(hash_including( 220 CallOps::SEND_INITIAL_METADATA)).never) 221 222 @client_call.remote_send('test message') 223 end 224 225 it 'sends metadata if it is explicitly sent and ok to do so' do 226 call = make_test_call 227 228 @client_call = ActiveCall.new(call, 229 @pass_through, 230 @pass_through, 231 deadline, 232 started: false) 233 234 expect(@client_call.metadata_sent).to eql(false) 235 236 metadata = { test_key: 'val' } 237 @client_call.merge_metadata_to_send(metadata) 238 expect(@client_call.metadata_to_send).to eq(metadata) 239 240 expect(call).to( 241 receive(:run_batch).with(hash_including( 242 CallOps::SEND_INITIAL_METADATA => 243 metadata)).once) 244 @client_call.send_initial_metadata 245 end 246 247 it 'explicit sending does nothing if metadata has already been sent' do 248 call = make_test_call 249 250 @client_call = ActiveCall.new(call, 251 @pass_through, 252 @pass_through, 253 deadline) 254 255 expect(@client_call.metadata_sent).to eql(true) 256 257 blk = proc do 258 @client_call.send_initial_metadata 259 end 260 261 expect { blk.call }.to_not raise_error 262 end 263 end 264 265 describe '#merge_metadata_to_send', merge_metadata_to_send: true do 266 it 'adds to existing metadata when there is existing metadata to send' do 267 call = make_test_call 268 starting_metadata = { 269 k1: 'key1_val', 270 k2: 'key2_val', 271 k3: 'key3_val' 272 } 273 274 @client_call = ActiveCall.new( 275 call, 276 @pass_through, @pass_through, 277 deadline, 278 started: false, 279 metadata_to_send: starting_metadata) 280 281 expect(@client_call.metadata_to_send).to eq(starting_metadata) 282 283 @client_call.merge_metadata_to_send( 284 k3: 'key3_new_val', 285 k4: 'key4_val') 286 287 expected_md_to_send = { 288 k1: 'key1_val', 289 k2: 'key2_val', 290 k3: 'key3_new_val', 291 k4: 'key4_val' } 292 293 expect(@client_call.metadata_to_send).to eq(expected_md_to_send) 294 295 @client_call.merge_metadata_to_send(k5: 'key5_val') 296 expected_md_to_send.merge!(k5: 'key5_val') 297 expect(@client_call.metadata_to_send).to eq(expected_md_to_send) 298 end 299 300 it 'fails when initial metadata has already been sent' do 301 call = make_test_call 302 @client_call = ActiveCall.new( 303 call, 304 @pass_through, 305 @pass_through, 306 deadline, 307 started: true) 308 309 expect(@client_call.metadata_sent).to eq(true) 310 311 blk = proc do 312 @client_call.merge_metadata_to_send(k1: 'key1_val') 313 end 314 315 expect { blk.call }.to raise_error 316 end 317 end 318 319 describe '#client_invoke' do 320 it 'sends metadata to the server when present' do 321 call = make_test_call 322 metadata = { k1: 'v1', k2: 'v2' } 323 ActiveCall.client_invoke(call, metadata) 324 recvd_rpc = @server.request_call 325 recvd_call = recvd_rpc.call 326 expect(recvd_call).to_not be_nil 327 expect(recvd_rpc.metadata).to_not be_nil 328 expect(recvd_rpc.metadata['k1']).to eq('v1') 329 expect(recvd_rpc.metadata['k2']).to eq('v2') 330 # finish the call 331 recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) 332 call.run_batch(CallOps::RECV_INITIAL_METADATA => nil) 333 send_and_receive_close_and_status(call, recvd_call) 334 end 335 end 336 337 describe '#send_status', send_status: true do 338 it 'works when no metadata or messages have been sent yet' do 339 call = make_test_call 340 ActiveCall.client_invoke(call) 341 342 recvd_rpc = @server.request_call 343 server_call = ActiveCall.new( 344 recvd_rpc.call, 345 @pass_through, 346 @pass_through, 347 deadline, 348 started: false) 349 350 expect(server_call.metadata_sent).to eq(false) 351 blk = proc { server_call.send_status(OK) } 352 expect { blk.call }.to_not raise_error 353 end 354 end 355 356 describe '#remote_read', remote_read: true do 357 it 'reads the response sent by a server' do 358 call = make_test_call 359 ActiveCall.client_invoke(call) 360 client_call = ActiveCall.new(call, @pass_through, 361 @pass_through, deadline) 362 msg = 'message is a string' 363 client_call.remote_send(msg) 364 server_call = expect_server_to_receive(msg) 365 server_call.remote_send('server_response') 366 expect(client_call.remote_read).to eq('server_response') 367 send_and_receive_close_and_status( 368 call, inner_call_of_active_call(server_call)) 369 end 370 371 it 'saves no metadata when the server adds no metadata' do 372 call = make_test_call 373 ActiveCall.client_invoke(call) 374 client_call = ActiveCall.new(call, @pass_through, 375 @pass_through, deadline) 376 msg = 'message is a string' 377 client_call.remote_send(msg) 378 server_call = expect_server_to_receive(msg) 379 server_call.remote_send('ignore me') 380 expect(client_call.metadata).to be_nil 381 client_call.remote_read 382 expect(client_call.metadata).to eq({}) 383 send_and_receive_close_and_status( 384 call, inner_call_of_active_call(server_call)) 385 end 386 387 it 'saves metadata add by the server' do 388 call = make_test_call 389 ActiveCall.client_invoke(call) 390 client_call = ActiveCall.new(call, @pass_through, 391 @pass_through, deadline) 392 msg = 'message is a string' 393 client_call.remote_send(msg) 394 server_call = expect_server_to_receive(msg, k1: 'v1', k2: 'v2') 395 server_call.remote_send('ignore me') 396 expect(client_call.metadata).to be_nil 397 client_call.remote_read 398 expected = { 'k1' => 'v1', 'k2' => 'v2' } 399 expect(client_call.metadata).to eq(expected) 400 send_and_receive_close_and_status( 401 call, inner_call_of_active_call(server_call)) 402 end 403 404 it 'get a status from server when nothing else sent from server' do 405 client_call = make_test_call 406 ActiveCall.client_invoke(client_call) 407 408 recvd_rpc = @server.request_call 409 recvd_call = recvd_rpc.call 410 411 server_call = ActiveCall.new( 412 recvd_call, 413 @pass_through, 414 @pass_through, 415 deadline, 416 started: false) 417 418 server_call.send_status(OK, 'OK') 419 420 # Check that we can receive initial metadata and a status 421 client_call.run_batch( 422 CallOps::RECV_INITIAL_METADATA => nil) 423 batch_result = client_call.run_batch( 424 CallOps::RECV_STATUS_ON_CLIENT => nil) 425 426 expect(batch_result.status.code).to eq(OK) 427 end 428 429 it 'get a nil msg before a status when an OK status is sent' do 430 call = make_test_call 431 ActiveCall.client_invoke(call) 432 client_call = ActiveCall.new(call, @pass_through, 433 @pass_through, deadline) 434 msg = 'message is a string' 435 client_call.remote_send(msg) 436 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 437 server_call = expect_server_to_receive(msg) 438 server_call.remote_send('server_response') 439 server_call.send_status(OK, 'OK') 440 expect(client_call.remote_read).to eq('server_response') 441 res = client_call.remote_read 442 expect(res).to be_nil 443 end 444 445 it 'unmarshals the response using the unmarshal func' do 446 call = make_test_call 447 ActiveCall.client_invoke(call) 448 unmarshal = proc { |x| 'unmarshalled:' + x } 449 client_call = ActiveCall.new(call, @pass_through, 450 unmarshal, deadline) 451 452 # confirm the client receives the unmarshalled message 453 msg = 'message is a string' 454 client_call.remote_send(msg) 455 server_call = expect_server_to_receive(msg) 456 server_call.remote_send('server_response') 457 expect(client_call.remote_read).to eq('unmarshalled:server_response') 458 send_and_receive_close_and_status( 459 call, inner_call_of_active_call(server_call)) 460 end 461 end 462 463 describe '#each_remote_read' do 464 it 'creates an Enumerator' do 465 call = make_test_call 466 client_call = ActiveCall.new(call, @pass_through, 467 @pass_through, deadline) 468 expect(client_call.each_remote_read).to be_a(Enumerator) 469 # finish the call 470 client_call.cancel 471 end 472 473 it 'the returned enumerator can read n responses' do 474 call = make_test_call 475 ActiveCall.client_invoke(call) 476 client_call = ActiveCall.new(call, @pass_through, 477 @pass_through, deadline) 478 msg = 'message is a string' 479 reply = 'server_response' 480 client_call.remote_send(msg) 481 server_call = expect_server_to_receive(msg) 482 e = client_call.each_remote_read 483 n = 3 # arbitrary value > 1 484 n.times do 485 server_call.remote_send(reply) 486 expect(e.next).to eq(reply) 487 end 488 send_and_receive_close_and_status( 489 call, inner_call_of_active_call(server_call)) 490 end 491 492 it 'the returns an enumerator that stops after an OK Status' do 493 call = make_test_call 494 ActiveCall.client_invoke(call) 495 client_call = ActiveCall.new(call, @pass_through, 496 @pass_through, deadline) 497 msg = 'message is a string' 498 reply = 'server_response' 499 client_call.remote_send(msg) 500 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 501 server_call = expect_server_to_receive(msg) 502 e = client_call.each_remote_read 503 n = 3 # arbitrary value > 1 504 n.times do 505 server_call.remote_send(reply) 506 expect(e.next).to eq(reply) 507 end 508 server_call.send_status(OK, 'OK', true) 509 expect { e.next }.to raise_error(StopIteration) 510 end 511 end 512 513 describe '#closing the call from the client' do 514 it 'finishes ok if the server sends a status response' do 515 call = make_test_call 516 ActiveCall.client_invoke(call) 517 client_call = ActiveCall.new(call, @pass_through, 518 @pass_through, deadline) 519 msg = 'message is a string' 520 client_call.remote_send(msg) 521 expect do 522 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 523 end.to_not raise_error 524 server_call = expect_server_to_receive(msg) 525 server_call.remote_send('server_response') 526 expect(client_call.remote_read).to eq('server_response') 527 server_call.send_status(OK, 'status code is OK') 528 expect { client_call.receive_and_check_status }.to_not raise_error 529 end 530 531 it 'finishes ok if the server sends an early status response' do 532 call = make_test_call 533 ActiveCall.client_invoke(call) 534 client_call = ActiveCall.new(call, @pass_through, 535 @pass_through, deadline) 536 msg = 'message is a string' 537 client_call.remote_send(msg) 538 server_call = expect_server_to_receive(msg) 539 server_call.remote_send('server_response') 540 server_call.send_status(OK, 'status code is OK') 541 expect(client_call.remote_read).to eq('server_response') 542 expect do 543 call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 544 end.to_not raise_error 545 expect { client_call.receive_and_check_status }.to_not raise_error 546 end 547 548 it 'finishes ok if SEND_CLOSE and RECV_STATUS has been sent' do 549 call = make_test_call 550 ActiveCall.client_invoke(call) 551 client_call = ActiveCall.new(call, @pass_through, 552 @pass_through, deadline) 553 msg = 'message is a string' 554 client_call.remote_send(msg) 555 server_call = expect_server_to_receive(msg) 556 server_call.remote_send('server_response') 557 server_call.send_status(OK, 'status code is OK') 558 expect(client_call.remote_read).to eq('server_response') 559 expect do 560 call.run_batch( 561 CallOps::SEND_CLOSE_FROM_CLIENT => nil, 562 CallOps::RECV_STATUS_ON_CLIENT => nil) 563 end.to_not raise_error 564 end 565 end 566 567 # Test sending of the initial metadata in #run_server_bidi 568 # from the server handler both implicitly and explicitly. 569 describe '#run_server_bidi metadata sending tests', run_server_bidi: true do 570 before(:each) do 571 @requests = ['first message', 'second message'] 572 @server_to_client_metadata = { 'test_key' => 'test_val' } 573 @server_status = OK 574 575 @client_call = make_test_call 576 @client_call.run_batch(CallOps::SEND_INITIAL_METADATA => {}) 577 578 recvd_rpc = @server.request_call 579 recvd_call = recvd_rpc.call 580 @server_call = ActiveCall.new( 581 recvd_call, 582 @pass_through, 583 @pass_through, 584 deadline, 585 metadata_received: true, 586 started: false, 587 metadata_to_send: @server_to_client_metadata) 588 end 589 590 after(:each) do 591 # Send the requests and send a close so the server can send a status 592 @requests.each do |message| 593 @client_call.run_batch(CallOps::SEND_MESSAGE => message) 594 end 595 @client_call.run_batch(CallOps::SEND_CLOSE_FROM_CLIENT => nil) 596 597 @server_thread.join 598 599 # Expect that initial metadata was sent, 600 # the requests were echoed, and a status was sent 601 batch_result = @client_call.run_batch( 602 CallOps::RECV_INITIAL_METADATA => nil) 603 expect(batch_result.metadata).to eq(@server_to_client_metadata) 604 605 @requests.each do |message| 606 batch_result = @client_call.run_batch( 607 CallOps::RECV_MESSAGE => nil) 608 expect(batch_result.message).to eq(message) 609 end 610 611 batch_result = @client_call.run_batch( 612 CallOps::RECV_STATUS_ON_CLIENT => nil) 613 expect(batch_result.status.code).to eq(@server_status) 614 end 615 616 it 'sends the initial metadata implicitly if not already sent' do 617 # Server handler that doesn't have access to a "call" 618 # It echoes the requests 619 fake_gen_each_reply_with_no_call_param = proc do |msgs| 620 msgs 621 end 622 623 int_ctx = GRPC::InterceptionContext.new 624 625 @server_thread = Thread.new do 626 @server_call.run_server_bidi( 627 fake_gen_each_reply_with_no_call_param, int_ctx) 628 @server_call.send_status(@server_status) 629 end 630 end 631 632 it 'sends the metadata when sent explicitly and not already sent' do 633 # Fake server handler that has access to a "call" object and 634 # uses it to explicitly update and send the initial metadata 635 fake_gen_each_reply_with_call_param = proc do |msgs, call_param| 636 call_param.merge_metadata_to_send(@server_to_client_metadata) 637 call_param.send_initial_metadata 638 msgs 639 end 640 int_ctx = GRPC::InterceptionContext.new 641 642 @server_thread = Thread.new do 643 @server_call.run_server_bidi( 644 fake_gen_each_reply_with_call_param, int_ctx) 645 @server_call.send_status(@server_status) 646 end 647 end 648 end 649 650 def expect_server_to_receive(sent_text, **kw) 651 c = expect_server_to_be_invoked(**kw) 652 expect(c.remote_read).to eq(sent_text) 653 c 654 end 655 656 def expect_server_to_be_invoked(**kw) 657 recvd_rpc = @server.request_call 658 expect(recvd_rpc).to_not eq nil 659 recvd_call = recvd_rpc.call 660 recvd_call.run_batch(CallOps::SEND_INITIAL_METADATA => kw) 661 ActiveCall.new(recvd_call, @pass_through, @pass_through, deadline, 662 metadata_received: true, started: true) 663 end 664 665 def make_test_call 666 @ch.create_call(nil, nil, '/method', nil, deadline) 667 end 668 669 def deadline 670 Time.now + 2 # in 2 seconds; arbitrary 671 end 672end 673