1 // Copyright 2024 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_transfer; 16 17 import com.google.common.util.concurrent.ListenableFuture; 18 import dev.pigweed.pw_rpc.MethodClient; 19 import java.util.function.BooleanSupplier; 20 import java.util.function.Consumer; 21 22 /** 23 * Manages ongoing pw_transfer data transfers. 24 * 25 * <p> 26 * Use TransferClient to send data to and receive data from a pw_transfer 27 * service running on a 28 * pw_rpc server. 29 */ 30 public class TransferClient { 31 public static final TransferParameters DEFAULT_READ_TRANSFER_PARAMETERS = 32 TransferParameters.create(32768, 1024, 0); 33 34 private final TransferTimeoutSettings settings; 35 private final BooleanSupplier shouldAbortCallback; 36 37 private final TransferEventHandler transferEventHandler; 38 private final Thread transferEventHandlerThread; 39 40 private ProtocolVersion desiredProtocolVersion = ProtocolVersion.VERSION_TWO; 41 42 /** 43 * Creates a new transfer client for sending and receiving data with 44 * pw_transfer. 45 * 46 * @param readMethod Method client for the pw.transfer.Transfer.Read method. 47 * @param writeMethod Method client for the pw.transfer.Transfer.Write method. 48 * @param settings Settings for timeouts and retries. 49 */ TransferClient( MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings)50 public TransferClient( 51 MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings) { 52 this(readMethod, writeMethod, settings, () -> false, TransferEventHandler::run); 53 } 54 55 /** 56 * Creates a new transfer client with a callback that can be used to terminate 57 * transfers. 58 * 59 * @param shouldAbortCallback BooleanSupplier that returns true if a transfer 60 * should be aborted. 61 */ TransferClient(MethodClient readMethod, MethodClient writeMethod, int transferTimeoutMillis, int initialTransferTimeoutMillis, int maxRetries, BooleanSupplier shouldAbortCallback)62 public TransferClient(MethodClient readMethod, 63 MethodClient writeMethod, 64 int transferTimeoutMillis, 65 int initialTransferTimeoutMillis, 66 int maxRetries, 67 BooleanSupplier shouldAbortCallback) { 68 this(readMethod, 69 writeMethod, 70 TransferTimeoutSettings.builder() 71 .setTimeoutMillis(transferTimeoutMillis) 72 .setInitialTimeoutMillis(initialTransferTimeoutMillis) 73 .setMaxRetries(maxRetries) 74 .build(), 75 shouldAbortCallback, 76 TransferEventHandler::run); 77 } 78 79 /** Constructor exposed to package for test use only. */ TransferClient(MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings, BooleanSupplier shouldAbortCallback, Consumer<TransferEventHandler> runFunction)80 TransferClient(MethodClient readMethod, 81 MethodClient writeMethod, 82 TransferTimeoutSettings settings, 83 BooleanSupplier shouldAbortCallback, 84 Consumer<TransferEventHandler> runFunction) { 85 this.settings = settings; 86 this.shouldAbortCallback = shouldAbortCallback; 87 88 transferEventHandler = new TransferEventHandler(readMethod, writeMethod); 89 transferEventHandlerThread = new Thread(() -> runFunction.accept(transferEventHandler)); 90 transferEventHandlerThread.start(); 91 } 92 93 /** Writes the provided data to the given transfer resource. */ write(int resourceId, byte[] data)94 public ListenableFuture<Void> write(int resourceId, byte[] data) { 95 return write(resourceId, data, transferProgress -> {}, 0); 96 } 97 98 /** 99 * Writes the provided data to the given transfer resource, calling the progress 100 * callback as data 101 * is sent 102 */ write( int resourceId, byte[] data, Consumer<TransferProgress> progressCallback)103 public ListenableFuture<Void> write( 104 int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) { 105 return write(resourceId, data, progressCallback, 0); 106 } 107 108 /** 109 * Writes the provided data to the given transfer resource, starting at the 110 * given initial offset 111 */ write(int resourceId, byte[] data, int initialOffset)112 public ListenableFuture<Void> write(int resourceId, byte[] data, int initialOffset) { 113 return write(resourceId, data, transferProgress -> {}, initialOffset); 114 } 115 116 /** 117 * Writes data to the specified transfer resource, calling the progress 118 * callback as data is sent. 119 * 120 * @param resourceId The ID of the resource to which to write 121 * @param data the data to write 122 * @param progressCallback called each time a packet is sent 123 * @param initialOffset The offset to start writing to on the server side 124 */ write( int resourceId, byte[] data, Consumer<TransferProgress> progressCallback, int initialOffset)125 public ListenableFuture<Void> write( 126 int resourceId, byte[] data, Consumer<TransferProgress> progressCallback, int initialOffset) { 127 return transferEventHandler.startWriteTransferAsClient(resourceId, 128 desiredProtocolVersion, 129 settings, 130 data, 131 progressCallback, 132 shouldAbortCallback, 133 initialOffset); 134 } 135 136 /** Reads the data from the given transfer resource ID. */ read(int resourceId)137 public ListenableFuture<byte[]> read(int resourceId) { 138 return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, 0); 139 } 140 141 /** 142 * Reads the data for a transfer resource, calling the progress callback as data 143 * is received. 144 */ read( int resourceId, Consumer<TransferProgress> progressCallback)145 public ListenableFuture<byte[]> read( 146 int resourceId, Consumer<TransferProgress> progressCallback) { 147 return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, 0); 148 } 149 150 /** 151 * Reads the data for a transfer resource, using the specified transfer 152 * parameters. 153 */ read(int resourceId, TransferParameters parameters)154 public ListenableFuture<byte[]> read(int resourceId, TransferParameters parameters) { 155 return read(resourceId, parameters, (progressCallback) -> {}, 0); 156 } 157 158 /** 159 * Reads the data for a transfer resource, using the specified parameters and 160 * progress callback. 161 */ read( int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback)162 public ListenableFuture<byte[]> read( 163 int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback) { 164 return read(resourceId, parameters, progressCallback, 0); 165 } 166 167 /** Reads the data from the given transfer resource ID. */ read(int resourceId, int initialOffset)168 public ListenableFuture<byte[]> read(int resourceId, int initialOffset) { 169 return read( 170 resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, initialOffset); 171 } 172 173 /** 174 * Reads the data for a transfer resource, calling the progress callback as data 175 * is received. 176 */ read( int resourceId, Consumer<TransferProgress> progressCallback, int initialOffset)177 public ListenableFuture<byte[]> read( 178 int resourceId, Consumer<TransferProgress> progressCallback, int initialOffset) { 179 return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, initialOffset); 180 } 181 182 /** 183 * Reads the data for a transfer resource, using the specified transfer 184 * parameters. 185 */ read( int resourceId, TransferParameters parameters, int initialOffset)186 public ListenableFuture<byte[]> read( 187 int resourceId, TransferParameters parameters, int initialOffset) { 188 return read(resourceId, parameters, (progressCallback) -> {}, initialOffset); 189 } 190 191 /** 192 * Reads the data for a transfer resource, using the specified parameters and 193 * progress callback. 194 * 195 * @param resourceId The ID of the resource to which to read 196 * @param parameters The transfer parameters to use 197 * @param progressCallback called each time a packet is sent 198 * @param initialOffset The offset to start reading from on the server side 199 */ read(int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback, int initialOffset)200 public ListenableFuture<byte[]> read(int resourceId, 201 TransferParameters parameters, 202 Consumer<TransferProgress> progressCallback, 203 int initialOffset) { 204 return transferEventHandler.startReadTransferAsClient(resourceId, 205 desiredProtocolVersion, 206 settings, 207 parameters, 208 progressCallback, 209 shouldAbortCallback, 210 initialOffset); 211 } 212 213 /** 214 * Sets the protocol version to request for future transfers 215 * 216 * Does not affect ongoing transfers. Version cannot be set to UNKNOWN! 217 * 218 * @throws IllegalArgumentException if the protocol version is UNKNOWN 219 */ setProtocolVersion(ProtocolVersion version)220 public void setProtocolVersion(ProtocolVersion version) { 221 if (version == ProtocolVersion.UNKNOWN) { 222 throw new IllegalArgumentException("Cannot set protocol version to UNKNOWN!"); 223 } 224 desiredProtocolVersion = version; 225 } 226 227 /** Stops the background thread and waits until it terminates. */ close()228 public void close() throws InterruptedException { 229 transferEventHandler.stop(); 230 transferEventHandlerThread.join(); 231 } 232 233 // Functions for test use only. 234 // TODO: b/279808806 - These could be annotated with test-only visibility. 235 waitUntilEventsAreProcessedForTest()236 final void waitUntilEventsAreProcessedForTest() { 237 transferEventHandler.waitUntilEventsAreProcessedForTest(); 238 } 239 getNextSessionIdForTest()240 final int getNextSessionIdForTest() { 241 return transferEventHandler.getNextSessionIdForTest(); 242 } 243 getWriteTransferForTest(ListenableFuture<?> transferFuture)244 final WriteTransfer getWriteTransferForTest(ListenableFuture<?> transferFuture) { 245 return (WriteTransfer) transferFuture; 246 } 247 getReadTransferForTest(ListenableFuture<?> transferFuture)248 final ReadTransfer getReadTransferForTest(ListenableFuture<?> transferFuture) { 249 return (ReadTransfer) transferFuture; 250 } 251 } 252