1 /* 2 * Copyright (C) 2024 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.tradefed.cache.remote; 18 19 import static com.google.common.base.Preconditions.checkArgument; 20 21 import build.bazel.remote.execution.v2.Digest; 22 import com.google.bytestream.ByteStreamGrpc; 23 import com.google.bytestream.ByteStreamGrpc.ByteStreamStub; 24 import com.google.bytestream.ByteStreamProto.WriteRequest; 25 import com.google.bytestream.ByteStreamProto.WriteResponse; 26 import com.google.common.annotations.VisibleForTesting; 27 import com.google.common.base.Strings; 28 import com.google.common.util.concurrent.Futures; 29 import com.google.common.util.concurrent.ListenableFuture; 30 import com.google.common.util.concurrent.MoreExecutors; 31 import com.google.common.util.concurrent.SettableFuture; 32 import com.google.protobuf.ByteString; 33 import io.grpc.CallCredentials; 34 import io.grpc.Channel; 35 import io.grpc.Status.Code; 36 import io.grpc.StatusRuntimeException; 37 import io.grpc.stub.ClientCallStreamObserver; 38 import io.grpc.stub.ClientResponseObserver; 39 import java.io.File; 40 import java.io.FileInputStream; 41 import java.io.IOException; 42 import java.time.Duration; 43 import java.util.concurrent.TimeUnit; 44 import java.util.UUID; 45 46 /** A client implementing the {@code Write} method of the {@code ByteStream} gRPC service. */ 47 public class ByteStreamUploader { 48 49 // Uses 16KB as the default chunk size that is also used by Bazel. 50 private static final int DEFAULT_CHUNK_SIZE = 1024 * 16; 51 private final String mInstanceName; 52 private final Channel mChannel; 53 private final CallCredentials mCallCredentials; 54 private final Duration mCallTimeout; 55 private final int mChunkSize; 56 ByteStreamUploader( String instanceName, Channel channel, CallCredentials callCredentials, Duration callTimeout)57 public ByteStreamUploader( 58 String instanceName, 59 Channel channel, 60 CallCredentials callCredentials, 61 Duration callTimeout) { 62 this(instanceName, channel, callCredentials, callTimeout, DEFAULT_CHUNK_SIZE); 63 } 64 65 @VisibleForTesting ByteStreamUploader( String instanceName, Channel channel, CallCredentials callCredentials, Duration callTimeout, int chunkSize)66 ByteStreamUploader( 67 String instanceName, 68 Channel channel, 69 CallCredentials callCredentials, 70 Duration callTimeout, 71 int chunkSize) { 72 checkArgument(callTimeout.getSeconds() > 0, "callTimeout must be greater than 0."); 73 checkArgument(!Strings.isNullOrEmpty(instanceName), "instanceName must be specified."); 74 mInstanceName = instanceName; 75 mChannel = channel; 76 mCallCredentials = callCredentials; 77 mCallTimeout = callTimeout; 78 mChunkSize = chunkSize; 79 } 80 81 /** 82 * Uploads a BLOB by the remote {@code ByteStream} service. 83 * 84 * @param digest the digest of the BLOB to upload. 85 * @param blob the BLOB to upload. 86 */ uploadBlob(Digest digest, ByteString blob)87 public ListenableFuture<Void> uploadBlob(Digest digest, ByteString blob) { 88 return uploadBlob(digest, new Chunker(blob.newInput(), digest.getSizeBytes(), mChunkSize)); 89 } 90 91 /** 92 * Uploads a file by the remote {@code ByteStream} service. 93 * 94 * @param digest the digest of the file to upload. 95 * @param file the file to upload. 96 */ uploadFile(Digest digest, File file)97 public ListenableFuture<Void> uploadFile(Digest digest, File file) { 98 try { 99 return uploadBlob( 100 digest, 101 new Chunker(new FileInputStream(file), digest.getSizeBytes(), mChunkSize)); 102 } catch (IOException e) { 103 return Futures.immediateFailedFuture(e); 104 } 105 } 106 uploadBlob(Digest digest, Chunker chunker)107 private ListenableFuture<Void> uploadBlob(Digest digest, Chunker chunker) { 108 String resourceName = getResourceName(digest); 109 return Futures.catchingAsync( 110 Futures.transformAsync( 111 write(resourceName, chunker), 112 committedSize -> 113 committedSize == digest.getSizeBytes() 114 ? Futures.immediateVoidFuture() 115 : Futures.immediateFailedFuture( 116 new IOException( 117 String.format( 118 "write incomplete: committed_size" 119 + " %d for %d total - %s", 120 committedSize, 121 digest.getSizeBytes(), 122 resourceName))), 123 MoreExecutors.directExecutor()), 124 StatusRuntimeException.class, 125 (sre) -> 126 sre.getStatus().getCode() == Code.ALREADY_EXISTS 127 ? Futures.immediateVoidFuture() 128 : Futures.immediateFailedFuture( 129 new IOException( 130 String.format( 131 "Error while uploading artifact with digest" 132 + " '%s/%s'", 133 digest.getHash(), digest.getSizeBytes()), 134 sre)), 135 MoreExecutors.directExecutor()); 136 } 137 write(String resourceName, Chunker chunker)138 private ListenableFuture<Long> write(String resourceName, Chunker chunker) { 139 SettableFuture<Long> uploadResult = SettableFuture.create(); 140 bsAsyncStub().write(new Writer(resourceName, uploadResult, chunker)); 141 return uploadResult; 142 } 143 144 /** A writer used to stream the BLOB to the remote service and handle the response. */ 145 private static final class Writer 146 implements ClientResponseObserver<WriteRequest, WriteResponse>, Runnable { 147 private final String mResourceName; 148 private final SettableFuture<Long> mUploadResult; 149 private final Chunker mChunker; 150 private ClientCallStreamObserver<WriteRequest> mRequestObserver; 151 private long mCommittedSize = -1; 152 private boolean mFirstRequest = true; 153 private boolean mFinishedWriting; 154 Writer(String resourceName, SettableFuture<Long> uploadResult, Chunker chunker)155 private Writer(String resourceName, SettableFuture<Long> uploadResult, Chunker chunker) { 156 mResourceName = resourceName; 157 mUploadResult = uploadResult; 158 mChunker = chunker; 159 } 160 161 @Override beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)162 public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver) { 163 mRequestObserver = requestObserver; 164 mUploadResult.addListener( 165 () -> { 166 if (mUploadResult.isCancelled()) { 167 mRequestObserver.cancel("cancelled by user", null); 168 } 169 }, 170 MoreExecutors.directExecutor()); 171 mRequestObserver.setOnReadyHandler(this); 172 } 173 174 @Override run()175 public void run() { 176 while (mRequestObserver.isReady()) { 177 WriteRequest.Builder request = WriteRequest.newBuilder(); 178 if (mFirstRequest) { 179 // Resource name only needs to be set on the first write for each file. 180 request.setResourceName(mResourceName); 181 mFirstRequest = false; 182 } 183 Chunker.Chunk chunk; 184 try { 185 chunk = mChunker.next(); 186 } catch (IOException e) { 187 mRequestObserver.cancel("Failed to read next chunk.", e); 188 return; 189 } 190 boolean isLastChunk = !mChunker.hasNext(); 191 mRequestObserver.onNext( 192 request.setData(chunk.getData()) 193 .setWriteOffset(chunk.getOffset()) 194 .setFinishWrite(isLastChunk) 195 .build()); 196 if (isLastChunk) { 197 mRequestObserver.onCompleted(); 198 mFinishedWriting = true; 199 } 200 } 201 } 202 203 @Override onNext(WriteResponse response)204 public void onNext(WriteResponse response) { 205 mCommittedSize = response.getCommittedSize(); 206 } 207 208 @Override onError(Throwable t)209 public void onError(Throwable t) { 210 mUploadResult.setException(t); 211 } 212 213 @Override onCompleted()214 public void onCompleted() { 215 // Server completed successfully before we finished writing all the data, meaning the 216 // blob already exists. 217 if (mFinishedWriting) { 218 mRequestObserver.cancel("server has returned early", null); 219 } 220 mUploadResult.set(mCommittedSize); 221 } 222 } 223 bsAsyncStub()224 private ByteStreamStub bsAsyncStub() { 225 return ByteStreamGrpc.newStub(mChannel) 226 .withCallCredentials(mCallCredentials) 227 .withDeadlineAfter(mCallTimeout.getSeconds(), TimeUnit.SECONDS); 228 } 229 getResourceName(Digest digest)230 private String getResourceName(Digest digest) { 231 return String.format( 232 "%s/uploads/%s/blobs/%s/%d", 233 mInstanceName, UUID.randomUUID(), digest.getHash(), digest.getSizeBytes()); 234 } 235 } 236