1// Copyright 2014 Google Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "encoding/base64" 19 "errors" 20 "fmt" 21 "io" 22 "unicode/utf8" 23 24 "golang.org/x/net/context" 25 "google.golang.org/api/googleapi" 26 raw "google.golang.org/api/storage/v1" 27) 28 29// A Writer writes a Cloud Storage object. 30type Writer struct { 31 // ObjectAttrs are optional attributes to set on the object. Any attributes 32 // must be initialized before the first Write call. Nil or zero-valued 33 // attributes are ignored. 34 ObjectAttrs 35 36 // SendCRC specifies whether to transmit a CRC32C field. It should be set 37 // to true in addition to setting the Writer's CRC32C field, because zero 38 // is a valid CRC and normally a zero would not be transmitted. 39 // If a CRC32C is sent, and the data written does not match the checksum, 40 // the write will be rejected. 41 SendCRC32C bool 42 43 // ChunkSize controls the maximum number of bytes of the object that the 44 // Writer will attempt to send to the server in a single request. Objects 45 // smaller than the size will be sent in a single request, while larger 46 // objects will be split over multiple requests. The size will be rounded up 47 // to the nearest multiple of 256K. If zero, chunking will be disabled and 48 // the object will be uploaded in a single request. 49 // 50 // ChunkSize will default to a reasonable value. Any custom configuration 51 // must be done before the first Write call. 52 ChunkSize int 53 54 // ProgressFunc can be used to monitor the progress of a large write. 55 // operation. If ProgressFunc is not nil and writing requires multiple 56 // calls to the underlying service (see 57 // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload), 58 // then ProgressFunc will be invoked after each call with the number of bytes of 59 // content copied so far. 60 // 61 // ProgressFunc should return quickly without blocking. 62 ProgressFunc func(int64) 63 64 ctx context.Context 65 o *ObjectHandle 66 67 opened bool 68 pw *io.PipeWriter 69 70 donec chan struct{} // closed after err and obj are set. 71 err error 72 obj *ObjectAttrs 73} 74 75func (w *Writer) open() error { 76 attrs := w.ObjectAttrs 77 // Check the developer didn't change the object Name (this is unfortunate, but 78 // we don't want to store an object under the wrong name). 79 if attrs.Name != w.o.object { 80 return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object) 81 } 82 if !utf8.ValidString(attrs.Name) { 83 return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name) 84 } 85 pr, pw := io.Pipe() 86 w.pw = pw 87 w.opened = true 88 89 if w.ChunkSize < 0 { 90 return errors.New("storage: Writer.ChunkSize must non-negative") 91 } 92 mediaOpts := []googleapi.MediaOption{ 93 googleapi.ChunkSize(w.ChunkSize), 94 } 95 if c := attrs.ContentType; c != "" { 96 mediaOpts = append(mediaOpts, googleapi.ContentType(c)) 97 } 98 99 go func() { 100 defer close(w.donec) 101 102 rawObj := attrs.toRawObject(w.o.bucket) 103 if w.SendCRC32C { 104 rawObj.Crc32c = encodeUint32(attrs.CRC32C) 105 } 106 if w.MD5 != nil { 107 rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5) 108 } 109 call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj). 110 Media(pr, mediaOpts...). 111 Projection("full"). 112 Context(w.ctx) 113 if w.ProgressFunc != nil { 114 call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) }) 115 } 116 if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil { 117 w.err = err 118 pr.CloseWithError(w.err) 119 return 120 } 121 var resp *raw.Object 122 err := applyConds("NewWriter", w.o.gen, w.o.conds, call) 123 if err == nil { 124 if w.o.userProject != "" { 125 call.UserProject(w.o.userProject) 126 } 127 setClientHeader(call.Header()) 128 // We will only retry here if the initial POST, which obtains a URI for 129 // the resumable upload, fails with a retryable error. The upload itself 130 // has its own retry logic. 131 err = runWithRetry(w.ctx, func() error { 132 var err2 error 133 resp, err2 = call.Do() 134 return err2 135 }) 136 } 137 if err != nil { 138 w.err = err 139 pr.CloseWithError(w.err) 140 return 141 } 142 w.obj = newObject(resp) 143 }() 144 return nil 145} 146 147// Write appends to w. It implements the io.Writer interface. 148// 149// Since writes happen asynchronously, Write may return a nil 150// error even though the write failed (or will fail). Always 151// use the error returned from Writer.Close to determine if 152// the upload was successful. 153func (w *Writer) Write(p []byte) (n int, err error) { 154 if w.err != nil { 155 return 0, w.err 156 } 157 if !w.opened { 158 if err := w.open(); err != nil { 159 return 0, err 160 } 161 } 162 return w.pw.Write(p) 163} 164 165// Close completes the write operation and flushes any buffered data. 166// If Close doesn't return an error, metadata about the written object 167// can be retrieved by calling Attrs. 168func (w *Writer) Close() error { 169 if !w.opened { 170 if err := w.open(); err != nil { 171 return err 172 } 173 } 174 if err := w.pw.Close(); err != nil { 175 return err 176 } 177 <-w.donec 178 return w.err 179} 180 181// CloseWithError aborts the write operation with the provided error. 182// CloseWithError always returns nil. 183func (w *Writer) CloseWithError(err error) error { 184 if !w.opened { 185 return nil 186 } 187 return w.pw.CloseWithError(err) 188} 189 190// Attrs returns metadata about a successfully-written object. 191// It's only valid to call it after Close returns nil. 192func (w *Writer) Attrs() *ObjectAttrs { 193 return w.obj 194} 195