• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2015 gRPC authors.
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14require 'spec_helper'
15require 'timeout'
16
17include Timeout
18include GRPC::Core
19include GRPC::Spec::Helpers
20
21def start_server(port = 0)
22  @srv = new_rpc_server_for_testing(pool_size: 1)
23  server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure)
24  @srv.handle(EchoService)
25  @server_thd = Thread.new { @srv.run }
26  @srv.wait_till_running
27  server_port
28end
29
30def stop_server
31  expect(@srv.stopped?).to be(false)
32  @srv.stop
33  @server_thd.join
34  expect(@srv.stopped?).to be(true)
35end
36
37describe 'channel connection behavior' do
38  it 'the client channel handles temporary loss of a transport' do
39    port = start_server
40    stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure)
41    req = EchoMsg.new
42    expect(stub.an_rpc(req)).to be_a(EchoMsg)
43    stop_server
44    sleep 1
45    # TODO(apolcyn) grabbing the same port might fail, is this stable enough?
46    start_server(port)
47    expect(stub.an_rpc(req)).to be_a(EchoMsg)
48    stop_server
49  end
50
51  it 'observably connects and reconnects to transient server' \
52    ' when using the channel state API' do
53    port = start_server
54    ch = GRPC::Core::Channel.new("localhost:#{port}", {},
55                                 :this_channel_is_insecure)
56
57    expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE)
58
59    state = ch.connectivity_state(true)
60
61    count = 0
62    while count < 20 && state != GRPC::Core::ConnectivityStates::READY
63      ch.watch_connectivity_state(state, Time.now + 60)
64      state = ch.connectivity_state(true)
65      count += 1
66    end
67
68    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
69
70    stop_server
71
72    state = ch.connectivity_state
73
74    count = 0
75    while count < 20 && state == GRPC::Core::ConnectivityStates::READY
76      ch.watch_connectivity_state(state, Time.now + 60)
77      state = ch.connectivity_state
78      count += 1
79    end
80
81    expect(state).to_not be(GRPC::Core::ConnectivityStates::READY)
82
83    start_server(port)
84
85    state = ch.connectivity_state(true)
86
87    count = 0
88    while count < 20 && state != GRPC::Core::ConnectivityStates::READY
89      ch.watch_connectivity_state(state, Time.now + 60)
90      state = ch.connectivity_state(true)
91      count += 1
92    end
93
94    expect(state).to be(GRPC::Core::ConnectivityStates::READY)
95
96    stop_server
97  end
98
99  it 'concurrent watches on the same channel' do
100    timeout(180) do
101      port = start_server
102      ch = GRPC::Core::Channel.new("localhost:#{port}", {},
103                                   :this_channel_is_insecure)
104      stop_server
105
106      thds = []
107      50.times do
108        thds << Thread.new do
109          while ch.connectivity_state(true) != ConnectivityStates::READY
110            ch.watch_connectivity_state(
111              ConnectivityStates::READY, Time.now + 60)
112            break
113          end
114        end
115      end
116
117      sleep 0.01
118
119      start_server(port)
120
121      thds.each(&:join)
122
123      stop_server
124    end
125  end
126end
127