• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15require 'spec_helper'
16
17Thread.abort_on_exception = true
18
19def wakey_thread(&blk)
20  n = GRPC::Notifier.new
21  t = Thread.new do
22    blk.call(n)
23  end
24  t.abort_on_exception = true
25  n.wait
26  t
27end
28
29def load_test_certs
30  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
31  files = ['ca.pem', 'server1.key', 'server1.pem']
32  files.map { |f| File.open(File.join(test_root, f)).read }
33end
34
35include GRPC::Core::StatusCodes
36include GRPC::Core::TimeConsts
37include GRPC::Core::CallOps
38
39# check that methods on a finished/closed call t crash
40def check_op_view_of_finished_client_call(op_view,
41                                          expected_metadata,
42                                          expected_trailing_metadata)
43  # use read_response_stream to try to iterate through
44  # possible response stream
45  fail('need something to attempt reads') unless block_given?
46  expect do
47    resp = op_view.execute
48    yield resp
49  end.to raise_error(GRPC::Core::CallError)
50
51  expect { op_view.start_call }.to raise_error(RuntimeError)
52
53  sanity_check_values_of_accessors(op_view,
54                                   expected_metadata,
55                                   expected_trailing_metadata)
56
57  expect do
58    op_view.wait
59    op_view.cancel
60    op_view.write_flag = 1
61  end.to_not raise_error
62end
63
64def sanity_check_values_of_accessors(op_view,
65                                     expected_metadata,
66                                     expected_trailing_metadata)
67  expected_status = Struct::Status.new
68  expected_status.code = 0
69  expected_status.details = 'OK'
70  expected_status.metadata = expected_trailing_metadata
71
72  expect(op_view.status).to eq(expected_status)
73  expect(op_view.metadata).to eq(expected_metadata)
74  expect(op_view.trailing_metadata).to eq(expected_trailing_metadata)
75
76  expect(op_view.cancelled?).to be(false)
77  expect(op_view.write_flag).to be(nil)
78
79  # The deadline attribute of a call can be either
80  # a GRPC::Core::TimeSpec or a Time, which are mutually exclusive.
81  # TODO: fix so that the accessor always returns the same type.
82  expect(op_view.deadline.is_a?(GRPC::Core::TimeSpec) ||
83         op_view.deadline.is_a?(Time)).to be(true)
84end
85
86def close_active_server_call(active_server_call)
87  active_server_call.send(:set_input_stream_done)
88  active_server_call.send(:set_output_stream_done)
89end
90
91describe 'ClientStub' do  # rubocop:disable Metrics/BlockLength
92  let(:noop) { proc { |x| x } }
93
94  before(:each) do
95    Thread.abort_on_exception = true
96    @server = nil
97    @method = 'an_rpc_method'
98    @pass = OK
99    @fail = INTERNAL
100    @metadata = { k1: 'v1', k2: 'v2' }
101  end
102
103  after(:each) do
104    unless @server.nil?
105      @server.shutdown_and_notify(from_relative_time(2))
106      @server.close
107    end
108  end
109
110  describe '#new' do
111    let(:fake_host) { 'localhost:0' }
112    it 'can be created from a host and args' do
113      opts = { channel_args: { a_channel_arg: 'an_arg' } }
114      blk = proc do
115        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
116      end
117      expect(&blk).not_to raise_error
118    end
119
120    it 'can be created with an channel override' do
121      opts = {
122        channel_args: { a_channel_arg: 'an_arg' },
123        channel_override: @ch
124      }
125      blk = proc do
126        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
127      end
128      expect(&blk).not_to raise_error
129    end
130
131    it 'cannot be created with a bad channel override' do
132      blk = proc do
133        opts = {
134          channel_args: { a_channel_arg: 'an_arg' },
135          channel_override: Object.new
136        }
137        GRPC::ClientStub.new(fake_host, :this_channel_is_insecure, **opts)
138      end
139      expect(&blk).to raise_error
140    end
141
142    it 'cannot be created with bad credentials' do
143      blk = proc do
144        opts = { channel_args: { a_channel_arg: 'an_arg' } }
145        GRPC::ClientStub.new(fake_host, Object.new, **opts)
146      end
147      expect(&blk).to raise_error
148    end
149
150    it 'can be created with test test credentials' do
151      certs = load_test_certs
152      blk = proc do
153        opts = {
154          channel_args: {
155            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr',
156            a_channel_arg: 'an_arg'
157          }
158        }
159        creds = GRPC::Core::ChannelCredentials.new(certs[0], nil, nil)
160        GRPC::ClientStub.new(fake_host, creds,  **opts)
161      end
162      expect(&blk).to_not raise_error
163    end
164  end
165
166  describe '#request_response', request_response: true do
167    before(:each) do
168      @sent_msg, @resp = 'a_msg', 'a_reply'
169    end
170
171    shared_examples 'request response' do
172      it 'should send a request to/receive a reply from a server' do
173        server_port = create_test_server
174        th = run_request_response(@sent_msg, @resp, @pass)
175        stub = GRPC::ClientStub.new("localhost:#{server_port}",
176                                    :this_channel_is_insecure)
177        expect(get_response(stub)).to eq(@resp)
178        th.join
179      end
180
181      def metadata_test(md)
182        server_port = create_test_server
183        host = "localhost:#{server_port}"
184        th = run_request_response(@sent_msg, @resp, @pass,
185                                  expected_metadata: md)
186        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
187        @metadata = md
188        expect(get_response(stub)).to eq(@resp)
189        th.join
190      end
191
192      it 'should send metadata to the server ok' do
193        metadata_test(k1: 'v1', k2: 'v2')
194      end
195
196      # these tests mostly try to exercise when md might be allocated
197      # instead of inlined
198      it 'should send metadata with multiple large md to the server ok' do
199        val_array = %w(
200          '00000000000000000000000000000000000000000000000000000000000000',
201          '11111111111111111111111111111111111111111111111111111111111111',
202          '22222222222222222222222222222222222222222222222222222222222222',
203        )
204        md = {
205          k1: val_array,
206          k2: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa',
207          k3: 'bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb',
208          k4: 'cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc',
209          keeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey5: 'v5',
210          'k66666666666666666666666666666666666666666666666666666' => 'v6',
211          'k77777777777777777777777777777777777777777777777777777' => 'v7',
212          'k88888888888888888888888888888888888888888888888888888' => 'v8'
213        }
214        metadata_test(md)
215      end
216
217      it 'should send a request when configured using an override channel' do
218        server_port = create_test_server
219        alt_host = "localhost:#{server_port}"
220        th = run_request_response(@sent_msg, @resp, @pass)
221        ch = GRPC::Core::Channel.new(alt_host, nil, :this_channel_is_insecure)
222        stub = GRPC::ClientStub.new('ignored-host',
223                                    :this_channel_is_insecure,
224                                    channel_override: ch)
225        expect(get_response(stub)).to eq(@resp)
226        th.join
227      end
228
229      it 'should raise an error if the status is not OK' do
230        server_port = create_test_server
231        host = "localhost:#{server_port}"
232        th = run_request_response(@sent_msg, @resp, @fail)
233        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
234        blk = proc { get_response(stub) }
235        expect(&blk).to raise_error(GRPC::BadStatus)
236        th.join
237      end
238
239      it 'should receive UNAVAILABLE if call credentials plugin fails' do
240        server_port = create_secure_test_server
241        server_started_notifier = GRPC::Notifier.new
242        th = Thread.new do
243          @server.start
244          server_started_notifier.notify(nil)
245          # Poll on the server so that the client connection can proceed.
246          # We don't expect the server to actually accept a call though.
247          expect { @server.request_call }.to raise_error(GRPC::Core::CallError)
248        end
249        server_started_notifier.wait
250
251        certs = load_test_certs
252        secure_channel_creds = GRPC::Core::ChannelCredentials.new(
253          certs[0], nil, nil)
254        secure_stub_opts = {
255          channel_args: {
256            GRPC::Core::Channel::SSL_TARGET => 'foo.test.google.fr'
257          }
258        }
259        stub = GRPC::ClientStub.new("localhost:#{server_port}",
260                                    secure_channel_creds, **secure_stub_opts)
261
262        error_message = 'Failing call credentials callback'
263        failing_auth = proc do
264          fail error_message
265        end
266        creds = GRPC::Core::CallCredentials.new(failing_auth)
267
268        unavailable_error_occurred = false
269        begin
270          get_response(stub, credentials: creds)
271        rescue GRPC::Unavailable => e
272          unavailable_error_occurred = true
273          expect(e.details.include?(error_message)).to be true
274        end
275        expect(unavailable_error_occurred).to eq(true)
276
277        @server.shutdown_and_notify(Time.now + 3)
278        th.join
279        @server.close
280      end
281
282      it 'should raise ArgumentError if metadata contains invalid values' do
283        @metadata.merge!(k3: 3)
284        server_port = create_test_server
285        host = "localhost:#{server_port}"
286        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
287        expect do
288          get_response(stub)
289        end.to raise_error(ArgumentError,
290                           /Header values must be of type string or array/)
291      end
292    end
293
294    describe 'without a call operation' do
295      def get_response(stub, credentials: nil)
296        GRPC.logger.info(credentials.inspect)
297        stub.request_response(@method, @sent_msg, noop, noop,
298                              metadata: @metadata,
299                              credentials: credentials)
300      end
301
302      it_behaves_like 'request response'
303    end
304
305    describe 'via a call operation' do
306      after(:each) do
307        # make sure op.wait doesn't hang, even if there's a bad status
308        @op.wait
309      end
310      def get_response(stub, run_start_call_first: false, credentials: nil)
311        @op = stub.request_response(@method, @sent_msg, noop, noop,
312                                    return_op: true,
313                                    metadata: @metadata,
314                                    deadline: from_relative_time(2),
315                                    credentials: credentials)
316        expect(@op).to be_a(GRPC::ActiveCall::Operation)
317        @op.start_call if run_start_call_first
318        result = @op.execute
319        result
320      end
321
322      it_behaves_like 'request response'
323
324      def run_op_view_metadata_test(run_start_call_first)
325        server_port = create_test_server
326        host = "localhost:#{server_port}"
327
328        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
329        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
330        th = run_request_response(
331          @sent_msg, @resp, @pass,
332          expected_metadata: @metadata,
333          server_initial_md: @server_initial_md,
334          server_trailing_md: @server_trailing_md)
335        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
336        expect(
337          get_response(stub,
338                       run_start_call_first: run_start_call_first)).to eq(@resp)
339        th.join
340      end
341
342      it 'sends metadata to the server ok when running start_call first' do
343        run_op_view_metadata_test(true)
344        check_op_view_of_finished_client_call(
345          @op, @server_initial_md, @server_trailing_md
346        ) { |r| GRPC.logger.info(r) }
347      end
348
349      it 'does not crash when used after the call has been finished' do
350        run_op_view_metadata_test(false)
351        check_op_view_of_finished_client_call(
352          @op, @server_initial_md, @server_trailing_md
353        ) { |r| GRPC.logger.info(r) }
354      end
355    end
356  end
357
358  describe '#client_streamer', client_streamer: true do
359    before(:each) do
360      Thread.abort_on_exception = true
361      server_port = create_test_server
362      host = "localhost:#{server_port}"
363      @stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
364      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
365      @resp = 'a_reply'
366    end
367
368    shared_examples 'client streaming' do
369      it 'should send requests to/receive a reply from a server' do
370        th = run_client_streamer(@sent_msgs, @resp, @pass)
371        expect(get_response(@stub)).to eq(@resp)
372        th.join
373      end
374
375      it 'should send metadata to the server ok' do
376        th = run_client_streamer(@sent_msgs, @resp, @pass,
377                                 expected_metadata: @metadata)
378        expect(get_response(@stub)).to eq(@resp)
379        th.join
380      end
381
382      it 'should raise an error if the status is not ok' do
383        th = run_client_streamer(@sent_msgs, @resp, @fail)
384        blk = proc { get_response(@stub) }
385        expect(&blk).to raise_error(GRPC::BadStatus)
386        th.join
387      end
388
389      it 'should raise ArgumentError if metadata contains invalid values' do
390        @metadata.merge!(k3: 3)
391        expect do
392          get_response(@stub)
393        end.to raise_error(ArgumentError,
394                           /Header values must be of type string or array/)
395      end
396    end
397
398    describe 'without a call operation' do
399      def get_response(stub)
400        stub.client_streamer(@method, @sent_msgs, noop, noop,
401                             metadata: @metadata)
402      end
403
404      it_behaves_like 'client streaming'
405    end
406
407    describe 'via a call operation' do
408      after(:each) do
409        # make sure op.wait doesn't hang, even if there's a bad status
410        @op.wait
411      end
412      def get_response(stub, run_start_call_first: false)
413        @op = stub.client_streamer(@method, @sent_msgs, noop, noop,
414                                   return_op: true, metadata: @metadata)
415        expect(@op).to be_a(GRPC::ActiveCall::Operation)
416        @op.start_call if run_start_call_first
417        result = @op.execute
418        result
419      end
420
421      it_behaves_like 'client streaming'
422
423      def run_op_view_metadata_test(run_start_call_first)
424        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
425        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
426        th = run_client_streamer(
427          @sent_msgs, @resp, @pass,
428          expected_metadata: @metadata,
429          server_initial_md: @server_initial_md,
430          server_trailing_md: @server_trailing_md)
431        expect(
432          get_response(@stub,
433                       run_start_call_first: run_start_call_first)).to eq(@resp)
434        th.join
435      end
436
437      it 'sends metadata to the server ok when running start_call first' do
438        run_op_view_metadata_test(true)
439        check_op_view_of_finished_client_call(
440          @op, @server_initial_md, @server_trailing_md
441        ) { |r| GRPC.logger.info(r) }
442      end
443
444      it 'does not crash when used after the call has been finished' do
445        run_op_view_metadata_test(false)
446        check_op_view_of_finished_client_call(
447          @op, @server_initial_md, @server_trailing_md
448        ) { |r| GRPC.logger.info(r) }
449      end
450    end
451  end
452
453  describe '#server_streamer', server_streamer: true do
454    before(:each) do
455      @sent_msg = 'a_msg'
456      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
457    end
458
459    shared_examples 'server streaming' do
460      it 'should send a request to/receive replies from a server' do
461        server_port = create_test_server
462        host = "localhost:#{server_port}"
463        th = run_server_streamer(@sent_msg, @replys, @pass)
464        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
465        expect(get_responses(stub).collect { |r| r }).to eq(@replys)
466        th.join
467      end
468
469      it 'should raise an error if the status is not ok' do
470        server_port = create_test_server
471        host = "localhost:#{server_port}"
472        th = run_server_streamer(@sent_msg, @replys, @fail)
473        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
474        e = get_responses(stub)
475        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
476        th.join
477      end
478
479      it 'should send metadata to the server ok' do
480        server_port = create_test_server
481        host = "localhost:#{server_port}"
482        th = run_server_streamer(@sent_msg, @replys, @fail,
483                                 expected_metadata: { k1: 'v1', k2: 'v2' })
484        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
485        e = get_responses(stub)
486        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
487        th.join
488      end
489
490      it 'should raise ArgumentError if metadata contains invalid values' do
491        @metadata.merge!(k3: 3)
492        server_port = create_test_server
493        host = "localhost:#{server_port}"
494        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
495        expect do
496          get_responses(stub).collect { |r| r }
497        end.to raise_error(ArgumentError,
498                           /Header values must be of type string or array/)
499      end
500
501      def run_server_streamer_against_client_with_unmarshal_error(
502        expected_input, replys)
503        wakey_thread do |notifier|
504          c = expect_server_to_be_invoked(notifier)
505          expect(c.remote_read).to eq(expected_input)
506          begin
507            replys.each { |r| c.remote_send(r) }
508          rescue GRPC::Core::CallError
509            # An attempt to write to the client might fail. This is ok
510            # because the client call is expected to fail when
511            # unmarshalling the first response, and to cancel the call,
512            # and there is a race as for when the server-side call will
513            # start to fail.
514            p 'remote_send failed (allowed because call expected to cancel)'
515          ensure
516            c.send_status(OK, 'OK', true)
517            close_active_server_call(c)
518          end
519        end
520      end
521
522      it 'the call terminates when there is an unmarshalling error' do
523        server_port = create_test_server
524        host = "localhost:#{server_port}"
525        th = run_server_streamer_against_client_with_unmarshal_error(
526          @sent_msg, @replys)
527        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
528
529        unmarshal = proc { fail(ArgumentError, 'test unmarshalling error') }
530        expect do
531          get_responses(stub, unmarshal: unmarshal).collect { |r| r }
532        end.to raise_error(ArgumentError, 'test unmarshalling error')
533        th.join
534      end
535    end
536
537    describe 'without a call operation' do
538      def get_responses(stub, unmarshal: noop)
539        e = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
540                                 metadata: @metadata)
541        expect(e).to be_a(Enumerator)
542        e
543      end
544
545      it_behaves_like 'server streaming'
546    end
547
548    describe 'via a call operation' do
549      after(:each) do
550        @op.wait # make sure wait doesn't hang
551      end
552      def get_responses(stub, run_start_call_first: false, unmarshal: noop)
553        @op = stub.server_streamer(@method, @sent_msg, noop, unmarshal,
554                                   return_op: true,
555                                   metadata: @metadata)
556        expect(@op).to be_a(GRPC::ActiveCall::Operation)
557        @op.start_call if run_start_call_first
558        e = @op.execute
559        expect(e).to be_a(Enumerator)
560        e
561      end
562
563      it_behaves_like 'server streaming'
564
565      def run_op_view_metadata_test(run_start_call_first)
566        server_port = create_test_server
567        host = "localhost:#{server_port}"
568        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
569        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
570        th = run_server_streamer(
571          @sent_msg, @replys, @pass,
572          expected_metadata: @metadata,
573          server_initial_md: @server_initial_md,
574          server_trailing_md: @server_trailing_md)
575        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
576        e = get_responses(stub, run_start_call_first: run_start_call_first)
577        expect(e.collect { |r| r }).to eq(@replys)
578        th.join
579      end
580
581      it 'should send metadata to the server ok when start_call is run first' do
582        run_op_view_metadata_test(true)
583        check_op_view_of_finished_client_call(
584          @op, @server_initial_md, @server_trailing_md) do |responses|
585          responses.each { |r| GRPC.logger.info(r) }
586        end
587      end
588
589      it 'does not crash when used after the call has been finished' do
590        run_op_view_metadata_test(false)
591        check_op_view_of_finished_client_call(
592          @op, @server_initial_md, @server_trailing_md) do |responses|
593          responses.each { |r| GRPC.logger.info(r) }
594        end
595      end
596
597      it 'raises GRPC::Cancelled after the call has been cancelled' do
598        server_port = create_test_server
599        host = "localhost:#{server_port}"
600        th = run_server_streamer(@sent_msg, @replys, @pass)
601        stub = GRPC::ClientStub.new(host, :this_channel_is_insecure)
602        resp = get_responses(stub, run_start_call_first: false)
603        expect(resp.next).to eq('reply_1')
604        @op.cancel
605        expect { resp.next }.to raise_error(GRPC::Cancelled)
606        th.join
607      end
608    end
609  end
610
611  describe '#bidi_streamer', bidi: true do
612    before(:each) do
613      @sent_msgs = Array.new(3) { |i| 'msg_' + (i + 1).to_s }
614      @replys = Array.new(3) { |i| 'reply_' + (i + 1).to_s }
615      server_port = create_test_server
616      @host = "localhost:#{server_port}"
617    end
618
619    shared_examples 'bidi streaming' do
620      it 'supports sending all the requests first' do
621        th = run_bidi_streamer_handle_inputs_first(@sent_msgs, @replys,
622                                                   @pass)
623        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
624        e = get_responses(stub)
625        expect(e.collect { |r| r }).to eq(@replys)
626        th.join
627      end
628
629      it 'supports client-initiated ping pong' do
630        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true)
631        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
632        e = get_responses(stub)
633        expect(e.collect { |r| r }).to eq(@sent_msgs)
634        th.join
635      end
636
637      it 'supports a server-initiated ping pong' do
638        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, false)
639        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
640        e = get_responses(stub)
641        expect(e.collect { |r| r }).to eq(@sent_msgs)
642        th.join
643      end
644
645      it 'should raise an error if the status is not ok' do
646        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @fail, false)
647        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
648        e = get_responses(stub)
649        expect { e.collect { |r| r } }.to raise_error(GRPC::BadStatus)
650        th.join
651      end
652
653      it 'should raise ArgumentError if metadata contains invalid values' do
654        @metadata.merge!(k3: 3)
655        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
656        expect do
657          get_responses(stub).collect { |r| r }
658        end.to raise_error(ArgumentError,
659                           /Header values must be of type string or array/)
660      end
661
662      it 'terminates if the call fails to start' do
663        # don't start the server
664        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
665        expect do
666          get_responses(stub, deadline: from_relative_time(0)).collect { |r| r }
667        end.to raise_error(GRPC::BadStatus)
668      end
669
670      it 'should send metadata to the server ok' do
671        th = run_bidi_streamer_echo_ping_pong(@sent_msgs, @pass, true,
672                                              expected_metadata: @metadata)
673        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
674        e = get_responses(stub)
675        expect(e.collect { |r| r }).to eq(@sent_msgs)
676        th.join
677      end
678
679      # Prompted by grpc/github #10526
680      describe 'surfacing of errors when sending requests' do
681        def run_server_bidi_send_one_then_read_indefinitely
682          @server.start
683          recvd_rpc = @server.request_call
684          recvd_call = recvd_rpc.call
685          server_call = GRPC::ActiveCall.new(
686            recvd_call, noop, noop, INFINITE_FUTURE,
687            metadata_received: true, started: false)
688          server_call.send_initial_metadata
689          server_call.remote_send('server response')
690          loop do
691            m = server_call.remote_read
692            break if m.nil?
693          end
694          # can't fail since initial metadata already sent
695          server_call.send_status(@pass, 'OK', true)
696          close_active_server_call(server_call)
697        end
698
699        def verify_error_from_write_thread(stub, requests_to_push,
700                                           request_queue, expected_description)
701          # TODO: an improvement might be to raise the original exception from
702          # bidi call write loops instead of only cancelling the call
703          failing_marshal_proc = proc do |req|
704            fail req if req.is_a?(StandardError)
705            req
706          end
707          begin
708            e = get_responses(stub, marshal_proc: failing_marshal_proc)
709            first_response = e.next
710            expect(first_response).to eq('server response')
711            requests_to_push.each { |req| request_queue.push(req) }
712            e.collect { |r| r }
713          rescue GRPC::Unknown => e
714            exception = e
715          end
716          expect(exception.message.include?(expected_description)).to be(true)
717        end
718
719        # Provides an Enumerable view of a Queue
720        class BidiErrorTestingEnumerateForeverQueue
721          def initialize(queue)
722            @queue = queue
723          end
724
725          def each
726            loop do
727              msg = @queue.pop
728              yield msg
729            end
730          end
731        end
732
733        def run_error_in_client_request_stream_test(requests_to_push,
734                                                    expected_error_message)
735          # start a server that waits on a read indefinitely - it should
736          # see a cancellation and be able to break out
737          th = Thread.new { run_server_bidi_send_one_then_read_indefinitely }
738          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
739
740          request_queue = Queue.new
741          @sent_msgs = BidiErrorTestingEnumerateForeverQueue.new(request_queue)
742
743          verify_error_from_write_thread(stub,
744                                         requests_to_push,
745                                         request_queue,
746                                         expected_error_message)
747          # the write loop errror should cancel the call and end the
748          # server's request stream
749          th.join
750        end
751
752        it 'non-GRPC errors from the write loop surface when raised ' \
753          'at the start of a request stream' do
754          expected_error_message = 'expect error on first request'
755          requests_to_push = [StandardError.new(expected_error_message)]
756          run_error_in_client_request_stream_test(requests_to_push,
757                                                  expected_error_message)
758        end
759
760        it 'non-GRPC errors from the write loop surface when raised ' \
761          'during the middle of a request stream' do
762          expected_error_message = 'expect error on last request'
763          requests_to_push = %w( one two )
764          requests_to_push << StandardError.new(expected_error_message)
765          run_error_in_client_request_stream_test(requests_to_push,
766                                                  expected_error_message)
767        end
768      end
769
770      # Prompted by grpc/github #14853
771      describe 'client-side error handling on bidi streams' do
772        class EnumeratorQueue
773          def initialize(queue)
774            @queue = queue
775          end
776
777          def each
778            loop do
779              msg = @queue.pop
780              break if msg.nil?
781              yield msg
782            end
783          end
784        end
785
786        def run_server_bidi_shutdown_after_one_read
787          @server.start
788          recvd_rpc = @server.request_call
789          recvd_call = recvd_rpc.call
790          server_call = GRPC::ActiveCall.new(
791            recvd_call, noop, noop, INFINITE_FUTURE,
792            metadata_received: true, started: false)
793          expect(server_call.remote_read).to eq('first message')
794          @server.shutdown_and_notify(from_relative_time(0))
795          @server.close
796        end
797
798        it 'receives a grpc status code when writes to a bidi stream fail' do
799          # This test tries to trigger the case when a 'SEND_MESSAGE' op
800          # and subseqeunt 'SEND_CLOSE_FROM_CLIENT' op of a bidi stream fails.
801          # In this case, iteration through the response stream should result
802          # in a grpc status code, and the writer thread should not raise an
803          # exception.
804          server_thread = Thread.new do
805            run_server_bidi_shutdown_after_one_read
806          end
807          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
808          request_queue = Queue.new
809          @sent_msgs = EnumeratorQueue.new(request_queue)
810          responses = get_responses(stub)
811          request_queue.push('first message')
812          # Now wait for the server to shut down.
813          server_thread.join
814          # Sanity check. This test is not interesting if
815          # Thread.abort_on_exception is not set.
816          expect(Thread.abort_on_exception).to be(true)
817          # An attempt to send a second message should fail now that the
818          # server is down.
819          request_queue.push('second message')
820          request_queue.push(nil)
821          expect { responses.next }.to raise_error(GRPC::BadStatus)
822        end
823
824        def run_server_bidi_shutdown_after_one_write
825          @server.start
826          recvd_rpc = @server.request_call
827          recvd_call = recvd_rpc.call
828          server_call = GRPC::ActiveCall.new(
829            recvd_call, noop, noop, INFINITE_FUTURE,
830            metadata_received: true, started: false)
831          server_call.send_initial_metadata
832          server_call.remote_send('message')
833          @server.shutdown_and_notify(from_relative_time(0))
834          @server.close
835        end
836
837        it 'receives a grpc status code when reading from a failed bidi call' do
838          server_thread = Thread.new do
839            run_server_bidi_shutdown_after_one_write
840          end
841          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
842          request_queue = Queue.new
843          @sent_msgs = EnumeratorQueue.new(request_queue)
844          responses = get_responses(stub)
845          expect(responses.next).to eq('message')
846          # Wait for the server to shut down
847          server_thread.join
848          expect { responses.next }.to raise_error(GRPC::BadStatus)
849          # Push a sentinel to allow the writer thread to finish
850          request_queue.push(nil)
851        end
852      end
853    end
854
855    describe 'without a call operation' do
856      def get_responses(stub, deadline: nil, marshal_proc: noop)
857        e = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
858                               metadata: @metadata, deadline: deadline)
859        expect(e).to be_a(Enumerator)
860        e
861      end
862
863      it_behaves_like 'bidi streaming'
864    end
865
866    describe 'via a call operation' do
867      after(:each) do
868        @op.wait # make sure wait doesn't hang
869      end
870      def get_responses(stub, run_start_call_first: false, deadline: nil,
871                        marshal_proc: noop)
872        @op = stub.bidi_streamer(@method, @sent_msgs, marshal_proc, noop,
873                                 return_op: true,
874                                 metadata: @metadata, deadline: deadline)
875        expect(@op).to be_a(GRPC::ActiveCall::Operation)
876        @op.start_call if run_start_call_first
877        e = @op.execute
878        expect(e).to be_a(Enumerator)
879        e
880      end
881
882      it_behaves_like 'bidi streaming'
883
884      def run_op_view_metadata_test(run_start_call_first)
885        @server_initial_md = { 'sk1' => 'sv1', 'sk2' => 'sv2' }
886        @server_trailing_md = { 'tk1' => 'tv1', 'tk2' => 'tv2' }
887        th = run_bidi_streamer_echo_ping_pong(
888          @sent_msgs, @pass, true,
889          expected_metadata: @metadata,
890          server_initial_md: @server_initial_md,
891          server_trailing_md: @server_trailing_md)
892        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
893        e = get_responses(stub, run_start_call_first: run_start_call_first)
894        expect(e.collect { |r| r }).to eq(@sent_msgs)
895        th.join
896      end
897
898      it 'can run start_call before executing the call' do
899        run_op_view_metadata_test(true)
900        check_op_view_of_finished_client_call(
901          @op, @server_initial_md, @server_trailing_md) do |responses|
902          responses.each { |r| GRPC.logger.info(r) }
903        end
904      end
905
906      it 'doesnt crash when op_view used after call has finished' do
907        run_op_view_metadata_test(false)
908        check_op_view_of_finished_client_call(
909          @op, @server_initial_md, @server_trailing_md) do |responses|
910          responses.each { |r| GRPC.logger.info(r) }
911        end
912      end
913
914      def run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
915        @server.start
916        recvd_rpc = @server.request_call
917        recvd_call = recvd_rpc.call
918        server_call = GRPC::ActiveCall.new(
919          recvd_call, noop, noop, INFINITE_FUTURE,
920          metadata_received: true, started: false)
921        server_call.send_initial_metadata
922        server_call.remote_send('server call received')
923        wait_for_shutdown_ok_callback.call
924        # since the client is cancelling the call,
925        # we should be able to shut down cleanly
926        @server.shutdown_and_notify(nil)
927        @server.close
928      end
929
930      it 'receives a grpc status code when reading from a cancelled bidi call' do
931        # This test tries to trigger a 'RECV_INITIAL_METADATA' and/or
932        # 'RECV_MESSAGE' op failure.
933        # An attempt to read a message might fail; in that case, iteration
934        # through the response stream should still result in a grpc status.
935        server_can_shutdown = false
936        server_can_shutdown_mu = Mutex.new
937        server_can_shutdown_cv = ConditionVariable.new
938        wait_for_shutdown_ok_callback = proc do
939          server_can_shutdown_mu.synchronize do
940            server_can_shutdown_cv.wait(server_can_shutdown_mu) until server_can_shutdown
941          end
942        end
943        server_thread = Thread.new do
944          run_server_bidi_expect_client_to_cancel(wait_for_shutdown_ok_callback)
945        end
946        stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure)
947        request_queue = Queue.new
948        @sent_msgs = EnumeratorQueue.new(request_queue)
949        responses = get_responses(stub)
950        expect(responses.next).to eq('server call received')
951        @op.cancel
952        expect { responses.next }.to raise_error(GRPC::Cancelled)
953        # Now let the server proceed to shut down.
954        server_can_shutdown_mu.synchronize do
955          server_can_shutdown = true
956          server_can_shutdown_cv.broadcast
957        end
958        server_thread.join
959        # Push a sentinel to allow the writer thread to finish
960        request_queue.push(nil)
961      end
962    end
963  end
964
965  def run_server_streamer(expected_input, replys, status,
966                          expected_metadata: {},
967                          server_initial_md: {},
968                          server_trailing_md: {})
969    wanted_metadata = expected_metadata.clone
970    wakey_thread do |notifier|
971      c = expect_server_to_be_invoked(
972        notifier, metadata_to_send: server_initial_md)
973      wanted_metadata.each do |k, v|
974        expect(c.metadata[k.to_s]).to eq(v)
975      end
976      expect(c.remote_read).to eq(expected_input)
977      replys.each { |r| c.remote_send(r) }
978      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
979                    metadata: server_trailing_md)
980      close_active_server_call(c)
981    end
982  end
983
984  def run_bidi_streamer_handle_inputs_first(expected_inputs, replys,
985                                            status)
986    wakey_thread do |notifier|
987      c = expect_server_to_be_invoked(notifier)
988      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
989      replys.each { |r| c.remote_send(r) }
990      c.send_status(status, status == @pass ? 'OK' : 'NOK', true)
991      close_active_server_call(c)
992    end
993  end
994
995  def run_bidi_streamer_echo_ping_pong(expected_inputs, status, client_starts,
996                                       expected_metadata: {},
997                                       server_initial_md: {},
998                                       server_trailing_md: {})
999    wanted_metadata = expected_metadata.clone
1000    wakey_thread do |notifier|
1001      c = expect_server_to_be_invoked(
1002        notifier, metadata_to_send: server_initial_md)
1003      wanted_metadata.each do |k, v|
1004        expect(c.metadata[k.to_s]).to eq(v)
1005      end
1006      expected_inputs.each do |i|
1007        if client_starts
1008          expect(c.remote_read).to eq(i)
1009          c.remote_send(i)
1010        else
1011          c.remote_send(i)
1012          expect(c.remote_read).to eq(i)
1013        end
1014      end
1015      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
1016                    metadata: server_trailing_md)
1017      close_active_server_call(c)
1018    end
1019  end
1020
1021  def run_client_streamer(expected_inputs, resp, status,
1022                          expected_metadata: {},
1023                          server_initial_md: {},
1024                          server_trailing_md: {})
1025    wanted_metadata = expected_metadata.clone
1026    wakey_thread do |notifier|
1027      c = expect_server_to_be_invoked(
1028        notifier, metadata_to_send: server_initial_md)
1029      expected_inputs.each { |i| expect(c.remote_read).to eq(i) }
1030      wanted_metadata.each do |k, v|
1031        expect(c.metadata[k.to_s]).to eq(v)
1032      end
1033      c.remote_send(resp)
1034      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
1035                    metadata: server_trailing_md)
1036      close_active_server_call(c)
1037    end
1038  end
1039
1040  def run_request_response(expected_input, resp, status,
1041                           expected_metadata: {},
1042                           server_initial_md: {},
1043                           server_trailing_md: {})
1044    wanted_metadata = expected_metadata.clone
1045    wakey_thread do |notifier|
1046      c = expect_server_to_be_invoked(
1047        notifier, metadata_to_send: server_initial_md)
1048      expect(c.remote_read).to eq(expected_input)
1049      wanted_metadata.each do |k, v|
1050        expect(c.metadata[k.to_s]).to eq(v)
1051      end
1052      c.remote_send(resp)
1053      c.send_status(status, status == @pass ? 'OK' : 'NOK', true,
1054                    metadata: server_trailing_md)
1055      close_active_server_call(c)
1056    end
1057  end
1058
1059  def create_secure_test_server
1060    certs = load_test_certs
1061    secure_credentials = GRPC::Core::ServerCredentials.new(
1062      nil, [{ private_key: certs[1], cert_chain: certs[2] }], false)
1063
1064    @server = new_core_server_for_testing(nil)
1065    @server.add_http2_port('0.0.0.0:0', secure_credentials)
1066  end
1067
1068  def create_test_server
1069    @server = new_core_server_for_testing(nil)
1070    @server.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
1071  end
1072
1073  def expect_server_to_be_invoked(notifier, metadata_to_send: nil)
1074    @server.start
1075    notifier.notify(nil)
1076    recvd_rpc = @server.request_call
1077    recvd_call = recvd_rpc.call
1078    recvd_call.metadata = recvd_rpc.metadata
1079    recvd_call.run_batch(SEND_INITIAL_METADATA => metadata_to_send)
1080    GRPC::ActiveCall.new(recvd_call, noop, noop, INFINITE_FUTURE,
1081                         metadata_received: true)
1082  end
1083end
1084