• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14require 'spec_helper'
15
16def load_test_certs
17  test_root = File.join(File.dirname(File.dirname(__FILE__)), 'testdata')
18  files = ['ca.pem', 'server1.key', 'server1.pem']
19  files.map { |f| File.open(File.join(test_root, f)).read }
20end
21
22def check_md(wanted_md, received_md)
23  wanted_md.zip(received_md).each do |w, r|
24    w.each do |key, value|
25      expect(r[key]).to eq(value)
26    end
27  end
28end
29
30# A test service with no methods.
31class EmptyService
32  include GRPC::GenericService
33end
34
35# A test service without an implementation.
36class NoRpcImplementation
37  include GRPC::GenericService
38  rpc :an_rpc, EchoMsg, EchoMsg
39end
40
41# A test service with an implementation that fails with BadStatus
42class FailingService
43  include GRPC::GenericService
44  rpc :an_rpc, EchoMsg, EchoMsg
45  attr_reader :details, :code, :md
46
47  def initialize(_default_var = 'ignored')
48    @details = 'app error'
49    @code = 101
50    @md = { 'failed_method' => 'an_rpc' }
51  end
52
53  def an_rpc(_req, _call)
54    fail GRPC::BadStatus.new(@code, @details, @md)
55  end
56end
57
58FailingStub = FailingService.rpc_stub_class
59
60# A slow test service.
61class SlowService
62  include GRPC::GenericService
63  rpc :an_rpc, EchoMsg, EchoMsg
64  attr_reader :received_md, :delay
65
66  def initialize(_default_var = 'ignored')
67    @delay = 0.25
68    @received_md = []
69  end
70
71  def an_rpc(req, call)
72    GRPC.logger.info("starting a slow #{@delay} rpc")
73    sleep @delay
74    @received_md << call.metadata unless call.metadata.nil?
75    req  # send back the req as the response
76  end
77end
78
79SlowStub = SlowService.rpc_stub_class
80
81# A test service that allows a synchronized RPC cancellation
82class SynchronizedCancellationService
83  include GRPC::GenericService
84  rpc :an_rpc, EchoMsg, EchoMsg
85  attr_reader :received_md, :delay
86
87  # notify_request_received and wait_until_rpc_cancelled are
88  # callbacks to synchronously allow the client to proceed with
89  # cancellation (after the unary request has been received),
90  # and to synchronously wait until the client has cancelled the
91  # current RPC.
92  def initialize(notify_request_received, wait_until_rpc_cancelled)
93    @notify_request_received = notify_request_received
94    @wait_until_rpc_cancelled = wait_until_rpc_cancelled
95  end
96
97  def an_rpc(req, _call)
98    GRPC.logger.info('starting a synchronusly cancelled rpc')
99    @notify_request_received.call(req)
100    @wait_until_rpc_cancelled.call
101    req  # send back the req as the response
102  end
103end
104
105SynchronizedCancellationStub = SynchronizedCancellationService.rpc_stub_class
106
107# a test service that hangs onto call objects
108# and uses them after the server-side call has been
109# finished
110class CheckCallAfterFinishedService
111  include GRPC::GenericService
112  rpc :an_rpc, EchoMsg, EchoMsg
113  rpc :a_client_streaming_rpc, stream(EchoMsg), EchoMsg
114  rpc :a_server_streaming_rpc, EchoMsg, stream(EchoMsg)
115  rpc :a_bidi_rpc, stream(EchoMsg), stream(EchoMsg)
116  attr_reader :server_side_call
117
118  def an_rpc(req, call)
119    fail 'shouldnt reuse service' unless @server_side_call.nil?
120    @server_side_call = call
121    req
122  end
123
124  def a_client_streaming_rpc(call)
125    fail 'shouldnt reuse service' unless @server_side_call.nil?
126    @server_side_call = call
127    # iterate through requests so call can complete
128    call.each_remote_read.each { |r| GRPC.logger.info(r) }
129    EchoMsg.new
130  end
131
132  def a_server_streaming_rpc(_, call)
133    fail 'shouldnt reuse service' unless @server_side_call.nil?
134    @server_side_call = call
135    [EchoMsg.new, EchoMsg.new]
136  end
137
138  def a_bidi_rpc(requests, call)
139    fail 'shouldnt reuse service' unless @server_side_call.nil?
140    @server_side_call = call
141    requests.each { |r| GRPC.logger.info(r) }
142    [EchoMsg.new, EchoMsg.new]
143  end
144end
145
146CheckCallAfterFinishedServiceStub = CheckCallAfterFinishedService.rpc_stub_class
147
148# A service with a bidi streaming method.
149class BidiService
150  include GRPC::GenericService
151  rpc :server_sends_bad_input, stream(EchoMsg), stream(EchoMsg)
152
153  def server_sends_bad_input(_, _)
154    'bad response. (not an enumerable, client sees an error)'
155  end
156end
157
158BidiStub = BidiService.rpc_stub_class
159
160describe GRPC::RpcServer do
161  RpcServer = GRPC::RpcServer
162  StatusCodes = GRPC::Core::StatusCodes
163
164  before(:each) do
165    @method = 'an_rpc_method'
166    @pass = 0
167    @fail = 1
168    @noop = proc { |x| x }
169  end
170
171  describe '#new' do
172    it 'can be created with just some args' do
173      opts = { server_args: { a_channel_arg: 'an_arg' } }
174      blk = proc do
175        new_rpc_server_for_testing(**opts)
176      end
177      expect(&blk).not_to raise_error
178    end
179
180    it 'cannot be created with invalid ServerCredentials' do
181      blk = proc do
182        opts = {
183          server_args: { a_channel_arg: 'an_arg' },
184          creds: Object.new
185        }
186        new_rpc_server_for_testing(**opts)
187      end
188      expect(&blk).to raise_error
189    end
190  end
191
192  describe '#stopped?' do
193    before(:each) do
194      opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1.5 }
195      @srv = new_rpc_server_for_testing(**opts)
196      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
197    end
198
199    it 'starts out false' do
200      expect(@srv.stopped?).to be(false)
201    end
202
203    it 'stays false after the server starts running', server: true do
204      @srv.handle(EchoService)
205      t = Thread.new { @srv.run }
206      @srv.wait_till_running
207      expect(@srv.stopped?).to be(false)
208      @srv.stop
209      t.join
210    end
211
212    it 'is true after a running server is stopped', server: true do
213      @srv.handle(EchoService)
214      t = Thread.new { @srv.run }
215      @srv.wait_till_running
216      @srv.stop
217      t.join
218      expect(@srv.stopped?).to be(true)
219    end
220  end
221
222  describe '#running?' do
223    it 'starts out false' do
224      opts = {
225        server_args: { a_channel_arg: 'an_arg' }
226      }
227      r = new_rpc_server_for_testing(**opts)
228      expect(r.running?).to be(false)
229    end
230
231    it 'is false if run is called with no services registered', server: true do
232      opts = {
233        server_args: { a_channel_arg: 'an_arg' },
234        poll_period: 2
235      }
236      r = new_rpc_server_for_testing(**opts)
237      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
238      expect { r.run }.to raise_error(RuntimeError)
239    end
240
241    it 'is true after run is called with a registered service' do
242      opts = {
243        server_args: { a_channel_arg: 'an_arg' },
244        poll_period: 2.5
245      }
246      r = new_rpc_server_for_testing(**opts)
247      r.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
248      r.handle(EchoService)
249      t = Thread.new { r.run }
250      r.wait_till_running
251      expect(r.running?).to be(true)
252      r.stop
253      t.join
254    end
255  end
256
257  describe '#handle' do
258    before(:each) do
259      @opts = { server_args: { a_channel_arg: 'an_arg' }, poll_period: 1 }
260      @srv = new_rpc_server_for_testing(**@opts)
261      @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
262    end
263
264    it 'raises if #run has already been called' do
265      @srv.handle(EchoService)
266      t = Thread.new { @srv.run }
267      @srv.wait_till_running
268      expect { @srv.handle(EchoService) }.to raise_error
269      @srv.stop
270      t.join
271    end
272
273    it 'raises if the server has been run and stopped' do
274      @srv.handle(EchoService)
275      t = Thread.new { @srv.run }
276      @srv.wait_till_running
277      @srv.stop
278      t.join
279      expect { @srv.handle(EchoService) }.to raise_error
280    end
281
282    it 'raises if the service does not include GenericService ' do
283      expect { @srv.handle(Object) }.to raise_error
284    end
285
286    it 'raises if the service does not declare any rpc methods' do
287      expect { @srv.handle(EmptyService) }.to raise_error
288    end
289
290    it 'raises if a handler method is already registered' do
291      @srv.handle(EchoService)
292      expect { r.handle(EchoService) }.to raise_error
293    end
294  end
295
296  describe '#run' do
297    let(:client_opts) { { channel_override: @ch } }
298    let(:marshal) { EchoService.rpc_descs[:an_rpc].marshal_proc }
299    let(:unmarshal) { EchoService.rpc_descs[:an_rpc].unmarshal_proc(:output) }
300
301    context 'with no connect_metadata' do
302      before(:each) do
303        server_opts = {
304          poll_period: 1
305        }
306        @srv = new_rpc_server_for_testing(**server_opts)
307        server_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
308        @host = "localhost:#{server_port}"
309        @ch = GRPC::Core::Channel.new(@host, nil, :this_channel_is_insecure)
310      end
311
312      it 'should return NOT_FOUND status on unknown methods', server: true do
313        @srv.handle(EchoService)
314        t = Thread.new { @srv.run }
315        @srv.wait_till_running
316        req = EchoMsg.new
317        blk = proc do
318          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
319                                      **client_opts)
320          stub.request_response('/unknown', req, marshal, unmarshal)
321        end
322        expect(&blk).to raise_error GRPC::BadStatus
323        @srv.stop
324        t.join
325      end
326
327      it 'should return UNIMPLEMENTED on unimplemented methods', server: true do
328        @srv.handle(NoRpcImplementation)
329        t = Thread.new { @srv.run }
330        @srv.wait_till_running
331        req = EchoMsg.new
332        blk = proc do
333          stub = GRPC::ClientStub.new(@host, :this_channel_is_insecure,
334                                      **client_opts)
335          stub.request_response('/an_rpc', req, marshal, unmarshal)
336        end
337        expect(&blk).to raise_error do |error|
338          expect(error).to be_a(GRPC::BadStatus)
339          expect(error.code).to be(GRPC::Core::StatusCodes::UNIMPLEMENTED)
340        end
341        @srv.stop
342        t.join
343      end
344
345      it 'should return UNIMPLEMENTED on unimplemented ' \
346         'methods for client_streamer', server: true do
347        @srv.handle(EchoService)
348        t = Thread.new { @srv.run }
349        @srv.wait_till_running
350        blk = proc do
351          stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
352          requests = [EchoMsg.new, EchoMsg.new]
353          stub.a_client_streaming_rpc_unimplemented(requests)
354        end
355
356        begin
357          expect(&blk).to raise_error do |error|
358            expect(error).to be_a(GRPC::BadStatus)
359            expect(error.code).to eq(GRPC::Core::StatusCodes::UNIMPLEMENTED)
360          end
361        ensure
362          @srv.stop # should be call not to crash
363          t.join
364        end
365      end
366
367      it 'should handle multiple sequential requests', server: true do
368        @srv.handle(EchoService)
369        t = Thread.new { @srv.run }
370        @srv.wait_till_running
371        req = EchoMsg.new
372        n = 5  # arbitrary
373        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
374        n.times { expect(stub.an_rpc(req)).to be_a(EchoMsg) }
375        @srv.stop
376        t.join
377      end
378
379      it 'should receive metadata sent as rpc keyword args', server: true do
380        service = EchoService.new
381        @srv.handle(service)
382        t = Thread.new { @srv.run }
383        @srv.wait_till_running
384        req = EchoMsg.new
385        stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
386        expect(stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }))
387          .to be_a(EchoMsg)
388        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
389        check_md(wanted_md, service.received_md)
390        @srv.stop
391        t.join
392      end
393
394      it 'should receive metadata if a deadline is specified', server: true do
395        service = SlowService.new
396        @srv.handle(service)
397        t = Thread.new { @srv.run }
398        @srv.wait_till_running
399        req = EchoMsg.new
400        stub = SlowStub.new(@host, :this_channel_is_insecure, **client_opts)
401        timeout = service.delay + 1.0
402        deadline = GRPC::Core::TimeConsts.from_relative_time(timeout)
403        resp = stub.an_rpc(req,
404                           deadline: deadline,
405                           metadata: { k1: 'v1', k2: 'v2' })
406        expect(resp).to be_a(EchoMsg)
407        wanted_md = [{ 'k1' => 'v1', 'k2' => 'v2' }]
408        check_md(wanted_md, service.received_md)
409        @srv.stop
410        t.join
411      end
412
413      it 'should handle cancellation correctly', server: true do
414        request_received = false
415        request_received_mu = Mutex.new
416        request_received_cv = ConditionVariable.new
417        notify_request_received = proc do |req|
418          request_received_mu.synchronize do
419            fail 'req is nil' if req.nil?
420            expect(req.is_a?(EchoMsg)).to be true
421            fail 'test bug - already set' if request_received
422            request_received = true
423            request_received_cv.signal
424          end
425        end
426
427        rpc_cancelled = false
428        rpc_cancelled_mu = Mutex.new
429        rpc_cancelled_cv = ConditionVariable.new
430        wait_until_rpc_cancelled = proc do
431          rpc_cancelled_mu.synchronize do
432            loop do
433              break if rpc_cancelled
434              rpc_cancelled_cv.wait(rpc_cancelled_mu)
435            end
436          end
437        end
438
439        service = SynchronizedCancellationService.new(notify_request_received,
440                                                      wait_until_rpc_cancelled)
441        @srv.handle(service)
442        srv_thd = Thread.new { @srv.run }
443        @srv.wait_till_running
444        req = EchoMsg.new
445        stub = SynchronizedCancellationStub.new(@host,
446                                                :this_channel_is_insecure,
447                                                **client_opts)
448        op = stub.an_rpc(req, return_op: true)
449
450        client_thd = Thread.new do
451          expect { op.execute }.to raise_error GRPC::Cancelled
452        end
453
454        request_received_mu.synchronize do
455          loop do
456            break if request_received
457            request_received_cv.wait(request_received_mu)
458          end
459        end
460
461        op.cancel
462
463        rpc_cancelled_mu.synchronize do
464          fail 'test bug - already set' if rpc_cancelled
465          rpc_cancelled = true
466          rpc_cancelled_cv.signal
467        end
468
469        client_thd.join
470        @srv.stop
471        srv_thd.join
472      end
473
474      it 'should handle multiple parallel requests', server: true do
475        @srv.handle(EchoService)
476        t = Thread.new { @srv.run }
477        @srv.wait_till_running
478        req, q = EchoMsg.new, Queue.new
479        n = 5  # arbitrary
480        threads = [t]
481        n.times do
482          threads << Thread.new do
483            stub = EchoStub.new(@host, :this_channel_is_insecure, **client_opts)
484            q << stub.an_rpc(req)
485          end
486        end
487        n.times { expect(q.pop).to be_a(EchoMsg) }
488        @srv.stop
489        threads.each(&:join)
490      end
491
492      it 'should return RESOURCE_EXHAUSTED on too many jobs', server: true do
493        opts = {
494          server_args: { a_channel_arg: 'an_arg' },
495          pool_size: 2,
496          poll_period: 1,
497          max_waiting_requests: 1
498        }
499        alt_srv = new_rpc_server_for_testing(**opts)
500        alt_srv.handle(SlowService)
501        alt_port = alt_srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
502        alt_host = "0.0.0.0:#{alt_port}"
503        t = Thread.new { alt_srv.run }
504        alt_srv.wait_till_running
505        req = EchoMsg.new
506        n = 20 # arbitrary, use as many to ensure the server pool is exceeded
507        threads = []
508        one_failed_as_unavailable = false
509        n.times do
510          threads << Thread.new do
511            stub = SlowStub.new(alt_host, :this_channel_is_insecure)
512            begin
513              stub.an_rpc(req)
514            rescue GRPC::ResourceExhausted
515              one_failed_as_unavailable = true
516            end
517          end
518        end
519        threads.each(&:join)
520        alt_srv.stop
521        t.join
522        expect(one_failed_as_unavailable).to be(true)
523      end
524
525      it 'should send a status UNKNOWN with a relevant message when the' \
526        'servers response stream is not an enumerable' do
527        @srv.handle(BidiService)
528        t = Thread.new { @srv.run }
529        @srv.wait_till_running
530        stub = BidiStub.new(@host, :this_channel_is_insecure, **client_opts)
531        responses = stub.server_sends_bad_input([])
532        exception = nil
533        begin
534          responses.each { |r| r }
535        rescue GRPC::Unknown => e
536          exception = e
537        end
538        # Erroneous responses sent from the server handler should cause an
539        # exception on the client with relevant info.
540        expected_details = 'NoMethodError: undefined method `each\' for '\
541          '"bad response. (not an enumerable, client sees an error)"'
542
543        expect(exception.inspect.include?(expected_details)).to be true
544        @srv.stop
545        t.join
546      end
547    end
548
549    context 'with connect metadata' do
550      let(:test_md_proc) do
551        proc do |mth, md|
552          res = md.clone
553          res['method'] = mth
554          res['connect_k1'] = 'connect_v1'
555          res
556        end
557      end
558      before(:each) do
559        server_opts = {
560          poll_period: 1,
561          connect_md_proc: test_md_proc
562        }
563        @srv = new_rpc_server_for_testing(**server_opts)
564        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
565        @alt_host = "0.0.0.0:#{alt_port}"
566      end
567
568      it 'should send connect metadata to the client', server: true do
569        service = EchoService.new
570        @srv.handle(service)
571        t = Thread.new { @srv.run }
572        @srv.wait_till_running
573        req = EchoMsg.new
574        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
575        op = stub.an_rpc(req, metadata: { k1: 'v1', k2: 'v2' }, return_op: true)
576        expect(op.metadata).to be nil
577        expect(op.execute).to be_a(EchoMsg)
578        wanted_md = {
579          'k1' => 'v1',
580          'k2' => 'v2',
581          'method' => '/EchoService/an_rpc',
582          'connect_k1' => 'connect_v1'
583        }
584        wanted_md.each do |key, value|
585          GRPC.logger.info("key: #{key}")
586          expect(op.metadata[key]).to eq(value)
587        end
588        @srv.stop
589        t.join
590      end
591    end
592
593    context 'with trailing metadata' do
594      before(:each) do
595        server_opts = {
596          poll_period: 1
597        }
598        @srv = new_rpc_server_for_testing(**server_opts)
599        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
600        @alt_host = "0.0.0.0:#{alt_port}"
601      end
602
603      it 'should be added to BadStatus when requests fail', server: true do
604        service = FailingService.new
605        @srv.handle(service)
606        t = Thread.new { @srv.run }
607        @srv.wait_till_running
608        req = EchoMsg.new
609        stub = FailingStub.new(@alt_host, :this_channel_is_insecure)
610        blk = proc { stub.an_rpc(req) }
611
612        # confirm it raise the expected error
613        expect(&blk).to raise_error GRPC::BadStatus
614
615        # call again and confirm exception contained the trailing metadata.
616        begin
617          blk.call
618        rescue GRPC::BadStatus => e
619          expect(e.code).to eq(service.code)
620          expect(e.details).to eq(service.details)
621          expect(e.metadata).to eq(service.md)
622        end
623        @srv.stop
624        t.join
625      end
626
627      it 'should be received by the client', server: true do
628        wanted_trailers = { 'k1' => 'out_v1', 'k2' => 'out_v2' }
629        service = EchoService.new(k1: 'out_v1', k2: 'out_v2')
630        @srv.handle(service)
631        t = Thread.new { @srv.run }
632        @srv.wait_till_running
633        req = EchoMsg.new
634        stub = EchoStub.new(@alt_host, :this_channel_is_insecure)
635        op = stub.an_rpc(req, return_op: true, metadata: { k1: 'v1', k2: 'v2' })
636        expect(op.metadata).to be nil
637        expect(op.execute).to be_a(EchoMsg)
638        expect(op.trailing_metadata).to eq(wanted_trailers)
639        @srv.stop
640        t.join
641      end
642    end
643
644    context 'when call objects are used after calls have completed' do
645      before(:each) do
646        server_opts = {
647          poll_period: 1
648        }
649        @srv = new_rpc_server_for_testing(**server_opts)
650        alt_port = @srv.add_http2_port('0.0.0.0:0', :this_port_is_insecure)
651        @alt_host = "0.0.0.0:#{alt_port}"
652
653        @service = CheckCallAfterFinishedService.new
654        @srv.handle(@service)
655        @srv_thd  = Thread.new { @srv.run }
656        @srv.wait_till_running
657      end
658
659      # check that the server-side call is still in a usable state even
660      # after it has finished
661      def check_single_req_view_of_finished_call(call)
662        common_check_of_finished_server_call(call)
663
664        expect(call.peer).to be_a(String)
665        expect(call.peer_cert).to be(nil)
666      end
667
668      def check_multi_req_view_of_finished_call(call)
669        common_check_of_finished_server_call(call)
670
671        expect do
672          call.each_remote_read.each { |r| p r }
673        end.to raise_error(GRPC::Core::CallError)
674      end
675
676      def common_check_of_finished_server_call(call)
677        expect do
678          call.merge_metadata_to_send({})
679        end.to raise_error(RuntimeError)
680
681        expect do
682          call.send_initial_metadata
683        end.to_not raise_error
684
685        expect(call.cancelled?).to be(false)
686        expect(call.metadata).to be_a(Hash)
687        expect(call.metadata['user-agent']).to be_a(String)
688
689        expect(call.metadata_sent).to be(true)
690        expect(call.output_metadata).to eq({})
691        expect(call.metadata_to_send).to eq({})
692        expect(call.deadline.is_a?(Time)).to be(true)
693      end
694
695      it 'should not crash when call used after an unary call is finished' do
696        req = EchoMsg.new
697        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
698                                                     :this_channel_is_insecure)
699        resp = stub.an_rpc(req)
700        expect(resp).to be_a(EchoMsg)
701        @srv.stop
702        @srv_thd.join
703
704        check_single_req_view_of_finished_call(@service.server_side_call)
705      end
706
707      it 'should not crash when call used after client streaming finished' do
708        requests = [EchoMsg.new, EchoMsg.new]
709        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
710                                                     :this_channel_is_insecure)
711        resp = stub.a_client_streaming_rpc(requests)
712        expect(resp).to be_a(EchoMsg)
713        @srv.stop
714        @srv_thd.join
715
716        check_multi_req_view_of_finished_call(@service.server_side_call)
717      end
718
719      it 'should not crash when call used after server streaming finished' do
720        req = EchoMsg.new
721        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
722                                                     :this_channel_is_insecure)
723        responses = stub.a_server_streaming_rpc(req)
724        responses.each do |r|
725          expect(r).to be_a(EchoMsg)
726        end
727        @srv.stop
728        @srv_thd.join
729
730        check_single_req_view_of_finished_call(@service.server_side_call)
731      end
732
733      it 'should not crash when call used after a bidi call is finished' do
734        requests = [EchoMsg.new, EchoMsg.new]
735        stub = CheckCallAfterFinishedServiceStub.new(@alt_host,
736                                                     :this_channel_is_insecure)
737        responses = stub.a_bidi_rpc(requests)
738        responses.each do |r|
739          expect(r).to be_a(EchoMsg)
740        end
741        @srv.stop
742        @srv_thd.join
743
744        check_multi_req_view_of_finished_call(@service.server_side_call)
745      end
746    end
747  end
748end
749