# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

require 'spec_helper'

# Let an unhandled exception in any thread abort the whole process so a
# failure inside a pool worker is not silently lost.
Thread.abort_on_exception = true

# Examples covering construction, readiness reporting, scheduling, starting
# and stopping of GRPC::Pool.
describe GRPC::Pool do
  # Short alias used by every example below.
  Pool = GRPC::Pool

  describe '#new' do
    it 'raises if a non-positive size is used' do
      expect { Pool.new(0) }.to raise_error
      expect { Pool.new(-1) }.to raise_error
      expect { Pool.new(Object.new) }.to raise_error
    end

    it 'is constructed OK with a positive size' do
      expect { Pool.new(1) }.not_to raise_error
    end
  end

  describe '#ready_for_work?' do
    it 'before start it is not ready' do
      p = Pool.new(1)
      expect(p.ready_for_work?).to be(false)
    end

    it 'it stops being ready after all workers are busy' do
      p = Pool.new(5)
      p.start

      wait_mu = Mutex.new
      wait_cv = ConditionVariable.new
      wait = true

      # Each job blocks until `wait` is cleared further down.
      job = proc do
        wait_mu.synchronize do
          wait_cv.wait(wait_mu) while wait
        end
      end

      # One blocking job per pool slot; the pool is expected to report ready
      # before each of them is handed over.
      5.times do
        expect(p.ready_for_work?).to be(true)
        p.schedule(&job)
      end

      expect(p.ready_for_work?).to be(false)

      # Release the blocked jobs so they are not left waiting.
      wait_mu.synchronize do
        wait = false
        wait_cv.broadcast
      end
    end
  end

  describe '#schedule' do
    it 'return if the pool is already stopped' do
      p = Pool.new(1)
      p.stop
      job = proc {}
      expect { p.schedule(&job) }.not_to raise_error
    end

    it 'adds jobs that get run by the pool' do
      p = Pool.new(1)
      p.start
      o, q = Object.new, Queue.new
      job = proc { q.push(o) }
      p.schedule(&job)
      # Queue#pop blocks until the job has pushed the marker object.
      expect(q.pop).to be(o)
      p.stop
    end
  end

  describe '#stop' do
    it 'works when there are no scheduled tasks' do
      p = Pool.new(1)
      expect { p.stop }.not_to raise_error
    end

    it 'stops jobs when there are long running jobs' do
      p = Pool.new(1)
      p.start

      wait_forever_mu = Mutex.new
      wait_forever_cv = ConditionVariable.new
      # Never cleared, so the job below does not finish on its own.
      wait_forever = true

      job_running = Queue.new
      job = proc do
        # Signal the example that the job has actually started.
        job_running.push(Object.new)
        wait_forever_mu.synchronize do
          # ConditionVariable#wait must be given the mutex it releases while
          # waiting; without it the call raises ArgumentError and the job
          # would end immediately instead of blocking.
          wait_forever_cv.wait(wait_forever_mu) while wait_forever
        end
      end
      p.schedule(&job)
      # Only stop once the job is known to be running.
      job_running.pop
      expect { p.stop }.not_to raise_error
    end
  end

  describe '#start' do
    it 'runs jobs as they are scheduled' do
      p = Pool.new(5)
      o, q = Object.new, Queue.new
      p.start
      n = 5  # arbitrary
      n.times do
        # NOTE(review): presumably #schedule forwards `o` to the block, which
        # pushes it onto `q` -- confirm against Pool#schedule.
        p.schedule(o, &q.method(:push))
        expect(q.pop).to be(o)
      end
      p.stop
    end
  end
end