• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package http2
6
7import (
8	"errors"
9	"io"
10	"sync"
11)
12
13// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
14// io.Pipe except there are no PipeReader/PipeWriter halves, and the
15// underlying buffer is an interface. (io.Pipe is always unbuffered)
16type pipe struct {
17	mu       sync.Mutex
18	c        sync.Cond     // c.L lazily initialized to &p.mu
19	b        pipeBuffer    // nil when done reading
20	err      error         // read error once empty. non-nil means closed.
21	breakErr error         // immediate read error (caller doesn't see rest of b)
22	donec    chan struct{} // closed on error
23	readFn   func()        // optional code to run in Read before error
24}
25
26type pipeBuffer interface {
27	Len() int
28	io.Writer
29	io.Reader
30}
31
32func (p *pipe) Len() int {
33	p.mu.Lock()
34	defer p.mu.Unlock()
35	if p.b == nil {
36		return 0
37	}
38	return p.b.Len()
39}
40
41// Read waits until data is available and copies bytes
42// from the buffer into p.
43func (p *pipe) Read(d []byte) (n int, err error) {
44	p.mu.Lock()
45	defer p.mu.Unlock()
46	if p.c.L == nil {
47		p.c.L = &p.mu
48	}
49	for {
50		if p.breakErr != nil {
51			return 0, p.breakErr
52		}
53		if p.b != nil && p.b.Len() > 0 {
54			return p.b.Read(d)
55		}
56		if p.err != nil {
57			if p.readFn != nil {
58				p.readFn()     // e.g. copy trailers
59				p.readFn = nil // not sticky like p.err
60			}
61			p.b = nil
62			return 0, p.err
63		}
64		p.c.Wait()
65	}
66}
67
68var errClosedPipeWrite = errors.New("write on closed buffer")
69
70// Write copies bytes from p into the buffer and wakes a reader.
71// It is an error to write more data than the buffer can hold.
72func (p *pipe) Write(d []byte) (n int, err error) {
73	p.mu.Lock()
74	defer p.mu.Unlock()
75	if p.c.L == nil {
76		p.c.L = &p.mu
77	}
78	defer p.c.Signal()
79	if p.err != nil {
80		return 0, errClosedPipeWrite
81	}
82	if p.breakErr != nil {
83		return len(d), nil // discard when there is no reader
84	}
85	return p.b.Write(d)
86}
87
88// CloseWithError causes the next Read (waking up a current blocked
89// Read if needed) to return the provided err after all data has been
90// read.
91//
92// The error must be non-nil.
93func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
94
95// BreakWithError causes the next Read (waking up a current blocked
96// Read if needed) to return the provided err immediately, without
97// waiting for unread data.
98func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
99
100// closeWithErrorAndCode is like CloseWithError but also sets some code to run
101// in the caller's goroutine before returning the error.
102func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
103
104func (p *pipe) closeWithError(dst *error, err error, fn func()) {
105	if err == nil {
106		panic("err must be non-nil")
107	}
108	p.mu.Lock()
109	defer p.mu.Unlock()
110	if p.c.L == nil {
111		p.c.L = &p.mu
112	}
113	defer p.c.Signal()
114	if *dst != nil {
115		// Already been done.
116		return
117	}
118	p.readFn = fn
119	if dst == &p.breakErr {
120		p.b = nil
121	}
122	*dst = err
123	p.closeDoneLocked()
124}
125
126// requires p.mu be held.
127func (p *pipe) closeDoneLocked() {
128	if p.donec == nil {
129		return
130	}
131	// Close if unclosed. This isn't racy since we always
132	// hold p.mu while closing.
133	select {
134	case <-p.donec:
135	default:
136		close(p.donec)
137	}
138}
139
140// Err returns the error (if any) first set by BreakWithError or CloseWithError.
141func (p *pipe) Err() error {
142	p.mu.Lock()
143	defer p.mu.Unlock()
144	if p.breakErr != nil {
145		return p.breakErr
146	}
147	return p.err
148}
149
150// Done returns a channel which is closed if and when this pipe is closed
151// with CloseWithError.
152func (p *pipe) Done() <-chan struct{} {
153	p.mu.Lock()
154	defer p.mu.Unlock()
155	if p.donec == nil {
156		p.donec = make(chan struct{})
157		if p.err != nil || p.breakErr != nil {
158			// Already hit an error.
159			p.closeDoneLocked()
160		}
161	}
162	return p.donec
163}
164