1# Copyright 2015 gRPC authors. 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14require 'spec_helper' 15require 'timeout' 16 17include Timeout 18include GRPC::Core 19include GRPC::Spec::Helpers 20 21def start_server(port = 0) 22 @srv = new_rpc_server_for_testing(pool_size: 1) 23 server_port = @srv.add_http2_port("localhost:#{port}", :this_port_is_insecure) 24 @srv.handle(EchoService) 25 @server_thd = Thread.new { @srv.run } 26 @srv.wait_till_running 27 server_port 28end 29 30def stop_server 31 expect(@srv.stopped?).to be(false) 32 @srv.stop 33 @server_thd.join 34 expect(@srv.stopped?).to be(true) 35end 36 37describe 'channel connection behavior' do 38 it 'the client channel handles temporary loss of a transport' do 39 port = start_server 40 stub = EchoStub.new("localhost:#{port}", :this_channel_is_insecure) 41 req = EchoMsg.new 42 expect(stub.an_rpc(req)).to be_a(EchoMsg) 43 stop_server 44 sleep 1 45 # TODO(apolcyn) grabbing the same port might fail, is this stable enough? 46 start_server(port) 47 expect(stub.an_rpc(req)).to be_a(EchoMsg) 48 stop_server 49 end 50 51 it 'observably connects and reconnects to transient server' \ 52 ' when using the channel state API' do 53 port = start_server 54 ch = GRPC::Core::Channel.new("localhost:#{port}", {}, 55 :this_channel_is_insecure) 56 57 expect(ch.connectivity_state).to be(GRPC::Core::ConnectivityStates::IDLE) 58 59 state = ch.connectivity_state(true) 60 61 count = 0 62 while count < 20 && state != GRPC::Core::ConnectivityStates::READY 63 ch.watch_connectivity_state(state, Time.now + 60) 64 state = ch.connectivity_state(true) 65 count += 1 66 end 67 68 expect(state).to be(GRPC::Core::ConnectivityStates::READY) 69 70 stop_server 71 72 state = ch.connectivity_state 73 74 count = 0 75 while count < 20 && state == GRPC::Core::ConnectivityStates::READY 76 ch.watch_connectivity_state(state, Time.now + 60) 77 state = ch.connectivity_state 78 count += 1 79 end 80 81 expect(state).to_not be(GRPC::Core::ConnectivityStates::READY) 82 83 start_server(port) 84 85 state = ch.connectivity_state(true) 86 87 count = 0 88 while count < 20 && state != GRPC::Core::ConnectivityStates::READY 89 ch.watch_connectivity_state(state, Time.now + 60) 90 state = ch.connectivity_state(true) 91 count += 1 92 end 93 94 expect(state).to be(GRPC::Core::ConnectivityStates::READY) 95 96 stop_server 97 end 98 99 it 'concurrent watches on the same channel' do 100 timeout(180) do 101 port = start_server 102 ch = GRPC::Core::Channel.new("localhost:#{port}", {}, 103 :this_channel_is_insecure) 104 stop_server 105 106 thds = [] 107 50.times do 108 thds << Thread.new do 109 while ch.connectivity_state(true) != ConnectivityStates::READY 110 ch.watch_connectivity_state( 111 ConnectivityStates::READY, Time.now + 60) 112 break 113 end 114 end 115 end 116 117 sleep 0.01 118 119 start_server(port) 120 121 thds.each(&:join) 122 123 stop_server 124 end 125 end 126end 127