• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2024 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_transfer;
16 
import com.google.common.util.concurrent.ListenableFuture;
import dev.pigweed.pw_rpc.MethodClient;
import java.util.Objects;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
21 
22 /**
23  * Manages ongoing pw_transfer data transfers.
24  *
25  * <p>
26  * Use TransferClient to send data to and receive data from a pw_transfer
27  * service running on a
28  * pw_rpc server.
29  */
30 public class TransferClient {
31   public static final TransferParameters DEFAULT_READ_TRANSFER_PARAMETERS =
32       TransferParameters.create(32768, 1024, 0);
33 
34   private final TransferTimeoutSettings settings;
35   private final BooleanSupplier shouldAbortCallback;
36 
37   private final TransferEventHandler transferEventHandler;
38   private final Thread transferEventHandlerThread;
39 
40   private ProtocolVersion desiredProtocolVersion = ProtocolVersion.VERSION_TWO;
41 
42   /**
43    * Creates a new transfer client for sending and receiving data with
44    * pw_transfer.
45    *
46    * @param readMethod  Method client for the pw.transfer.Transfer.Read method.
47    * @param writeMethod Method client for the pw.transfer.Transfer.Write method.
48    * @param settings    Settings for timeouts and retries.
49    */
TransferClient( MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings)50   public TransferClient(
51       MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings) {
52     this(readMethod, writeMethod, settings, () -> false, TransferEventHandler::run);
53   }
54 
55   /**
56    * Creates a new transfer client with a callback that can be used to terminate
57    * transfers.
58    *
59    * @param shouldAbortCallback BooleanSupplier that returns true if a transfer
60    *                            should be aborted.
61    */
TransferClient(MethodClient readMethod, MethodClient writeMethod, int transferTimeoutMillis, int initialTransferTimeoutMillis, int maxRetries, BooleanSupplier shouldAbortCallback)62   public TransferClient(MethodClient readMethod,
63       MethodClient writeMethod,
64       int transferTimeoutMillis,
65       int initialTransferTimeoutMillis,
66       int maxRetries,
67       BooleanSupplier shouldAbortCallback) {
68     this(readMethod,
69         writeMethod,
70         TransferTimeoutSettings.builder()
71             .setTimeoutMillis(transferTimeoutMillis)
72             .setInitialTimeoutMillis(initialTransferTimeoutMillis)
73             .setMaxRetries(maxRetries)
74             .build(),
75         shouldAbortCallback,
76         TransferEventHandler::run);
77   }
78 
79   /** Constructor exposed to package for test use only. */
TransferClient(MethodClient readMethod, MethodClient writeMethod, TransferTimeoutSettings settings, BooleanSupplier shouldAbortCallback, Consumer<TransferEventHandler> runFunction)80   TransferClient(MethodClient readMethod,
81       MethodClient writeMethod,
82       TransferTimeoutSettings settings,
83       BooleanSupplier shouldAbortCallback,
84       Consumer<TransferEventHandler> runFunction) {
85     this.settings = settings;
86     this.shouldAbortCallback = shouldAbortCallback;
87 
88     transferEventHandler = new TransferEventHandler(readMethod, writeMethod);
89     transferEventHandlerThread = new Thread(() -> runFunction.accept(transferEventHandler));
90     transferEventHandlerThread.start();
91   }
92 
93   /** Writes the provided data to the given transfer resource. */
write(int resourceId, byte[] data)94   public ListenableFuture<Void> write(int resourceId, byte[] data) {
95     return write(resourceId, data, transferProgress -> {}, 0);
96   }
97 
98   /**
99    * Writes the provided data to the given transfer resource, calling the progress
100    * callback as data
101    * is sent
102    */
write( int resourceId, byte[] data, Consumer<TransferProgress> progressCallback)103   public ListenableFuture<Void> write(
104       int resourceId, byte[] data, Consumer<TransferProgress> progressCallback) {
105     return write(resourceId, data, progressCallback, 0);
106   }
107 
108   /**
109    * Writes the provided data to the given transfer resource, starting at the
110    * given initial offset
111    */
write(int resourceId, byte[] data, int initialOffset)112   public ListenableFuture<Void> write(int resourceId, byte[] data, int initialOffset) {
113     return write(resourceId, data, transferProgress -> {}, initialOffset);
114   }
115 
116   /**
117    * Writes data to the specified transfer resource, calling the progress
118    * callback as data is sent.
119    *
120    * @param resourceId       The ID of the resource to which to write
121    * @param data             the data to write
122    * @param progressCallback called each time a packet is sent
123    * @param initialOffset    The offset to start writing to on the server side
124    */
write( int resourceId, byte[] data, Consumer<TransferProgress> progressCallback, int initialOffset)125   public ListenableFuture<Void> write(
126       int resourceId, byte[] data, Consumer<TransferProgress> progressCallback, int initialOffset) {
127     return transferEventHandler.startWriteTransferAsClient(resourceId,
128         desiredProtocolVersion,
129         settings,
130         data,
131         progressCallback,
132         shouldAbortCallback,
133         initialOffset);
134   }
135 
136   /** Reads the data from the given transfer resource ID. */
read(int resourceId)137   public ListenableFuture<byte[]> read(int resourceId) {
138     return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, 0);
139   }
140 
141   /**
142    * Reads the data for a transfer resource, calling the progress callback as data
143    * is received.
144    */
read( int resourceId, Consumer<TransferProgress> progressCallback)145   public ListenableFuture<byte[]> read(
146       int resourceId, Consumer<TransferProgress> progressCallback) {
147     return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, 0);
148   }
149 
150   /**
151    * Reads the data for a transfer resource, using the specified transfer
152    * parameters.
153    */
read(int resourceId, TransferParameters parameters)154   public ListenableFuture<byte[]> read(int resourceId, TransferParameters parameters) {
155     return read(resourceId, parameters, (progressCallback) -> {}, 0);
156   }
157 
158   /**
159    * Reads the data for a transfer resource, using the specified parameters and
160    * progress callback.
161    */
read( int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback)162   public ListenableFuture<byte[]> read(
163       int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback) {
164     return read(resourceId, parameters, progressCallback, 0);
165   }
166 
167   /** Reads the data from the given transfer resource ID. */
read(int resourceId, int initialOffset)168   public ListenableFuture<byte[]> read(int resourceId, int initialOffset) {
169     return read(
170         resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback -> {}, initialOffset);
171   }
172 
173   /**
174    * Reads the data for a transfer resource, calling the progress callback as data
175    * is received.
176    */
read( int resourceId, Consumer<TransferProgress> progressCallback, int initialOffset)177   public ListenableFuture<byte[]> read(
178       int resourceId, Consumer<TransferProgress> progressCallback, int initialOffset) {
179     return read(resourceId, DEFAULT_READ_TRANSFER_PARAMETERS, progressCallback, initialOffset);
180   }
181 
182   /**
183    * Reads the data for a transfer resource, using the specified transfer
184    * parameters.
185    */
read( int resourceId, TransferParameters parameters, int initialOffset)186   public ListenableFuture<byte[]> read(
187       int resourceId, TransferParameters parameters, int initialOffset) {
188     return read(resourceId, parameters, (progressCallback) -> {}, initialOffset);
189   }
190 
191   /**
192    * Reads the data for a transfer resource, using the specified parameters and
193    * progress callback.
194    *
195    * @param resourceId       The ID of the resource to which to read
196    * @param parameters       The transfer parameters to use
197    * @param progressCallback called each time a packet is sent
198    * @param initialOffset    The offset to start reading from on the server side
199    */
read(int resourceId, TransferParameters parameters, Consumer<TransferProgress> progressCallback, int initialOffset)200   public ListenableFuture<byte[]> read(int resourceId,
201       TransferParameters parameters,
202       Consumer<TransferProgress> progressCallback,
203       int initialOffset) {
204     return transferEventHandler.startReadTransferAsClient(resourceId,
205         desiredProtocolVersion,
206         settings,
207         parameters,
208         progressCallback,
209         shouldAbortCallback,
210         initialOffset);
211   }
212 
213   /**
214    * Sets the protocol version to request for future transfers
215    *
216    * Does not affect ongoing transfers. Version cannot be set to UNKNOWN!
217    *
218    * @throws IllegalArgumentException if the protocol version is UNKNOWN
219    */
setProtocolVersion(ProtocolVersion version)220   public void setProtocolVersion(ProtocolVersion version) {
221     if (version == ProtocolVersion.UNKNOWN) {
222       throw new IllegalArgumentException("Cannot set protocol version to UNKNOWN!");
223     }
224     desiredProtocolVersion = version;
225   }
226 
227   /** Stops the background thread and waits until it terminates. */
close()228   public void close() throws InterruptedException {
229     transferEventHandler.stop();
230     transferEventHandlerThread.join();
231   }
232 
233   // Functions for test use only.
234   // TODO: b/279808806 - These could be annotated with test-only visibility.
235 
waitUntilEventsAreProcessedForTest()236   final void waitUntilEventsAreProcessedForTest() {
237     transferEventHandler.waitUntilEventsAreProcessedForTest();
238   }
239 
getNextSessionIdForTest()240   final int getNextSessionIdForTest() {
241     return transferEventHandler.getNextSessionIdForTest();
242   }
243 
getWriteTransferForTest(ListenableFuture<?> transferFuture)244   final WriteTransfer getWriteTransferForTest(ListenableFuture<?> transferFuture) {
245     return (WriteTransfer) transferFuture;
246   }
247 
getReadTransferForTest(ListenableFuture<?> transferFuture)248   final ReadTransfer getReadTransferForTest(ListenableFuture<?> transferFuture) {
249     return (ReadTransfer) transferFuture;
250   }
251 }
252