1// Copyright 2016 Google Inc. All Rights Reserved. 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package storage 16 17import ( 18 "errors" 19 "fmt" 20 21 "golang.org/x/net/context" 22 raw "google.golang.org/api/storage/v1" 23) 24 25// CopierFrom creates a Copier that can copy src to dst. 26// You can immediately call Run on the returned Copier, or 27// you can configure it first. 28// 29// For Requester Pays buckets, the user project of dst is billed, unless it is empty, 30// in which case the user project of src is billed. 31func (dst *ObjectHandle) CopierFrom(src *ObjectHandle) *Copier { 32 return &Copier{dst: dst, src: src} 33} 34 35// A Copier copies a source object to a destination. 36type Copier struct { 37 // ObjectAttrs are optional attributes to set on the destination object. 38 // Any attributes must be initialized before any calls on the Copier. Nil 39 // or zero-valued attributes are ignored. 40 ObjectAttrs 41 42 // RewriteToken can be set before calling Run to resume a copy 43 // operation. After Run returns a non-nil error, RewriteToken will 44 // have been updated to contain the value needed to resume the copy. 45 RewriteToken string 46 47 // ProgressFunc can be used to monitor the progress of a multi-RPC copy 48 // operation. If ProgressFunc is not nil and copying requires multiple 49 // calls to the underlying service (see 50 // https://cloud.google.com/storage/docs/json_api/v1/objects/rewrite), then 51 // ProgressFunc will be invoked after each call with the number of bytes of 52 // content copied so far and the total size in bytes of the source object. 53 // 54 // ProgressFunc is intended to make upload progress available to the 55 // application. For example, the implementation of ProgressFunc may update 56 // a progress bar in the application's UI, or log the result of 57 // float64(copiedBytes)/float64(totalBytes). 58 // 59 // ProgressFunc should return quickly without blocking. 60 ProgressFunc func(copiedBytes, totalBytes uint64) 61 62 dst, src *ObjectHandle 63} 64 65// Run performs the copy. 66func (c *Copier) Run(ctx context.Context) (*ObjectAttrs, error) { 67 if err := c.src.validate(); err != nil { 68 return nil, err 69 } 70 if err := c.dst.validate(); err != nil { 71 return nil, err 72 } 73 // Convert destination attributes to raw form, omitting the bucket. 74 // If the bucket is included but name or content-type aren't, the service 75 // returns a 400 with "Required" as the only message. Omitting the bucket 76 // does not cause any problems. 77 rawObject := c.ObjectAttrs.toRawObject("") 78 for { 79 res, err := c.callRewrite(ctx, rawObject) 80 if err != nil { 81 return nil, err 82 } 83 if c.ProgressFunc != nil { 84 c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize)) 85 } 86 if res.Done { // Finished successfully. 87 return newObject(res.Resource), nil 88 } 89 } 90} 91 92func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) { 93 call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj) 94 95 call.Context(ctx).Projection("full") 96 if c.RewriteToken != "" { 97 call.RewriteToken(c.RewriteToken) 98 } 99 if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil { 100 return nil, err 101 } 102 if c.dst.userProject != "" { 103 call.UserProject(c.dst.userProject) 104 } else if c.src.userProject != "" { 105 call.UserProject(c.src.userProject) 106 } 107 if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil { 108 return nil, err 109 } 110 if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { 111 return nil, err 112 } 113 if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil { 114 return nil, err 115 } 116 var res *raw.RewriteResponse 117 var err error 118 setClientHeader(call.Header()) 119 err = runWithRetry(ctx, func() error { res, err = call.Do(); return err }) 120 if err != nil { 121 return nil, err 122 } 123 c.RewriteToken = res.RewriteToken 124 return res, nil 125} 126 127// ComposerFrom creates a Composer that can compose srcs into dst. 128// You can immediately call Run on the returned Composer, or you can 129// configure it first. 130// 131// The encryption key for the destination object will be used to decrypt all 132// source objects and encrypt the destination object. It is an error 133// to specify an encryption key for any of the source objects. 134func (dst *ObjectHandle) ComposerFrom(srcs ...*ObjectHandle) *Composer { 135 return &Composer{dst: dst, srcs: srcs} 136} 137 138// A Composer composes source objects into a destination object. 139// 140// For Requester Pays buckets, the user project of dst is billed. 141type Composer struct { 142 // ObjectAttrs are optional attributes to set on the destination object. 143 // Any attributes must be initialized before any calls on the Composer. Nil 144 // or zero-valued attributes are ignored. 145 ObjectAttrs 146 147 dst *ObjectHandle 148 srcs []*ObjectHandle 149} 150 151// Run performs the compose operation. 152func (c *Composer) Run(ctx context.Context) (*ObjectAttrs, error) { 153 if err := c.dst.validate(); err != nil { 154 return nil, err 155 } 156 if len(c.srcs) == 0 { 157 return nil, errors.New("storage: at least one source object must be specified") 158 } 159 160 req := &raw.ComposeRequest{} 161 // Compose requires a non-empty Destination, so we always set it, 162 // even if the caller-provided ObjectAttrs is the zero value. 163 req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket) 164 for _, src := range c.srcs { 165 if err := src.validate(); err != nil { 166 return nil, err 167 } 168 if src.bucket != c.dst.bucket { 169 return nil, fmt.Errorf("storage: all source objects must be in bucket %q, found %q", c.dst.bucket, src.bucket) 170 } 171 if src.encryptionKey != nil { 172 return nil, fmt.Errorf("storage: compose source %s.%s must not have encryption key", src.bucket, src.object) 173 } 174 srcObj := &raw.ComposeRequestSourceObjects{ 175 Name: src.object, 176 } 177 if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil { 178 return nil, err 179 } 180 req.SourceObjects = append(req.SourceObjects, srcObj) 181 } 182 183 call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx) 184 if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil { 185 return nil, err 186 } 187 if c.dst.userProject != "" { 188 call.UserProject(c.dst.userProject) 189 } 190 if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { 191 return nil, err 192 } 193 var obj *raw.Object 194 var err error 195 setClientHeader(call.Header()) 196 err = runWithRetry(ctx, func() error { obj, err = call.Do(); return err }) 197 if err != nil { 198 return nil, err 199 } 200 return newObject(obj), nil 201} 202