• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2014 Google Inc. All Rights Reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//      http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package storage
16
17import (
18	"encoding/base64"
19	"errors"
20	"fmt"
21	"io"
22	"unicode/utf8"
23
24	"golang.org/x/net/context"
25	"google.golang.org/api/googleapi"
26	raw "google.golang.org/api/storage/v1"
27)
28
29// A Writer writes a Cloud Storage object.
30type Writer struct {
31	// ObjectAttrs are optional attributes to set on the object. Any attributes
32	// must be initialized before the first Write call. Nil or zero-valued
33	// attributes are ignored.
34	ObjectAttrs
35
36	// SendCRC specifies whether to transmit a CRC32C field. It should be set
37	// to true in addition to setting the Writer's CRC32C field, because zero
38	// is a valid CRC and normally a zero would not be transmitted.
39	// If a CRC32C is sent, and the data written does not match the checksum,
40	// the write will be rejected.
41	SendCRC32C bool
42
43	// ChunkSize controls the maximum number of bytes of the object that the
44	// Writer will attempt to send to the server in a single request. Objects
45	// smaller than the size will be sent in a single request, while larger
46	// objects will be split over multiple requests. The size will be rounded up
47	// to the nearest multiple of 256K. If zero, chunking will be disabled and
48	// the object will be uploaded in a single request.
49	//
50	// ChunkSize will default to a reasonable value. Any custom configuration
51	// must be done before the first Write call.
52	ChunkSize int
53
54	// ProgressFunc can be used to monitor the progress of a large write.
55	// operation. If ProgressFunc is not nil and writing requires multiple
56	// calls to the underlying service (see
57	// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload),
58	// then ProgressFunc will be invoked after each call with the number of bytes of
59	// content copied so far.
60	//
61	// ProgressFunc should return quickly without blocking.
62	ProgressFunc func(int64)
63
64	ctx context.Context
65	o   *ObjectHandle
66
67	opened bool
68	pw     *io.PipeWriter
69
70	donec chan struct{} // closed after err and obj are set.
71	err   error
72	obj   *ObjectAttrs
73}
74
75func (w *Writer) open() error {
76	attrs := w.ObjectAttrs
77	// Check the developer didn't change the object Name (this is unfortunate, but
78	// we don't want to store an object under the wrong name).
79	if attrs.Name != w.o.object {
80		return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object)
81	}
82	if !utf8.ValidString(attrs.Name) {
83		return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name)
84	}
85	pr, pw := io.Pipe()
86	w.pw = pw
87	w.opened = true
88
89	if w.ChunkSize < 0 {
90		return errors.New("storage: Writer.ChunkSize must non-negative")
91	}
92	mediaOpts := []googleapi.MediaOption{
93		googleapi.ChunkSize(w.ChunkSize),
94	}
95	if c := attrs.ContentType; c != "" {
96		mediaOpts = append(mediaOpts, googleapi.ContentType(c))
97	}
98
99	go func() {
100		defer close(w.donec)
101
102		rawObj := attrs.toRawObject(w.o.bucket)
103		if w.SendCRC32C {
104			rawObj.Crc32c = encodeUint32(attrs.CRC32C)
105		}
106		if w.MD5 != nil {
107			rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5)
108		}
109		call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj).
110			Media(pr, mediaOpts...).
111			Projection("full").
112			Context(w.ctx)
113		if w.ProgressFunc != nil {
114			call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) })
115		}
116		if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil {
117			w.err = err
118			pr.CloseWithError(w.err)
119			return
120		}
121		var resp *raw.Object
122		err := applyConds("NewWriter", w.o.gen, w.o.conds, call)
123		if err == nil {
124			if w.o.userProject != "" {
125				call.UserProject(w.o.userProject)
126			}
127			setClientHeader(call.Header())
128			// We will only retry here if the initial POST, which obtains a URI for
129			// the resumable upload, fails with a retryable error. The upload itself
130			// has its own retry logic.
131			err = runWithRetry(w.ctx, func() error {
132				var err2 error
133				resp, err2 = call.Do()
134				return err2
135			})
136		}
137		if err != nil {
138			w.err = err
139			pr.CloseWithError(w.err)
140			return
141		}
142		w.obj = newObject(resp)
143	}()
144	return nil
145}
146
147// Write appends to w. It implements the io.Writer interface.
148//
149// Since writes happen asynchronously, Write may return a nil
150// error even though the write failed (or will fail). Always
151// use the error returned from Writer.Close to determine if
152// the upload was successful.
153func (w *Writer) Write(p []byte) (n int, err error) {
154	if w.err != nil {
155		return 0, w.err
156	}
157	if !w.opened {
158		if err := w.open(); err != nil {
159			return 0, err
160		}
161	}
162	return w.pw.Write(p)
163}
164
165// Close completes the write operation and flushes any buffered data.
166// If Close doesn't return an error, metadata about the written object
167// can be retrieved by calling Attrs.
168func (w *Writer) Close() error {
169	if !w.opened {
170		if err := w.open(); err != nil {
171			return err
172		}
173	}
174	if err := w.pw.Close(); err != nil {
175		return err
176	}
177	<-w.donec
178	return w.err
179}
180
181// CloseWithError aborts the write operation with the provided error.
182// CloseWithError always returns nil.
183func (w *Writer) CloseWithError(err error) error {
184	if !w.opened {
185		return nil
186	}
187	return w.pw.CloseWithError(err)
188}
189
190// Attrs returns metadata about a successfully-written object.
191// It's only valid to call it after Close returns nil.
192func (w *Writer) Attrs() *ObjectAttrs {
193	return w.obj
194}
195