1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19package transport 20 21import ( 22 "fmt" 23 "math" 24 "sync" 25 "sync/atomic" 26 "time" 27) 28 29const ( 30 // The default value of flow control window size in HTTP2 spec. 31 defaultWindowSize = 65535 32 // The initial window size for flow control. 33 initialWindowSize = defaultWindowSize // for an RPC 34 infinity = time.Duration(math.MaxInt64) 35 defaultClientKeepaliveTime = infinity 36 defaultClientKeepaliveTimeout = 20 * time.Second 37 defaultMaxStreamsClient = 100 38 defaultMaxConnectionIdle = infinity 39 defaultMaxConnectionAge = infinity 40 defaultMaxConnectionAgeGrace = infinity 41 defaultServerKeepaliveTime = 2 * time.Hour 42 defaultServerKeepaliveTimeout = 20 * time.Second 43 defaultKeepalivePolicyMinTime = 5 * time.Minute 44 // max window limit set by HTTP2 Specs. 45 maxWindowSize = math.MaxInt32 46 // defaultWriteQuota is the default value for number of data 47 // bytes that each stream can schedule before some of it being 48 // flushed out. 49 defaultWriteQuota = 64 * 1024 50) 51 52// writeQuota is a soft limit on the amount of data a stream can 53// schedule before some of it is written out. 54type writeQuota struct { 55 quota int32 56 // get waits on read from when quota goes less than or equal to zero. 57 // replenish writes on it when quota goes positive again. 58 ch chan struct{} 59 // done is triggered in error case. 60 done <-chan struct{} 61 // replenish is called by loopyWriter to give quota back to. 62 // It is implemented as a field so that it can be updated 63 // by tests. 64 replenish func(n int) 65} 66 67func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota { 68 w := &writeQuota{ 69 quota: sz, 70 ch: make(chan struct{}, 1), 71 done: done, 72 } 73 w.replenish = w.realReplenish 74 return w 75} 76 77func (w *writeQuota) get(sz int32) error { 78 for { 79 if atomic.LoadInt32(&w.quota) > 0 { 80 atomic.AddInt32(&w.quota, -sz) 81 return nil 82 } 83 select { 84 case <-w.ch: 85 continue 86 case <-w.done: 87 return errStreamDone 88 } 89 } 90} 91 92func (w *writeQuota) realReplenish(n int) { 93 sz := int32(n) 94 a := atomic.AddInt32(&w.quota, sz) 95 b := a - sz 96 if b <= 0 && a > 0 { 97 select { 98 case w.ch <- struct{}{}: 99 default: 100 } 101 } 102} 103 104type trInFlow struct { 105 limit uint32 106 unacked uint32 107 effectiveWindowSize uint32 108} 109 110func (f *trInFlow) newLimit(n uint32) uint32 { 111 d := n - f.limit 112 f.limit = n 113 f.updateEffectiveWindowSize() 114 return d 115} 116 117func (f *trInFlow) onData(n uint32) uint32 { 118 f.unacked += n 119 if f.unacked >= f.limit/4 { 120 w := f.unacked 121 f.unacked = 0 122 f.updateEffectiveWindowSize() 123 return w 124 } 125 f.updateEffectiveWindowSize() 126 return 0 127} 128 129func (f *trInFlow) reset() uint32 { 130 w := f.unacked 131 f.unacked = 0 132 f.updateEffectiveWindowSize() 133 return w 134} 135 136func (f *trInFlow) updateEffectiveWindowSize() { 137 atomic.StoreUint32(&f.effectiveWindowSize, f.limit-f.unacked) 138} 139 140func (f *trInFlow) getSize() uint32 { 141 return atomic.LoadUint32(&f.effectiveWindowSize) 142} 143 144// TODO(mmukhi): Simplify this code. 145// inFlow deals with inbound flow control 146type inFlow struct { 147 mu sync.Mutex 148 // The inbound flow control limit for pending data. 149 limit uint32 150 // pendingData is the overall data which have been received but not been 151 // consumed by applications. 152 pendingData uint32 153 // The amount of data the application has consumed but grpc has not sent 154 // window update for them. Used to reduce window update frequency. 155 pendingUpdate uint32 156 // delta is the extra window update given by receiver when an application 157 // is reading data bigger in size than the inFlow limit. 158 delta uint32 159} 160 161// newLimit updates the inflow window to a new value n. 162// It assumes that n is always greater than the old limit. 163func (f *inFlow) newLimit(n uint32) uint32 { 164 f.mu.Lock() 165 d := n - f.limit 166 f.limit = n 167 f.mu.Unlock() 168 return d 169} 170 171func (f *inFlow) maybeAdjust(n uint32) uint32 { 172 if n > uint32(math.MaxInt32) { 173 n = uint32(math.MaxInt32) 174 } 175 f.mu.Lock() 176 // estSenderQuota is the receiver's view of the maximum number of bytes the sender 177 // can send without a window update. 178 estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate)) 179 // estUntransmittedData is the maximum number of bytes the sends might not have put 180 // on the wire yet. A value of 0 or less means that we have already received all or 181 // more bytes than the application is requesting to read. 182 estUntransmittedData := int32(n - f.pendingData) // Casting into int32 since it could be negative. 183 // This implies that unless we send a window update, the sender won't be able to send all the bytes 184 // for this message. Therefore we must send an update over the limit since there's an active read 185 // request from the application. 186 if estUntransmittedData > estSenderQuota { 187 // Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec. 188 if f.limit+n > maxWindowSize { 189 f.delta = maxWindowSize - f.limit 190 } else { 191 // Send a window update for the whole message and not just the difference between 192 // estUntransmittedData and estSenderQuota. This will be helpful in case the message 193 // is padded; We will fallback on the current available window(at least a 1/4th of the limit). 194 f.delta = n 195 } 196 f.mu.Unlock() 197 return f.delta 198 } 199 f.mu.Unlock() 200 return 0 201} 202 203// onData is invoked when some data frame is received. It updates pendingData. 204func (f *inFlow) onData(n uint32) error { 205 f.mu.Lock() 206 f.pendingData += n 207 if f.pendingData+f.pendingUpdate > f.limit+f.delta { 208 limit := f.limit 209 rcvd := f.pendingData + f.pendingUpdate 210 f.mu.Unlock() 211 return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit) 212 } 213 f.mu.Unlock() 214 return nil 215} 216 217// onRead is invoked when the application reads the data. It returns the window size 218// to be sent to the peer. 219func (f *inFlow) onRead(n uint32) uint32 { 220 f.mu.Lock() 221 if f.pendingData == 0 { 222 f.mu.Unlock() 223 return 0 224 } 225 f.pendingData -= n 226 if n > f.delta { 227 n -= f.delta 228 f.delta = 0 229 } else { 230 f.delta -= n 231 n = 0 232 } 233 f.pendingUpdate += n 234 if f.pendingUpdate >= f.limit/4 { 235 wu := f.pendingUpdate 236 f.pendingUpdate = 0 237 f.mu.Unlock() 238 return wu 239 } 240 f.mu.Unlock() 241 return 0 242} 243