• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2016 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package gensupport
6
7import (
8	"errors"
9	"fmt"
10	"io"
11	"net/http"
12	"sync"
13	"time"
14
15	"golang.org/x/net/context"
16)
17
18const (
19	// statusTooManyRequests is returned by the storage API if the
20	// per-project limits have been temporarily exceeded. The request
21	// should be retried.
22	// https://cloud.google.com/storage/docs/json_api/v1/status-codes#standardcodes
23	statusTooManyRequests = 429
24)
25
26// ResumableUpload is used by the generated APIs to provide resumable uploads.
27// It is not used by developers directly.
28type ResumableUpload struct {
29	Client *http.Client
30	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
31	URI       string
32	UserAgent string // User-Agent for header of the request
33	// Media is the object being uploaded.
34	Media *MediaBuffer
35	// MediaType defines the media type, e.g. "image/jpeg".
36	MediaType string
37
38	mu       sync.Mutex // guards progress
39	progress int64      // number of bytes uploaded so far
40
41	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
42	Callback func(int64)
43
44	// If not specified, a default exponential backoff strategy will be used.
45	Backoff BackoffStrategy
46}
47
48// Progress returns the number of bytes uploaded at this point.
49func (rx *ResumableUpload) Progress() int64 {
50	rx.mu.Lock()
51	defer rx.mu.Unlock()
52	return rx.progress
53}
54
55// doUploadRequest performs a single HTTP request to upload data.
56// off specifies the offset in rx.Media from which data is drawn.
57// size is the number of bytes in data.
58// final specifies whether data is the final chunk to be uploaded.
59func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
60	req, err := http.NewRequest("POST", rx.URI, data)
61	if err != nil {
62		return nil, err
63	}
64
65	req.ContentLength = size
66	var contentRange string
67	if final {
68		if size == 0 {
69			contentRange = fmt.Sprintf("bytes */%v", off)
70		} else {
71			contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
72		}
73	} else {
74		contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
75	}
76	req.Header.Set("Content-Range", contentRange)
77	req.Header.Set("Content-Type", rx.MediaType)
78	req.Header.Set("User-Agent", rx.UserAgent)
79
80	// Google's upload endpoint uses status code 308 for a
81	// different purpose than the "308 Permanent Redirect"
82	// since-standardized in RFC 7238. Because of the conflict in
83	// semantics, Google added this new request header which
84	// causes it to not use "308" and instead reply with 200 OK
85	// and sets the upload-specific "X-HTTP-Status-Code-Override:
86	// 308" response header.
87	req.Header.Set("X-GUploader-No-308", "yes")
88
89	return SendRequest(ctx, rx.Client, req)
90}
91
92func statusResumeIncomplete(resp *http.Response) bool {
93	// This is how the server signals "status resume incomplete"
94	// when X-GUploader-No-308 is set to "yes":
95	return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
96}
97
98// reportProgress calls a user-supplied callback to report upload progress.
99// If old==updated, the callback is not called.
100func (rx *ResumableUpload) reportProgress(old, updated int64) {
101	if updated-old == 0 {
102		return
103	}
104	rx.mu.Lock()
105	rx.progress = updated
106	rx.mu.Unlock()
107	if rx.Callback != nil {
108		rx.Callback(updated)
109	}
110}
111
112// transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
113func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
114	chunk, off, size, err := rx.Media.Chunk()
115
116	done := err == io.EOF
117	if !done && err != nil {
118		return nil, err
119	}
120
121	res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
122	if err != nil {
123		return res, err
124	}
125
126	// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
127	// this file), so we don't expect to get a 308.
128	if res.StatusCode == 308 {
129		return nil, errors.New("unexpected 308 response status code")
130	}
131
132	if res.StatusCode == http.StatusOK {
133		rx.reportProgress(off, off+int64(size))
134	}
135
136	if statusResumeIncomplete(res) {
137		rx.Media.Next()
138	}
139	return res, nil
140}
141
142func contextDone(ctx context.Context) bool {
143	select {
144	case <-ctx.Done():
145		return true
146	default:
147		return false
148	}
149}
150
151// Upload starts the process of a resumable upload with a cancellable context.
152// It retries using the provided back off strategy until cancelled or the
153// strategy indicates to stop retrying.
154// It is called from the auto-generated API code and is not visible to the user.
155// Before sending an HTTP request, Upload calls any registered hook functions,
156// and calls the returned functions after the request returns (see send.go).
157// rx is private to the auto-generated API code.
158// Exactly one of resp or err will be nil.  If resp is non-nil, the caller must call resp.Body.Close.
159func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
160	var pause time.Duration
161	backoff := rx.Backoff
162	if backoff == nil {
163		backoff = DefaultBackoffStrategy()
164	}
165
166	for {
167		// Ensure that we return in the case of cancelled context, even if pause is 0.
168		if contextDone(ctx) {
169			return nil, ctx.Err()
170		}
171		select {
172		case <-ctx.Done():
173			return nil, ctx.Err()
174		case <-time.After(pause):
175		}
176
177		resp, err = rx.transferChunk(ctx)
178
179		var status int
180		if resp != nil {
181			status = resp.StatusCode
182		}
183
184		// Check if we should retry the request.
185		if shouldRetry(status, err) {
186			var retry bool
187			pause, retry = backoff.Pause()
188			if retry {
189				if resp != nil && resp.Body != nil {
190					resp.Body.Close()
191				}
192				continue
193			}
194		}
195
196		// If the chunk was uploaded successfully, but there's still
197		// more to go, upload the next chunk without any delay.
198		if statusResumeIncomplete(resp) {
199			pause = 0
200			backoff.Reset()
201			resp.Body.Close()
202			continue
203		}
204
205		// It's possible for err and resp to both be non-nil here, but we expose a simpler
206		// contract to our callers: exactly one of resp and err will be non-nil.  This means
207		// that any response body must be closed here before returning a non-nil error.
208		if err != nil {
209			if resp != nil && resp.Body != nil {
210				resp.Body.Close()
211			}
212			return nil, err
213		}
214
215		return resp, nil
216	}
217}
218