1// Copyright 2014 The Go Authors. All rights reserved. 2// Use of this source code is governed by a BSD-style 3// license that can be found in the LICENSE file. 4 5package http2 6 7import ( 8 "errors" 9 "io" 10 "sync" 11) 12 13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like 14// io.Pipe except there are no PipeReader/PipeWriter halves, and the 15// underlying buffer is an interface. (io.Pipe is always unbuffered) 16type pipe struct { 17 mu sync.Mutex 18 c sync.Cond // c.L lazily initialized to &p.mu 19 b pipeBuffer // nil when done reading 20 err error // read error once empty. non-nil means closed. 21 breakErr error // immediate read error (caller doesn't see rest of b) 22 donec chan struct{} // closed on error 23 readFn func() // optional code to run in Read before error 24} 25 26type pipeBuffer interface { 27 Len() int 28 io.Writer 29 io.Reader 30} 31 32func (p *pipe) Len() int { 33 p.mu.Lock() 34 defer p.mu.Unlock() 35 if p.b == nil { 36 return 0 37 } 38 return p.b.Len() 39} 40 41// Read waits until data is available and copies bytes 42// from the buffer into p. 43func (p *pipe) Read(d []byte) (n int, err error) { 44 p.mu.Lock() 45 defer p.mu.Unlock() 46 if p.c.L == nil { 47 p.c.L = &p.mu 48 } 49 for { 50 if p.breakErr != nil { 51 return 0, p.breakErr 52 } 53 if p.b != nil && p.b.Len() > 0 { 54 return p.b.Read(d) 55 } 56 if p.err != nil { 57 if p.readFn != nil { 58 p.readFn() // e.g. copy trailers 59 p.readFn = nil // not sticky like p.err 60 } 61 p.b = nil 62 return 0, p.err 63 } 64 p.c.Wait() 65 } 66} 67 68var errClosedPipeWrite = errors.New("write on closed buffer") 69 70// Write copies bytes from p into the buffer and wakes a reader. 71// It is an error to write more data than the buffer can hold. 72func (p *pipe) Write(d []byte) (n int, err error) { 73 p.mu.Lock() 74 defer p.mu.Unlock() 75 if p.c.L == nil { 76 p.c.L = &p.mu 77 } 78 defer p.c.Signal() 79 if p.err != nil { 80 return 0, errClosedPipeWrite 81 } 82 if p.breakErr != nil { 83 return len(d), nil // discard when there is no reader 84 } 85 return p.b.Write(d) 86} 87 88// CloseWithError causes the next Read (waking up a current blocked 89// Read if needed) to return the provided err after all data has been 90// read. 91// 92// The error must be non-nil. 93func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) } 94 95// BreakWithError causes the next Read (waking up a current blocked 96// Read if needed) to return the provided err immediately, without 97// waiting for unread data. 98func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) } 99 100// closeWithErrorAndCode is like CloseWithError but also sets some code to run 101// in the caller's goroutine before returning the error. 102func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) } 103 104func (p *pipe) closeWithError(dst *error, err error, fn func()) { 105 if err == nil { 106 panic("err must be non-nil") 107 } 108 p.mu.Lock() 109 defer p.mu.Unlock() 110 if p.c.L == nil { 111 p.c.L = &p.mu 112 } 113 defer p.c.Signal() 114 if *dst != nil { 115 // Already been done. 116 return 117 } 118 p.readFn = fn 119 if dst == &p.breakErr { 120 p.b = nil 121 } 122 *dst = err 123 p.closeDoneLocked() 124} 125 126// requires p.mu be held. 127func (p *pipe) closeDoneLocked() { 128 if p.donec == nil { 129 return 130 } 131 // Close if unclosed. This isn't racy since we always 132 // hold p.mu while closing. 133 select { 134 case <-p.donec: 135 default: 136 close(p.donec) 137 } 138} 139 140// Err returns the error (if any) first set by BreakWithError or CloseWithError. 141func (p *pipe) Err() error { 142 p.mu.Lock() 143 defer p.mu.Unlock() 144 if p.breakErr != nil { 145 return p.breakErr 146 } 147 return p.err 148} 149 150// Done returns a channel which is closed if and when this pipe is closed 151// with CloseWithError. 152func (p *pipe) Done() <-chan struct{} { 153 p.mu.Lock() 154 defer p.mu.Unlock() 155 if p.donec == nil { 156 p.donec = make(chan struct{}) 157 if p.err != nil || p.breakErr != nil { 158 // Already hit an error. 159 p.closeDoneLocked() 160 } 161 } 162 return p.donec 163} 164