• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 package dev.pigweed.pw_transfer;
16 
17 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
18 import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE;
19 
20 import com.google.common.util.concurrent.AbstractFuture;
21 import com.google.common.util.concurrent.SettableFuture;
22 import dev.pigweed.pw_log.Logger;
23 import dev.pigweed.pw_rpc.Status;
24 import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface;
25 import java.time.Duration;
26 import java.time.Instant;
27 import java.util.Locale;
28 import java.util.function.BooleanSupplier;
29 import java.util.function.Consumer;
30 
31 /** Base class for tracking the state of a read or write transfer. */
32 abstract class Transfer<T> extends AbstractFuture<T> {
33   private static final Logger logger = Logger.forClass(Transfer.class);
34 
35   // Largest nanosecond instant. Used to block indefinitely when no transfers are
36   // pending.
37   static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE);
38 
39   // Whether to output some particularly noisy logs.
40   static final boolean VERBOSE_LOGGING = false;
41 
42   private final int resourceId;
43   private final int sessionId;
44   private int offset;
45   private final ProtocolVersion desiredProtocolVersion;
46   private final TransferEventHandler.TransferInterface eventHandler;
47   private final TransferTimeoutSettings timeoutSettings;
48   private final Consumer<TransferProgress> progressCallback;
49   private final BooleanSupplier shouldAbortCallback;
50   private final Instant startTime;
51 
52   private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN;
53   private Instant deadline = NO_TIMEOUT;
54   private State state;
55   private VersionedChunk lastChunkSent;
56 
57   private int lifetimeRetries = 0;
58 
59   /**
60    * Creates a new read or write transfer.
61    *
62    * @param resourceId             The resource ID of the transfer
63    * @param desiredProtocolVersion protocol version to request
64    * @param eventHandler           Interface to use to send a chunk.
65    * @param timeoutSettings        Timeout and retry settings for this transfer.
66    * @param progressCallback       Called each time a packet is sent.
67    * @param shouldAbortCallback    BooleanSupplier that returns true if a transfer
68    *                               should be aborted.
69    */
Transfer(int resourceId, int sessionId, ProtocolVersion desiredProtocolVersion, TransferInterface eventHandler, TransferTimeoutSettings timeoutSettings, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback, int initial_offset)70   Transfer(int resourceId,
71       int sessionId,
72       ProtocolVersion desiredProtocolVersion,
73       TransferInterface eventHandler,
74       TransferTimeoutSettings timeoutSettings,
75       Consumer<TransferProgress> progressCallback,
76       BooleanSupplier shouldAbortCallback,
77       int initial_offset) {
78     this.resourceId = resourceId;
79     this.sessionId = sessionId;
80     this.offset = initial_offset;
81     this.desiredProtocolVersion = desiredProtocolVersion;
82     this.eventHandler = eventHandler;
83 
84     this.timeoutSettings = timeoutSettings;
85     this.progressCallback = progressCallback;
86     this.shouldAbortCallback = shouldAbortCallback;
87 
88     // If the future is cancelled, tell the TransferEventHandler to cancel the
89     // transfer.
90     addListener(() -> {
91       if (isCancelled()) {
92         eventHandler.cancelTransfer(this);
93       }
94     }, directExecutor());
95 
96     if (desiredProtocolVersion == ProtocolVersion.LEGACY) {
97       // Legacy transfers skip protocol negotiation stage and use the resource ID as
98       // the session ID.
99       configuredProtocolVersion = ProtocolVersion.LEGACY;
100       state = getWaitingForDataState();
101     } else {
102       state = new Initiating();
103     }
104 
105     startTime = Instant.now();
106   }
107 
108   @Override
toString()109   public String toString() {
110     return String.format(Locale.ENGLISH,
111         "%s(%d:%d)[%s]",
112         this.getClass().getSimpleName(),
113         resourceId,
114         sessionId,
115         state.getClass().getSimpleName());
116   }
117 
getResourceId()118   public final int getResourceId() {
119     return resourceId;
120   }
121 
getSessionId()122   public final int getSessionId() {
123     return sessionId;
124   }
125 
getOffset()126   public final int getOffset() {
127     return offset;
128   }
129 
getDesiredProtocolVersion()130   final ProtocolVersion getDesiredProtocolVersion() {
131     return desiredProtocolVersion;
132   }
133 
134   /** Terminates the transfer without sending any packets. */
terminate(TransferError error)135   public final void terminate(TransferError error) {
136     changeState(new Completed(error));
137   }
138 
getDeadline()139   final Instant getDeadline() {
140     return deadline;
141   }
142 
setOffset(int offset)143   final void setOffset(int offset) {
144     this.offset = offset;
145   }
146 
setNextChunkTimeout()147   final void setNextChunkTimeout() {
148     deadline = Instant.now().plusMillis(timeoutSettings.timeoutMillis());
149   }
150 
setInitialTimeout()151   private void setInitialTimeout() {
152     deadline = Instant.now().plusMillis(timeoutSettings.initialTimeoutMillis());
153   }
154 
setTimeoutMicros(int timeoutMicros)155   final void setTimeoutMicros(int timeoutMicros) {
156     deadline = Instant.now().plusNanos((long) timeoutMicros * 1000);
157   }
158 
start()159   final void start() {
160     logger.atInfo().log(
161         "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires",
162         this,
163         timeoutSettings.timeoutMillis(),
164         timeoutSettings.initialTimeoutMillis(),
165         timeoutSettings.maxRetries());
166     VersionedChunk.Builder chunk =
167         VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId, sessionId);
168     prepareInitialChunk(chunk);
169     try {
170       sendChunk(chunk.build());
171     } catch (TransferAbortedException e) {
172       return; // Sending failed, transfer is cancelled
173     }
174     setInitialTimeout();
175   }
176 
177   /** Processes an incoming chunk from the server. */
handleChunk(VersionedChunk chunk)178   final void handleChunk(VersionedChunk chunk) {
179     try {
180       if (chunk.type() == Chunk.Type.COMPLETION) {
181         state.handleFinalChunk(chunk.status().orElseGet(() -> {
182           logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL");
183           return Status.INTERNAL.code();
184         }));
185       } else {
186         state.handleDataChunk(chunk);
187       }
188     } catch (TransferAbortedException e) {
189       // Transfer was aborted; nothing else to do.
190     }
191   }
192 
handleTimeoutIfDeadlineExceeded()193   final void handleTimeoutIfDeadlineExceeded() {
194     if (Instant.now().isAfter(deadline)) {
195       try {
196         state.handleTimeout();
197       } catch (TransferAbortedException e) {
198         // Transfer was aborted; nothing else to do.
199       }
200     }
201   }
202 
handleTermination()203   final void handleTermination() {
204     state.handleTermination();
205   }
206 
handleCancellation()207   final void handleCancellation() {
208     state.handleCancellation();
209   }
210 
211   /** Returns the State to enter immediately after sending the first packet. */
getWaitingForDataState()212   abstract State getWaitingForDataState();
213 
prepareInitialChunk(VersionedChunk.Builder chunk)214   abstract void prepareInitialChunk(VersionedChunk.Builder chunk);
215 
216   /**
217    * Returns the chunk to send for a retry. Returns the initial chunk if no chunks
218    * have been sent.
219    */
getChunkForRetry()220   abstract VersionedChunk getChunkForRetry();
221 
222   /** Sets the result for the future after a successful transfer. */
setFutureResult()223   abstract void setFutureResult();
224 
newChunk(Chunk.Type type)225   final VersionedChunk.Builder newChunk(Chunk.Type type) {
226     return VersionedChunk.builder()
227         .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion
228                                                                          : desiredProtocolVersion)
229         .setType(type)
230         .setSessionId(sessionId)
231         .setResourceId(resourceId);
232   }
233 
getLastChunkSent()234   final VersionedChunk getLastChunkSent() {
235     return lastChunkSent;
236   }
237 
changeState(State newState)238   final State changeState(State newState) {
239     if (newState != state) {
240       logger.atFinest().log("%s state %s -> %s",
241           this,
242           state.getClass().getSimpleName(),
243           newState.getClass().getSimpleName());
244     }
245     state = newState;
246     return state;
247   }
248 
249   /** Exception thrown when the transfer is aborted. */
250   static class TransferAbortedException extends Exception {}
251 
252   /**
253    * Sends a chunk.
254    *
255    * If sending fails, the transfer cannot proceed. sendChunk() sets the state to
256    * completed and
257    * throws a TransferAbortedException.
258    */
sendChunk(VersionedChunk chunk)259   final void sendChunk(VersionedChunk chunk) throws TransferAbortedException {
260     lastChunkSent = chunk;
261     if (shouldAbortCallback.getAsBoolean()) {
262       logger.atWarning().log("Abort signal received.");
263       changeState(new Completed(new TransferError(this, Status.ABORTED)));
264       throw new TransferAbortedException();
265     }
266 
267     try {
268       if (VERBOSE_LOGGING) {
269         logger.atFinest().log("%s sending %s", this, chunk);
270       }
271       eventHandler.sendChunk(chunk.toMessage());
272     } catch (TransferError transferError) {
273       changeState(new Completed(transferError));
274       throw new TransferAbortedException();
275     }
276   }
277 
278   /** Sends a status chunk to the server and finishes the transfer. */
setStateTerminatingAndSendFinalChunk(Status status)279   final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException {
280     logger.atFine().log("%s sending final chunk with status %s", this, status);
281     sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build());
282     if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
283       changeState(new Terminating(status));
284     } else {
285       changeState(new Completed(status));
286     }
287   }
288 
289   /** Invokes the transfer progress callback and logs the progress. */
updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes)290   final void updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes) {
291     TransferProgress progress =
292         TransferProgress.create(bytesSent, bytesConfirmedReceived, totalSizeBytes);
293     progressCallback.accept(progress);
294 
295     long durationNanos = Duration.between(startTime, Instant.now()).toNanos();
296     long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos);
297 
298     logger.atFiner().log("%s progress: "
299             + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s",
300         this,
301         progress.percentReceived(),
302         bytesSent,
303         bytesConfirmedReceived,
304         totalSizeBytes == UNKNOWN_TRANSFER_SIZE ? "unknown" : totalSizeBytes,
305         totalRate);
306   }
307 
308   interface State {
309     /**
310      * Called to handle a non-final chunk for this transfer.
311      */
handleDataChunk(VersionedChunk chunk)312     void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException;
313 
314     /**
315      * Called to handle the final chunk for this transfer.
316      */
handleFinalChunk(int statusCode)317     void handleFinalChunk(int statusCode) throws TransferAbortedException;
318 
319     /**
320      * Called when this transfer's deadline expires.
321      */
handleTimeout()322     void handleTimeout() throws TransferAbortedException;
323 
324     /**
325      * Called if the transfer is cancelled by the user.
326      */
handleCancellation()327     void handleCancellation();
328 
329     /**
330      * Called when the transfer thread is shutting down.
331      */
handleTermination()332     void handleTermination();
333   }
334 
335   /** Represents an active state in the transfer state machine. */
336   abstract class ActiveState implements State {
337     @Override
handleFinalChunk(int statusCode)338     public final void handleFinalChunk(int statusCode) throws TransferAbortedException {
339       Status status = Status.fromCode(statusCode);
340       if (status == null) {
341         logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode);
342         status = Status.INTERNAL;
343       }
344 
345       // If this is not version 2, immediately clean up. If it is, send the
346       // COMPLETION_ACK first and
347       // clean up if that succeeded.
348       if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) {
349         sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build());
350       }
351       changeState(new Completed(status));
352     }
353 
354     /** Enters the recovery state and returns to this state if recovery succeeds. */
355     @Override
handleTimeout()356     public void handleTimeout() throws TransferAbortedException {
357       changeState(new TimeoutRecovery(this)).handleTimeout();
358     }
359 
360     @Override
handleCancellation()361     public final void handleCancellation() {
362       try {
363         setStateTerminatingAndSendFinalChunk(Status.CANCELLED);
364       } catch (TransferAbortedException e) {
365         // Transfer was aborted; nothing to do.
366       }
367     }
368 
369     @Override
handleTermination()370     public final void handleTermination() {
371       try {
372         setStateTerminatingAndSendFinalChunk(Status.ABORTED);
373       } catch (TransferAbortedException e) {
374         // Transfer was aborted; nothing to do.
375       }
376     }
377   }
378 
379   private class Initiating extends ActiveState {
380     @Override
handleDataChunk(VersionedChunk chunk)381     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
382       if (chunk.version() == ProtocolVersion.UNKNOWN) {
383         logger.atWarning().log(
384             "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk);
385         setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT);
386         return;
387       }
388 
389       changeState(getWaitingForDataState());
390 
391       if (chunk.type() != Chunk.Type.START_ACK) {
392         if (offset != 0) {
393           logger.atWarning().log(
394               "%s aborting due to unsupported non-zero offset transfer: %s", Transfer.this, chunk);
395           setStateTerminatingAndSendFinalChunk(Status.INTERNAL);
396           return;
397         }
398         logger.atFine().log(
399             "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this);
400         configuredProtocolVersion = ProtocolVersion.LEGACY;
401         state.handleDataChunk(chunk);
402         return;
403       }
404 
405       if (chunk.version().compareTo(desiredProtocolVersion) <= 0) {
406         configuredProtocolVersion = chunk.version();
407       } else {
408         configuredProtocolVersion = desiredProtocolVersion;
409       }
410 
411       logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)",
412           Transfer.this,
413           configuredProtocolVersion,
414           desiredProtocolVersion,
415           chunk.version());
416 
417       if (offset != chunk.initialOffset()) {
418         logger.atWarning().log(
419             "%s aborting due to unconfirmed non-zero offset transfer: %s", Transfer.this, chunk);
420         setStateTerminatingAndSendFinalChunk(Status.UNIMPLEMENTED);
421         return;
422       }
423 
424       VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION);
425       prepareInitialChunk(startAckConfirmation);
426       sendChunk(startAckConfirmation.build());
427     }
428   }
429 
430   /** Recovering from an expired timeout. */
431   class TimeoutRecovery extends ActiveState {
432     private final State nextState;
433     private int retries;
434 
TimeoutRecovery(State nextState)435     TimeoutRecovery(State nextState) {
436       this.nextState = nextState;
437     }
438 
439     @Override
handleDataChunk(VersionedChunk chunk)440     public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException {
441       changeState(nextState).handleDataChunk(chunk);
442     }
443 
444     @Override
handleTimeout()445     public void handleTimeout() throws TransferAbortedException {
446       // If the transfer timed out, skip to the completed state. Don't send any more
447       // packets.
448       if (retries >= timeoutSettings.maxRetries()) {
449         logger.atFine().log("%s exhausted its %d retries", Transfer.this, retries);
450         changeState(new Completed(Status.DEADLINE_EXCEEDED));
451         return;
452       }
453 
454       if (lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) {
455         logger.atFine().log("%s exhausted its %d lifetime retries", Transfer.this, retries);
456         changeState(new Completed(Status.DEADLINE_EXCEEDED));
457         return;
458       }
459 
460       logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d",
461           Transfer.this,
462           timeoutSettings.timeoutMillis(),
463           retries,
464           timeoutSettings.maxRetries());
465       sendChunk(getChunkForRetry());
466       retries += 1;
467       lifetimeRetries += 1;
468       setNextChunkTimeout();
469     }
470   }
471 
472   /**
473    * Transfer completed. Do nothing if the transfer is terminated or cancelled.
474    */
475   class Terminating extends ActiveState {
476     private final Status status;
477     private int retries;
478 
Terminating(Status status)479     Terminating(Status status) {
480       this.status = status;
481     }
482 
483     @Override
handleDataChunk(VersionedChunk chunk)484     public void handleDataChunk(VersionedChunk chunk) {
485       if (chunk.type() == Chunk.Type.COMPLETION_ACK) {
486         changeState(new Completed(status));
487       }
488     }
489 
490     @Override
handleTimeout()491     public void handleTimeout() throws TransferAbortedException {
492       if (retries >= timeoutSettings.maxRetries()
493           || lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) {
494         // Unlike the standard `TimeoutRecovery` state, a `Terminating` transfer should
495         // not fail due to a timeout if no completion ACK is received. It should
496         // instead complete with its existing status.
497         logger.atFine().log(
498             "%s exhausted its %d retries (lifetime %d)", Transfer.this, retries, lifetimeRetries);
499         changeState(new Completed(status));
500         return;
501       }
502 
503       logger.atFiner().log("%s did not receive completion ack for %d ms; retrying %d/%d",
504           Transfer.this,
505           timeoutSettings.timeoutMillis(),
506           retries,
507           timeoutSettings.maxRetries());
508       sendChunk(getChunkForRetry());
509       retries += 1;
510       lifetimeRetries += 1;
511       setNextChunkTimeout();
512     }
513   }
514 
515   class Completed implements State {
516     /**
517      * Performs final cleanup of a completed transfer. No packets are sent to the
518      * server.
519      */
Completed(Status status)520     Completed(Status status) {
521       cleanUp();
522       logger.atInfo().log("%s completed with status %s", Transfer.this, status);
523       if (status.ok()) {
524         setFutureResult();
525       } else {
526         setException(new TransferError(Transfer.this, status));
527       }
528     }
529 
530     /**
531      * Finishes the transfer due to an exception. No packets are sent to the server.
532      */
Completed(TransferError exception)533     Completed(TransferError exception) {
534       cleanUp();
535       logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this);
536       setException(exception);
537     }
538 
cleanUp()539     private void cleanUp() {
540       deadline = NO_TIMEOUT;
541       eventHandler.unregisterTransfer(Transfer.this);
542     }
543 
544     @Override
handleDataChunk(VersionedChunk chunk)545     public void handleDataChunk(VersionedChunk chunk) {
546       logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
547     }
548 
549     @Override
handleFinalChunk(int statusCode)550     public void handleFinalChunk(int statusCode) {
551       logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this);
552     }
553 
554     @Override
handleTimeout()555     public void handleTimeout() {}
556 
557     @Override
handleTermination()558     public void handleTermination() {}
559 
560     @Override
handleCancellation()561     public void handleCancellation() {}
562   }
563 }
564