# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

Thread.abort_on_exception = true

# Runs the given block on a new thread, handing it a fresh GRPC::Notifier,
# and waits on that notifier before returning the thread. The server helpers
# in this file call notifier.notify(nil) right after starting the server, so
# callers get the thread back only once that has happened.
def wakey_thread(&blk)
  n = GRPC::Notifier.new
  t = Thread.new do
    blk.call(n)
  end
  t.abort_on_exception = true
  n.wait
  t
end

# Returns the contents of ca.pem, server1.key and server1.pem (in that order)
# read from the 'testdata' directory one level above this file's directory.
def load_test_certs
  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
  files = ['ca.pem', 'server1.key', 'server1.pem']
  # File.read closes the file for us; File.open(...).read left it open.
  files.map { |f| File.read(File.join(test_root, f)) }
end

include GRPC::Core::StatusCodes
include GRPC::Core::TimeConsts
include GRPC::Core::CallOps

# check that methods on a finished/closed call don't crash
#
# The caller must pass a block; it is yielded the result of op_view.execute
# so that it can attempt to read from it.
def check_op_view_of_finished_client_call(op_view,
                                          expected_metadata,
                                          expected_trailing_metadata)
  # use read_response_stream to try to iterate through
  # possible response stream
  fail('need something to attempt reads') unless block_given?
  expect do
    resp = op_view.execute
    yield resp
  end.to raise_error(GRPC::Core::CallError)

  expect { op_view.start_call }.to raise_error(RuntimeError)

  sanity_check_values_of_accessors(op_view,
                                   expected_metadata,
                                   expected_trailing_metadata)

  expect do
    op_view.wait
    op_view.cancel
    op_view.write_flag = 1
  end.to_not raise_error
end

# Checks the accessors of a finished call's operation view: an OK status
# carrying the expected trailing metadata, the expected initial metadata,
# not cancelled, no write flag, and a deadline of a recognised type.
def sanity_check_values_of_accessors(op_view,
                                     expected_metadata,
                                     expected_trailing_metadata)
  expected_status = Struct::Status.new
  expected_status.code = 0
  expected_status.details = 'OK'
  expected_status.metadata = expected_trailing_metadata

  expect(op_view.status).to eq(expected_status)
  expect(op_view.metadata).to eq(expected_metadata)
  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)

  expect(op_view.cancelled?).to be(false)
  expect(op_view.write_flag).to be(nil)

  # The deadline attribute of a call can be either
  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
  # TODO: fix so that the accessor always returns the same type.
  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
         op_view.deadline.is_a?(Time)).to be(true)
end

# Marks both the input and output streams of a server-side active call as
# done. Uses #send because the setters are not callable directly from here.
def close_active_server_call(active_server_call)
  active_server_call.send(:set_input_stream_done)
  active_server_call.send(:set_output_stream_done)
end

describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
  let(:noop) { proc { |x| x } }

  before(:each) do
    Thread.abort_on_exception = true
    @server = nil
    @method = 'an_rpc_method'
    @pass = OK
    @fail = INTERNAL
    @metadata = { k1: 'v1', k2: 'v2' }
  end

  after(:each) do
    unless @server.nil?
      @server.shutdown_and_notify(from_relative_time(2))
      @server.close
    end
  end

  describe '#new' do
    let(:fake_host) { 'localhost:0' }

    it 'can be created from a host and args' do
      opts = { channel_args: { a_channel_arg: 'an_arg' } }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'can be created with an channel override' do
      # NOTE(review): @ch is not assigned anywhere in this file, so this
      # appears to pass channel_override: nil - confirm that is intended.
      opts = {
        channel_args: { a_channel_arg: 'an_arg' },
        channel_override: @ch
      }
      blk = proc do
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).not_to raise_error
    end

    it 'cannot be created with a bad channel override' do
      blk = proc do
        opts = {
          channel_args: { a_channel_arg: 'an_arg' },
          channel_override: Object.new
        }
        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'cannot be created with bad credentials' do
      blk = proc do
        opts = { channel_args: { a_channel_arg: 'an_arg' } }
        GRPC::ClientStub.new(fake_host, Object.new, **opts)
      end
      expect(&blk).to raise_error
    end

    it 'can be created with test test credentials' do
      certs = load_test_certs
      blk = proc do
        opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
            a_channel_arg: 'an_arg'
          }
        }
        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
        GRPC::ClientStub.new(fake_host, creds, **opts)
      end
      expect(&blk).to_not raise_error
    end
  end

  describe '#request_response', request_response: true do
    before(:each) do
      @sent_msg, @resp = 'a_msg', 'a_reply'
    end

    shared_examples 'request response' do
      it 'should send a request to/receive a reply from a server' do
        server_port = create_test_server
        th = run_request_response(@sent_msg, @resp, @pass)
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    :this_channel_is_insecure)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      # Sends a unary request carrying +md+ as metadata and has the test
      # server verify that it received every entry.
      def metadata_test(md)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass,
                                  expected_metadata: md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        @metadata = md
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        metadata_test(k1: 'v1', k2: 'v2')
      end

      # these tests mostly try to exercise when md might be allocated
      # instead of inlined
      it 'should send metadata with multiple large md to the server ok' do
        # %w splits on whitespace only; quotes and commas inside it would
        # become part of the values, so the words are listed bare.
        val_array = %w(
          00000000000000000000000000000000000000000000000000000000000000
          11111111111111111111111111111111111111111111111111111111111111
          22222222222222222222222222222222222222222222222222222222222222
        )
        md = {
          k1: val_array,
          k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
          k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
          k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
          keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
          'k66666666666666666666666666666666666666666666666666666' => 'v6',
          'k77777777777777777777777777777777777777777777777777777' => 'v7',
          'k88888888888888888888888888888888888888888888888888888' => 'v8'
        }
        metadata_test(md)
      end

      it 'should send a request when configured using an override channel' do
        server_port = create_test_server
        alt_host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @pass)
        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
        stub = GRPC::ClientStub.new('ignored-host',
                                    :this_channel_is_insecure,
                                    channel_override: ch)
        expect(get_response(stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not OK' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_request_response(@sent_msg, @resp, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        blk = proc { get_response(stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should receive UNAVAILABLE if call credentials plugin fails' do
        server_port = create_secure_test_server
        server_started_notifier = GRPC::Notifier.new
        th = Thread.new do
          @server.start
          server_started_notifier.notify(nil)
          # Poll on the server so that the client connection can proceed.
          # We don't expect the server to actually accept a call though.
          expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
        end
        server_started_notifier.wait

        certs = load_test_certs
        secure_channel_creds = GRPC::Core::ChannelCredentials.new(
          certs[0], nil, nil)
        secure_stub_opts = {
          channel_args: {
            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
          }
        }
        stub = GRPC::ClientStub.new("localhost:#{server_port}",
                                    secure_channel_creds, **secure_stub_opts)

        error_message = 'Failing call credentials callback'
        failing_auth = proc do
          fail error_message
        end
        creds = GRPC::Core::CallCredentials.new(failing_auth)

        unavailable_error_occurred = false
        begin
          get_response(stub, credentials: creds)
        rescue GRPC::Unavailable => e
          unavailable_error_occurred = true
          expect(e.details.include?(error_message)).to be true
        end
        expect(unavailable_error_occurred).to eq(true)

        @server.shutdown_and_notify(Time.now + 3)
        th.join
        @server.close
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect do
          get_response(stub)
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end
    end

    describe 'without a call operation' do
      def get_response(stub, credentials: nil)
        GRPC.logger.info(credentials.inspect)
        stub.request_response(@method, @sent_msg, noop, noop,
                              metadata: @metadata,
                              credentials: credentials)
      end

      it_behaves_like 'request response'
    end

    describe 'via a call operation' do
      after(:each) do
        # make sure op.wait doesn't hang, even if there's a bad status
        @op.wait
      end

      def get_response(stub, run_start_call_first: false, credentials: nil)
        @op = stub.request_response(@method, @sent_msg, noop, noop,
                                    return_op: true,
                                    metadata: @metadata,
                                    deadline: from_relative_time(2),
                                    credentials: credentials)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        @op.execute
      end

      it_behaves_like 'request response'

      def run_op_view_metadata_test(run_start_call_first)
        server_port = create_test_server
        host = "localhost:#{server_port}"

        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_request_response(
          @sent_msg, @resp, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(
          get_response(stub,
                       run_start_call_first: run_start_call_first)).to eq(@resp)
        th.join
      end

      it 'sends metadata to the server ok when running start_call first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end
    end
  end

  describe '#client_streamer', client_streamer: true do
    before(:each) do
      Thread.abort_on_exception = true
      server_port = create_test_server
      host = "localhost:#{server_port}"
      @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
      @resp = 'a_reply'
    end

    shared_examples 'client streaming' do
      it 'should send requests to/receive a reply from a server' do
        th = run_client_streamer(@sent_msgs, @resp, @pass)
        expect(get_response(@stub)).to eq(@resp)
        th.join
      end

      it 'should send metadata to the server ok' do
        th = run_client_streamer(@sent_msgs, @resp, @pass,
                                 expected_metadata: @metadata)
        expect(get_response(@stub)).to eq(@resp)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        th = run_client_streamer(@sent_msgs, @resp, @fail)
        blk = proc { get_response(@stub) }
        expect(&blk).to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        expect do
          get_response(@stub)
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end
    end

    describe 'without a call operation' do
      def get_response(stub)
        stub.client_streamer(@method, @sent_msgs, noop, noop,
                             metadata: @metadata)
      end

      it_behaves_like 'client streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        # make sure op.wait doesn't hang, even if there's a bad status
        @op.wait
      end

      def get_response(stub, run_start_call_first: false)
        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
                                   return_op: true, metadata: @metadata)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        @op.execute
      end

      it_behaves_like 'client streaming'

      def run_op_view_metadata_test(run_start_call_first)
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_client_streamer(
          @sent_msgs, @resp, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        expect(
          get_response(@stub,
                       run_start_call_first: run_start_call_first)).to eq(@resp)
        th.join
      end

      it 'sends metadata to the server ok when running start_call first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md
        ) { |r| GRPC.logger.info(r) }
      end
    end
  end

  describe '#server_streamer', server_streamer: true do
    before(:each) do
      @sent_msg = 'a_msg'
      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
    end

    shared_examples 'server streaming' do
      it 'should send a request to/receive replies from a server' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should send metadata to the server ok' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @fail,
                                 expected_metadata: { k1: 'v1', k2: 'v2' })
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        expect do
          get_responses(stub).collect { |r| r }
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end

      def run_server_streamer_against_client_with_unmarshal_error(
        expected_input, replys)
        wakey_thread do |notifier|
          c = expect_server_to_be_invoked(notifier)
          expect(c.remote_read).to eq(expected_input)
          begin
            replys.each { |r| c.remote_send(r) }
          rescue GRPC::Core::CallError
            # An attempt to write to the client might fail. This is ok
            # because the client call is expected to fail when
            # unmarshalling the first response, and to cancel the call,
            # and there is a race as for when the server-side call will
            # start to fail.
            p 'remote_send failed (allowed because call expected to cancel)'
          ensure
            c.send_status(OK, 'OK', true)
            close_active_server_call(c)
          end
        end
      end

      it 'the call terminates when there is an unmarshalling error' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer_against_client_with_unmarshal_error(
          @sent_msg, @replys)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)

        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
        expect do
          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
        end.to raise_error(ArgumentError, 'test unmarshalling error')
        th.join
      end
    end

    describe 'without a call operation' do
      def get_responses(stub, unmarshal: noop)
        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                 metadata: @metadata)
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't hang
      end

      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
                                   return_op: true,
                                   metadata: @metadata)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        e = @op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'server streaming'

      def run_op_view_metadata_test(run_start_call_first)
        server_port = create_test_server
        host = "localhost:#{server_port}"
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_server_streamer(
          @sent_msg, @replys, @pass,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        e = get_responses(stub, run_start_call_first: run_start_call_first)
        expect(e.collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'should send metadata to the server ok when start_call is run first' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'does not crash when used after the call has been finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'raises GRPC::Cancelled after the call has been cancelled' do
        server_port = create_test_server
        host = "localhost:#{server_port}"
        th = run_server_streamer(@sent_msg, @replys, @pass)
        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
        resp = get_responses(stub, run_start_call_first: false)
        expect(resp.next).to eq('reply_1')
        @op.cancel
        expect { resp.next }.to raise_error(GRPC::Cancelled)
        th.join
      end
    end
  end

  describe '#bidi_streamer', bidi: true do
    before(:each) do
      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
      server_port = create_test_server
      @host = "localhost:#{server_port}"
    end

    shared_examples 'bidi streaming' do
      it 'supports sending all the requests first' do
        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
                                                   @pass)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@replys)
        th.join
      end

      it 'supports client-initiated ping pong' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'supports a server-initiated ping pong' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'should raise an error if the status is not ok' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
        th.join
      end

      it 'should raise ArgumentError if metadata contains invalid values' do
        @metadata.merge!(k3: 3)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        expect do
          get_responses(stub).collect { |r| r }
        end.to raise_error(ArgumentError,
                           /Header values must be of type string or array/)
      end

      it 'terminates if the call fails to start' do
        # don't start the server
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        expect do
          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
        end.to raise_error(GRPC::BadStatus)
      end

      it 'should send metadata to the server ok' do
        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
                                              expected_metadata: @metadata)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      # Prompted by grpc/github #10526
      describe 'surfacing of errors when sending requests' do
        def run_server_bidi_send_one_then_read_indefinitely
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          server_call.send_initial_metadata
          server_call.remote_send('server response')
          loop do
            m = server_call.remote_read
            break if m.nil?
          end
          # can't fail since initial metadata already sent
          server_call.send_status(@pass, 'OK', true)
          close_active_server_call(server_call)
        end

        # Reads the first server response, pushes +requests_to_push+ onto
        # +request_queue+, drains the response stream, and expects a
        # GRPC::Unknown whose message includes +expected_description+.
        def verify_error_from_write_thread(stub, requests_to_push,
                                           request_queue, expected_description)
          # TODO: an improvement might be to raise the original exception from
          # bidi call write loops instead of only cancelling the call
          failing_marshal_proc = proc do |req|
            fail req if req.is_a?(StandardError)
            req
          end
          exception = nil
          begin
            responses = get_responses(stub, marshal_proc: failing_marshal_proc)
            first_response = responses.next
            expect(first_response).to eq('server response')
            requests_to_push.each { |req| request_queue.push(req) }
            responses.collect { |r| r }
          rescue GRPC::Unknown => err
            exception = err
          end
          # Fail with a clear expectation (rather than NoMethodError on nil)
          # when the stream completed without raising.
          expect(exception).to be_a(GRPC::Unknown)
          expect(exception.message.include?(expected_description)).to be(true)
        end

        # Provides an Enumerable view of a Queue
        class BidiErrorTestingEnumerateForeverQueue
          def initialize(queue)
            @queue = queue
          end

          def each
            loop do
              msg = @queue.pop
              yield msg
            end
          end
        end

        def run_error_in_client_request_stream_test(requests_to_push,
                                                    expected_error_message)
          # start a server that waits on a read indefinitely - it should
          # see a cancellation and be able to break out
          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)

          request_queue = Queue.new
          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)

          verify_error_from_write_thread(stub,
                                         requests_to_push,
                                         request_queue,
                                         expected_error_message)
          # the write loop error should cancel the call and end the
          # server's request stream
          th.join
        end

        it 'non-GRPC errors from the write loop surface when raised ' \
          'at the start of a request stream' do
          expected_error_message = 'expect error on first request'
          requests_to_push = [StandardError.new(expected_error_message)]
          run_error_in_client_request_stream_test(requests_to_push,
                                                  expected_error_message)
        end

        it 'non-GRPC errors from the write loop surface when raised ' \
          'during the middle of a request stream' do
          expected_error_message = 'expect error on last request'
          requests_to_push = %w( one two )
          requests_to_push << StandardError.new(expected_error_message)
          run_error_in_client_request_stream_test(requests_to_push,
                                                  expected_error_message)
        end
      end

      # Prompted by grpc/github #14853
      describe 'client-side error handling on bidi streams' do
        # Enumerable view of a Queue that stops at the first nil it pops.
        class EnumeratorQueue
          def initialize(queue)
            @queue = queue
          end

          def each
            loop do
              msg = @queue.pop
              break if msg.nil?
              yield msg
            end
          end
        end

        def run_server_bidi_shutdown_after_one_read
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          expect(server_call.remote_read).to eq('first message')
          @server.shutdown_and_notify(from_relative_time(0))
          @server.close
        end

        it 'receives a grpc status code when writes to a bidi stream fail' do
          # This test tries to trigger the case when a 'SEND_MESSAGE' op
          # and subsequent 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
          # In this case, iteration through the response stream should result
          # in a grpc status code, and the writer thread should not raise an
          # exception.
          server_thread = Thread.new do
            run_server_bidi_shutdown_after_one_read
          end
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
          request_queue = Queue.new
          @sent_msgs = EnumeratorQueue.new(request_queue)
          responses = get_responses(stub)
          request_queue.push('first message')
          # Now wait for the server to shut down.
          server_thread.join
          # Sanity check. This test is not interesting if
          # Thread.abort_on_exception is not set.
          expect(Thread.abort_on_exception).to be(true)
          # An attempt to send a second message should fail now that the
          # server is down.
          request_queue.push('second message')
          request_queue.push(nil)
          expect { responses.next }.to raise_error(GRPC::BadStatus)
        end

        def run_server_bidi_shutdown_after_one_write
          @server.start
          recvd_rpc = @server.request_call
          recvd_call = recvd_rpc.call
          server_call = GRPC::ActiveCall.new(
            recvd_call, noop, noop, INFINITE_FUTURE,
            metadata_received: true, started: false)
          server_call.send_initial_metadata
          server_call.remote_send('message')
          @server.shutdown_and_notify(from_relative_time(0))
          @server.close
        end

        it 'receives a grpc status code when reading from a failed bidi call' do
          server_thread = Thread.new do
            run_server_bidi_shutdown_after_one_write
          end
          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
          request_queue = Queue.new
          @sent_msgs = EnumeratorQueue.new(request_queue)
          responses = get_responses(stub)
          expect(responses.next).to eq('message')
          # Wait for the server to shut down
          server_thread.join
          expect { responses.next }.to raise_error(GRPC::BadStatus)
          # Push a sentinel to allow the writer thread to finish
          request_queue.push(nil)
        end
      end
    end

    describe 'without a call operation' do
      def get_responses(stub, deadline: nil, marshal_proc: noop)
        e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                               metadata: @metadata, deadline: deadline)
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'
    end

    describe 'via a call operation' do
      after(:each) do
        @op.wait # make sure wait doesn't hang
      end

      def get_responses(stub, run_start_call_first: false, deadline: nil,
                        marshal_proc: noop)
        @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
                                 return_op: true,
                                 metadata: @metadata, deadline: deadline)
        expect(@op).to be_a(GRPC::ActiveCall::Operation)
        @op.start_call if run_start_call_first
        e = @op.execute
        expect(e).to be_a(Enumerator)
        e
      end

      it_behaves_like 'bidi streaming'

      def run_op_view_metadata_test(run_start_call_first)
        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
        th = run_bidi_streamer_echo_ping_pong(
          @sent_msgs, @pass, true,
          expected_metadata: @metadata,
          server_initial_md: @server_initial_md,
          server_trailing_md: @server_trailing_md)
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        e = get_responses(stub, run_start_call_first: run_start_call_first)
        expect(e.collect { |r| r }).to eq(@sent_msgs)
        th.join
      end

      it 'can run start_call before executing the call' do
        run_op_view_metadata_test(true)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      it 'doesnt crash when op_view used after call has finished' do
        run_op_view_metadata_test(false)
        check_op_view_of_finished_client_call(
          @op, @server_initial_md, @server_trailing_md) do |responses|
          responses.each { |r| GRPC.logger.info(r) }
        end
      end

      def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
        @server.start
        recvd_rpc = @server.request_call
        recvd_call = recvd_rpc.call
        server_call = GRPC::ActiveCall.new(
          recvd_call, noop, noop, INFINITE_FUTURE,
          metadata_received: true, started: false)
        server_call.send_initial_metadata
        server_call.remote_send('server call received')
        wait_for_shutdown_ok_callback.call
        # since the client is cancelling the call,
        # we should be able to shut down cleanly
        @server.shutdown_and_notify(nil)
        @server.close
      end

      it 'receives a grpc status code when reading from a cancelled bidi call' do
        # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
        # 'RECV_MESSAGE' op failure.
        # An attempt to read a message might fail; in that case, iteration
        # through the response stream should still result in a grpc status.
        server_can_shutdown = false
        server_can_shutdown_mu = Mutex.new
        server_can_shutdown_cv = ConditionVariable.new
        wait_for_shutdown_ok_callback = proc do
          server_can_shutdown_mu.synchronize do
            server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
          end
        end
        server_thread = Thread.new do
          run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
        end
        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
        request_queue = Queue.new
        @sent_msgs = EnumeratorQueue.new(request_queue)
        responses = get_responses(stub)
        expect(responses.next).to eq('server call received')
        @op.cancel
        expect { responses.next }.to raise_error(GRPC::Cancelled)
        # Now let the server proceed to shut down.
        server_can_shutdown_mu.synchronize do
          server_can_shutdown = true
          server_can_shutdown_cv.broadcast
        end
        server_thread.join
        # Push a sentinel to allow the writer thread to finish
        request_queue.push(nil)
      end
    end
  end

  # Server side of a server-streaming call: checks the received metadata and
  # request, sends every reply, then sends +status+ with the trailing
  # metadata. Returns the thread the server logic runs on.
  def run_server_streamer(expected_input, replys, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      expect(c.remote_read).to eq(expected_input)
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a bidi call that reads every expected input before
  # sending any reply. Returns the thread the server logic runs on.
  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
                                            status)
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(notifier)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      replys.each { |r| c.remote_send(r) }
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
      close_active_server_call(c)
    end
  end

  # Server side of a bidi call that alternates one read and one write per
  # expected input; +client_starts+ selects whether the read or the write
  # comes first. Returns the thread the server logic runs on.
  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
                                       expected_metadata: {},
                                       server_initial_md: {},
                                       server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      expected_inputs.each do |i|
        if client_starts
          expect(c.remote_read).to eq(i)
          c.remote_send(i)
        else
          c.remote_send(i)
          expect(c.remote_read).to eq(i)
        end
      end
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a client-streaming call: reads every expected input,
  # checks the received metadata, sends the single response and then
  # +status+. Returns the thread the server logic runs on.
  def run_client_streamer(expected_inputs, resp, status,
                          expected_metadata: {},
                          server_initial_md: {},
                          server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      c.remote_send(resp)
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Server side of a unary call: checks the request and received metadata,
  # sends the response and then +status+. Returns the thread the server
  # logic runs on.
  def run_request_response(expected_input, resp, status,
                           expected_metadata: {},
                           server_initial_md: {},
                           server_trailing_md: {})
    wanted_metadata = expected_metadata.clone
    wakey_thread do |notifier|
      c = expect_server_to_be_invoked(
        notifier, metadata_to_send: server_initial_md)
      expect(c.remote_read).to eq(expected_input)
      wanted_metadata.each do |k, v|
        expect(c.metadata[k.to_s]).to eq(v)
      end
      c.remote_send(resp)
      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
                    metadata: server_trailing_md)
      close_active_server_call(c)
    end
  end

  # Creates @server with server credentials built from the test key and
  # certificate chain, and returns the result of add_http2_port (used by
  # callers as the port number).
  def create_secure_test_server
    certs = load_test_certs
    secure_credentials = GRPC::Core::ServerCredentials.new(
      nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)

    @server = new_core_server_for_testing(nil)
    @server.add_http2_port('0.0.0.0:0', secure_credentials)
  end

  # Creates @server on an insecure port and returns the result of
  # add_http2_port (used by callers as the port number).
  def create_test_server
    @server = new_core_server_for_testing(nil)
    @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
  end

  # Starts @server, signals +notifier+, accepts one call, sends
  # +metadata_to_send+ as initial metadata and returns a GRPC::ActiveCall
  # wrapping the received call.
  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
    @server.start
    notifier.notify(nil)
    recvd_rpc = @server.request_call
    recvd_call = recvd_rpc.call
    recvd_call.metadata = recvd_rpc.metadata
    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
    GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
                         metadata_received: true)
  end
end