• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (C) 2024 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.tradefed.cache.remote;
18 
19 import static com.google.common.base.Preconditions.checkArgument;
20 
21 import build.bazel.remote.execution.v2.Digest;
22 import com.google.bytestream.ByteStreamGrpc;
23 import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
24 import com.google.bytestream.ByteStreamProto.WriteRequest;
25 import com.google.bytestream.ByteStreamProto.WriteResponse;
26 import com.google.common.annotations.VisibleForTesting;
27 import com.google.common.base.Strings;
28 import com.google.common.util.concurrent.Futures;
29 import com.google.common.util.concurrent.ListenableFuture;
30 import com.google.common.util.concurrent.MoreExecutors;
31 import com.google.common.util.concurrent.SettableFuture;
32 import com.google.protobuf.ByteString;
33 import io.grpc.CallCredentials;
34 import io.grpc.Channel;
35 import io.grpc.Status.Code;
36 import io.grpc.StatusRuntimeException;
37 import io.grpc.stub.ClientCallStreamObserver;
38 import io.grpc.stub.ClientResponseObserver;
39 import java.io.File;
40 import java.io.FileInputStream;
41 import java.io.IOException;
42 import java.time.Duration;
43 import java.util.concurrent.TimeUnit;
44 import java.util.UUID;
45 
46 /** A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. */
47 public class ByteStreamUploader {
48 
49     // Uses 16KB as the default chunk size that is also used by Bazel.
50     private static final int DEFAULT_CHUNK_SIZE = 1024 * 16;
51     private final String mInstanceName;
52     private final Channel mChannel;
53     private final CallCredentials mCallCredentials;
54     private final Duration mCallTimeout;
55     private final int mChunkSize;
56 
ByteStreamUploader( String instanceName, Channel channel, CallCredentials callCredentials, Duration callTimeout)57     public ByteStreamUploader(
58             String instanceName,
59             Channel channel,
60             CallCredentials callCredentials,
61             Duration callTimeout) {
62         this(instanceName, channel, callCredentials, callTimeout, DEFAULT_CHUNK_SIZE);
63     }
64 
65     @VisibleForTesting
ByteStreamUploader( String instanceName, Channel channel, CallCredentials callCredentials, Duration callTimeout, int chunkSize)66     ByteStreamUploader(
67             String instanceName,
68             Channel channel,
69             CallCredentials callCredentials,
70             Duration callTimeout,
71             int chunkSize) {
72         checkArgument(callTimeout.getSeconds() > 0, "callTimeout must be greater than 0.");
73         checkArgument(!Strings.isNullOrEmpty(instanceName), "instanceName must be specified.");
74         mInstanceName = instanceName;
75         mChannel = channel;
76         mCallCredentials = callCredentials;
77         mCallTimeout = callTimeout;
78         mChunkSize = chunkSize;
79     }
80 
81     /**
82      * Uploads a BLOB by the remote {@code ByteStream} service.
83      *
84      * @param digest the digest of the BLOB to upload.
85      * @param blob the BLOB to upload.
86      */
uploadBlob(Digest digest, ByteString blob)87     public ListenableFuture<Void> uploadBlob(Digest digest, ByteString blob) {
88         return uploadBlob(digest, new Chunker(blob.newInput(), digest.getSizeBytes(), mChunkSize));
89     }
90 
91     /**
92      * Uploads a file by the remote {@code ByteStream} service.
93      *
94      * @param digest the digest of the file to upload.
95      * @param file the file to upload.
96      */
uploadFile(Digest digest, File file)97     public ListenableFuture<Void> uploadFile(Digest digest, File file) {
98         try {
99             return uploadBlob(
100                     digest,
101                     new Chunker(new FileInputStream(file), digest.getSizeBytes(), mChunkSize));
102         } catch (IOException e) {
103             return Futures.immediateFailedFuture(e);
104         }
105     }
106 
uploadBlob(Digest digest, Chunker chunker)107     private ListenableFuture<Void> uploadBlob(Digest digest, Chunker chunker) {
108         String resourceName = getResourceName(digest);
109         return Futures.catchingAsync(
110                 Futures.transformAsync(
111                         write(resourceName, chunker),
112                         committedSize ->
113                                 committedSize == digest.getSizeBytes()
114                                         ? Futures.immediateVoidFuture()
115                                         : Futures.immediateFailedFuture(
116                                                 new IOException(
117                                                         String.format(
118                                                                 "write incomplete: committed_size"
119                                                                         + " %d for %d total - %s",
120                                                                 committedSize,
121                                                                 digest.getSizeBytes(),
122                                                                 resourceName))),
123                         MoreExecutors.directExecutor()),
124                 StatusRuntimeException.class,
125                 (sre) ->
126                         sre.getStatus().getCode() == Code.ALREADY_EXISTS
127                                 ? Futures.immediateVoidFuture()
128                                 : Futures.immediateFailedFuture(
129                                         new IOException(
130                                                 String.format(
131                                                         "Error while uploading artifact with digest"
132                                                                 + " '%s/%s'",
133                                                         digest.getHash(), digest.getSizeBytes()),
134                                                 sre)),
135                 MoreExecutors.directExecutor());
136     }
137 
write(String resourceName, Chunker chunker)138     private ListenableFuture<Long> write(String resourceName, Chunker chunker) {
139         SettableFuture<Long> uploadResult = SettableFuture.create();
140         bsAsyncStub().write(new Writer(resourceName, uploadResult, chunker));
141         return uploadResult;
142     }
143 
144     /** A writer used to stream the BLOB to the remote service and handle the response. */
145     private static final class Writer
146             implements ClientResponseObserver<WriteRequest, WriteResponse>, Runnable {
147         private final String mResourceName;
148         private final SettableFuture<Long> mUploadResult;
149         private final Chunker mChunker;
150         private ClientCallStreamObserver<WriteRequest> mRequestObserver;
151         private long mCommittedSize = -1;
152         private boolean mFirstRequest = true;
153         private boolean mFinishedWriting;
154 
Writer(String resourceName, SettableFuture<Long> uploadResult, Chunker chunker)155         private Writer(String resourceName, SettableFuture<Long> uploadResult, Chunker chunker) {
156             mResourceName = resourceName;
157             mUploadResult = uploadResult;
158             mChunker = chunker;
159         }
160 
161         @Override
beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)162         public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver) {
163             mRequestObserver = requestObserver;
164             mUploadResult.addListener(
165                     () -> {
166                         if (mUploadResult.isCancelled()) {
167                             mRequestObserver.cancel("cancelled by user", null);
168                         }
169                     },
170                     MoreExecutors.directExecutor());
171             mRequestObserver.setOnReadyHandler(this);
172         }
173 
174         @Override
run()175         public void run() {
176             while (mRequestObserver.isReady()) {
177                 WriteRequest.Builder request = WriteRequest.newBuilder();
178                 if (mFirstRequest) {
179                     // Resource name only needs to be set on the first write for each file.
180                     request.setResourceName(mResourceName);
181                     mFirstRequest = false;
182                 }
183                 Chunker.Chunk chunk;
184                 try {
185                     chunk = mChunker.next();
186                 } catch (IOException e) {
187                     mRequestObserver.cancel("Failed to read next chunk.", e);
188                     return;
189                 }
190                 boolean isLastChunk = !mChunker.hasNext();
191                 mRequestObserver.onNext(
192                         request.setData(chunk.getData())
193                                 .setWriteOffset(chunk.getOffset())
194                                 .setFinishWrite(isLastChunk)
195                                 .build());
196                 if (isLastChunk) {
197                     mRequestObserver.onCompleted();
198                     mFinishedWriting = true;
199                 }
200             }
201         }
202 
203         @Override
onNext(WriteResponse response)204         public void onNext(WriteResponse response) {
205             mCommittedSize = response.getCommittedSize();
206         }
207 
208         @Override
onError(Throwable t)209         public void onError(Throwable t) {
210             mUploadResult.setException(t);
211         }
212 
213         @Override
onCompleted()214         public void onCompleted() {
215             // Server completed successfully before we finished writing all the data, meaning the
216             // blob already exists.
217             if (mFinishedWriting) {
218                 mRequestObserver.cancel("server has returned early", null);
219             }
220             mUploadResult.set(mCommittedSize);
221         }
222     }
223 
bsAsyncStub()224     private ByteStreamStub bsAsyncStub() {
225         return ByteStreamGrpc.newStub(mChannel)
226                 .withCallCredentials(mCallCredentials)
227                 .withDeadlineAfter(mCallTimeout.getSeconds(), TimeUnit.SECONDS);
228     }
229 
getResourceName(Digest digest)230     private String getResourceName(Digest digest) {
231         return String.format(
232                 "%s/uploads/%s/blobs/%s/%d",
233                 mInstanceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes());
234     }
235 }
236