1 // Copyright 2022 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 package dev.pigweed.pw_transfer; 16 17 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 18 import static dev.pigweed.pw_transfer.TransferProgress.UNKNOWN_TRANSFER_SIZE; 19 20 import com.google.common.util.concurrent.AbstractFuture; 21 import com.google.common.util.concurrent.SettableFuture; 22 import dev.pigweed.pw_log.Logger; 23 import dev.pigweed.pw_rpc.Status; 24 import dev.pigweed.pw_transfer.TransferEventHandler.TransferInterface; 25 import java.time.Duration; 26 import java.time.Instant; 27 import java.util.Locale; 28 import java.util.function.BooleanSupplier; 29 import java.util.function.Consumer; 30 31 /** Base class for tracking the state of a read or write transfer. */ 32 abstract class Transfer<T> extends AbstractFuture<T> { 33 private static final Logger logger = Logger.forClass(Transfer.class); 34 35 // Largest nanosecond instant. Used to block indefinitely when no transfers are 36 // pending. 37 static final Instant NO_TIMEOUT = Instant.ofEpochSecond(0, Long.MAX_VALUE); 38 39 // Whether to output some particularly noisy logs. 40 static final boolean VERBOSE_LOGGING = false; 41 42 private final int resourceId; 43 private final int sessionId; 44 private int offset; 45 private final ProtocolVersion desiredProtocolVersion; 46 private final TransferEventHandler.TransferInterface eventHandler; 47 private final TransferTimeoutSettings timeoutSettings; 48 private final Consumer<TransferProgress> progressCallback; 49 private final BooleanSupplier shouldAbortCallback; 50 private final Instant startTime; 51 52 private ProtocolVersion configuredProtocolVersion = ProtocolVersion.UNKNOWN; 53 private Instant deadline = NO_TIMEOUT; 54 private State state; 55 private VersionedChunk lastChunkSent; 56 57 private int lifetimeRetries = 0; 58 59 /** 60 * Creates a new read or write transfer. 61 * 62 * @param resourceId The resource ID of the transfer 63 * @param desiredProtocolVersion protocol version to request 64 * @param eventHandler Interface to use to send a chunk. 65 * @param timeoutSettings Timeout and retry settings for this transfer. 66 * @param progressCallback Called each time a packet is sent. 67 * @param shouldAbortCallback BooleanSupplier that returns true if a transfer 68 * should be aborted. 69 */ Transfer(int resourceId, int sessionId, ProtocolVersion desiredProtocolVersion, TransferInterface eventHandler, TransferTimeoutSettings timeoutSettings, Consumer<TransferProgress> progressCallback, BooleanSupplier shouldAbortCallback, int initial_offset)70 Transfer(int resourceId, 71 int sessionId, 72 ProtocolVersion desiredProtocolVersion, 73 TransferInterface eventHandler, 74 TransferTimeoutSettings timeoutSettings, 75 Consumer<TransferProgress> progressCallback, 76 BooleanSupplier shouldAbortCallback, 77 int initial_offset) { 78 this.resourceId = resourceId; 79 this.sessionId = sessionId; 80 this.offset = initial_offset; 81 this.desiredProtocolVersion = desiredProtocolVersion; 82 this.eventHandler = eventHandler; 83 84 this.timeoutSettings = timeoutSettings; 85 this.progressCallback = progressCallback; 86 this.shouldAbortCallback = shouldAbortCallback; 87 88 // If the future is cancelled, tell the TransferEventHandler to cancel the 89 // transfer. 90 addListener(() -> { 91 if (isCancelled()) { 92 eventHandler.cancelTransfer(this); 93 } 94 }, directExecutor()); 95 96 if (desiredProtocolVersion == ProtocolVersion.LEGACY) { 97 // Legacy transfers skip protocol negotiation stage and use the resource ID as 98 // the session ID. 99 configuredProtocolVersion = ProtocolVersion.LEGACY; 100 state = getWaitingForDataState(); 101 } else { 102 state = new Initiating(); 103 } 104 105 startTime = Instant.now(); 106 } 107 108 @Override toString()109 public String toString() { 110 return String.format(Locale.ENGLISH, 111 "%s(%d:%d)[%s]", 112 this.getClass().getSimpleName(), 113 resourceId, 114 sessionId, 115 state.getClass().getSimpleName()); 116 } 117 getResourceId()118 public final int getResourceId() { 119 return resourceId; 120 } 121 getSessionId()122 public final int getSessionId() { 123 return sessionId; 124 } 125 getOffset()126 public final int getOffset() { 127 return offset; 128 } 129 getDesiredProtocolVersion()130 final ProtocolVersion getDesiredProtocolVersion() { 131 return desiredProtocolVersion; 132 } 133 134 /** Terminates the transfer without sending any packets. */ terminate(TransferError error)135 public final void terminate(TransferError error) { 136 changeState(new Completed(error)); 137 } 138 getDeadline()139 final Instant getDeadline() { 140 return deadline; 141 } 142 setOffset(int offset)143 final void setOffset(int offset) { 144 this.offset = offset; 145 } 146 setNextChunkTimeout()147 final void setNextChunkTimeout() { 148 deadline = Instant.now().plusMillis(timeoutSettings.timeoutMillis()); 149 } 150 setInitialTimeout()151 private void setInitialTimeout() { 152 deadline = Instant.now().plusMillis(timeoutSettings.initialTimeoutMillis()); 153 } 154 setTimeoutMicros(int timeoutMicros)155 final void setTimeoutMicros(int timeoutMicros) { 156 deadline = Instant.now().plusNanos((long) timeoutMicros * 1000); 157 } 158 start()159 final void start() { 160 logger.atInfo().log( 161 "%s starting with parameters: default timeout %d ms, initial timeout %d ms, %d max retires", 162 this, 163 timeoutSettings.timeoutMillis(), 164 timeoutSettings.initialTimeoutMillis(), 165 timeoutSettings.maxRetries()); 166 VersionedChunk.Builder chunk = 167 VersionedChunk.createInitialChunk(desiredProtocolVersion, resourceId, sessionId); 168 prepareInitialChunk(chunk); 169 try { 170 sendChunk(chunk.build()); 171 } catch (TransferAbortedException e) { 172 return; // Sending failed, transfer is cancelled 173 } 174 setInitialTimeout(); 175 } 176 177 /** Processes an incoming chunk from the server. */ handleChunk(VersionedChunk chunk)178 final void handleChunk(VersionedChunk chunk) { 179 try { 180 if (chunk.type() == Chunk.Type.COMPLETION) { 181 state.handleFinalChunk(chunk.status().orElseGet(() -> { 182 logger.atWarning().log("Received terminating chunk with no status set; using INTERNAL"); 183 return Status.INTERNAL.code(); 184 })); 185 } else { 186 state.handleDataChunk(chunk); 187 } 188 } catch (TransferAbortedException e) { 189 // Transfer was aborted; nothing else to do. 190 } 191 } 192 handleTimeoutIfDeadlineExceeded()193 final void handleTimeoutIfDeadlineExceeded() { 194 if (Instant.now().isAfter(deadline)) { 195 try { 196 state.handleTimeout(); 197 } catch (TransferAbortedException e) { 198 // Transfer was aborted; nothing else to do. 199 } 200 } 201 } 202 handleTermination()203 final void handleTermination() { 204 state.handleTermination(); 205 } 206 handleCancellation()207 final void handleCancellation() { 208 state.handleCancellation(); 209 } 210 211 /** Returns the State to enter immediately after sending the first packet. */ getWaitingForDataState()212 abstract State getWaitingForDataState(); 213 prepareInitialChunk(VersionedChunk.Builder chunk)214 abstract void prepareInitialChunk(VersionedChunk.Builder chunk); 215 216 /** 217 * Returns the chunk to send for a retry. Returns the initial chunk if no chunks 218 * have been sent. 219 */ getChunkForRetry()220 abstract VersionedChunk getChunkForRetry(); 221 222 /** Sets the result for the future after a successful transfer. */ setFutureResult()223 abstract void setFutureResult(); 224 newChunk(Chunk.Type type)225 final VersionedChunk.Builder newChunk(Chunk.Type type) { 226 return VersionedChunk.builder() 227 .setVersion(configuredProtocolVersion != ProtocolVersion.UNKNOWN ? configuredProtocolVersion 228 : desiredProtocolVersion) 229 .setType(type) 230 .setSessionId(sessionId) 231 .setResourceId(resourceId); 232 } 233 getLastChunkSent()234 final VersionedChunk getLastChunkSent() { 235 return lastChunkSent; 236 } 237 changeState(State newState)238 final State changeState(State newState) { 239 if (newState != state) { 240 logger.atFinest().log("%s state %s -> %s", 241 this, 242 state.getClass().getSimpleName(), 243 newState.getClass().getSimpleName()); 244 } 245 state = newState; 246 return state; 247 } 248 249 /** Exception thrown when the transfer is aborted. */ 250 static class TransferAbortedException extends Exception {} 251 252 /** 253 * Sends a chunk. 254 * 255 * If sending fails, the transfer cannot proceed. sendChunk() sets the state to 256 * completed and 257 * throws a TransferAbortedException. 258 */ sendChunk(VersionedChunk chunk)259 final void sendChunk(VersionedChunk chunk) throws TransferAbortedException { 260 lastChunkSent = chunk; 261 if (shouldAbortCallback.getAsBoolean()) { 262 logger.atWarning().log("Abort signal received."); 263 changeState(new Completed(new TransferError(this, Status.ABORTED))); 264 throw new TransferAbortedException(); 265 } 266 267 try { 268 if (VERBOSE_LOGGING) { 269 logger.atFinest().log("%s sending %s", this, chunk); 270 } 271 eventHandler.sendChunk(chunk.toMessage()); 272 } catch (TransferError transferError) { 273 changeState(new Completed(transferError)); 274 throw new TransferAbortedException(); 275 } 276 } 277 278 /** Sends a status chunk to the server and finishes the transfer. */ setStateTerminatingAndSendFinalChunk(Status status)279 final void setStateTerminatingAndSendFinalChunk(Status status) throws TransferAbortedException { 280 logger.atFine().log("%s sending final chunk with status %s", this, status); 281 sendChunk(newChunk(Chunk.Type.COMPLETION).setStatus(status).build()); 282 if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { 283 changeState(new Terminating(status)); 284 } else { 285 changeState(new Completed(status)); 286 } 287 } 288 289 /** Invokes the transfer progress callback and logs the progress. */ updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes)290 final void updateProgress(long bytesSent, long bytesConfirmedReceived, long totalSizeBytes) { 291 TransferProgress progress = 292 TransferProgress.create(bytesSent, bytesConfirmedReceived, totalSizeBytes); 293 progressCallback.accept(progress); 294 295 long durationNanos = Duration.between(startTime, Instant.now()).toNanos(); 296 long totalRate = durationNanos == 0 ? 0 : (bytesSent * 1_000_000_000 / durationNanos); 297 298 logger.atFiner().log("%s progress: " 299 + "%5.1f%% (%d B sent, %d B confirmed received of %s B total) at %d B/s", 300 this, 301 progress.percentReceived(), 302 bytesSent, 303 bytesConfirmedReceived, 304 totalSizeBytes == UNKNOWN_TRANSFER_SIZE ? "unknown" : totalSizeBytes, 305 totalRate); 306 } 307 308 interface State { 309 /** 310 * Called to handle a non-final chunk for this transfer. 311 */ handleDataChunk(VersionedChunk chunk)312 void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException; 313 314 /** 315 * Called to handle the final chunk for this transfer. 316 */ handleFinalChunk(int statusCode)317 void handleFinalChunk(int statusCode) throws TransferAbortedException; 318 319 /** 320 * Called when this transfer's deadline expires. 321 */ handleTimeout()322 void handleTimeout() throws TransferAbortedException; 323 324 /** 325 * Called if the transfer is cancelled by the user. 326 */ handleCancellation()327 void handleCancellation(); 328 329 /** 330 * Called when the transfer thread is shutting down. 331 */ handleTermination()332 void handleTermination(); 333 } 334 335 /** Represents an active state in the transfer state machine. */ 336 abstract class ActiveState implements State { 337 @Override handleFinalChunk(int statusCode)338 public final void handleFinalChunk(int statusCode) throws TransferAbortedException { 339 Status status = Status.fromCode(statusCode); 340 if (status == null) { 341 logger.atWarning().log("Received invalid status value %d, using INTERNAL", statusCode); 342 status = Status.INTERNAL; 343 } 344 345 // If this is not version 2, immediately clean up. If it is, send the 346 // COMPLETION_ACK first and 347 // clean up if that succeeded. 348 if (configuredProtocolVersion == ProtocolVersion.VERSION_TWO) { 349 sendChunk(newChunk(Chunk.Type.COMPLETION_ACK).build()); 350 } 351 changeState(new Completed(status)); 352 } 353 354 /** Enters the recovery state and returns to this state if recovery succeeds. */ 355 @Override handleTimeout()356 public void handleTimeout() throws TransferAbortedException { 357 changeState(new TimeoutRecovery(this)).handleTimeout(); 358 } 359 360 @Override handleCancellation()361 public final void handleCancellation() { 362 try { 363 setStateTerminatingAndSendFinalChunk(Status.CANCELLED); 364 } catch (TransferAbortedException e) { 365 // Transfer was aborted; nothing to do. 366 } 367 } 368 369 @Override handleTermination()370 public final void handleTermination() { 371 try { 372 setStateTerminatingAndSendFinalChunk(Status.ABORTED); 373 } catch (TransferAbortedException e) { 374 // Transfer was aborted; nothing to do. 375 } 376 } 377 } 378 379 private class Initiating extends ActiveState { 380 @Override handleDataChunk(VersionedChunk chunk)381 public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { 382 if (chunk.version() == ProtocolVersion.UNKNOWN) { 383 logger.atWarning().log( 384 "%s aborting due to unsupported protocol version: %s", Transfer.this, chunk); 385 setStateTerminatingAndSendFinalChunk(Status.INVALID_ARGUMENT); 386 return; 387 } 388 389 changeState(getWaitingForDataState()); 390 391 if (chunk.type() != Chunk.Type.START_ACK) { 392 if (offset != 0) { 393 logger.atWarning().log( 394 "%s aborting due to unsupported non-zero offset transfer: %s", Transfer.this, chunk); 395 setStateTerminatingAndSendFinalChunk(Status.INTERNAL); 396 return; 397 } 398 logger.atFine().log( 399 "%s got non-handshake chunk; reverting to legacy protocol", Transfer.this); 400 configuredProtocolVersion = ProtocolVersion.LEGACY; 401 state.handleDataChunk(chunk); 402 return; 403 } 404 405 if (chunk.version().compareTo(desiredProtocolVersion) <= 0) { 406 configuredProtocolVersion = chunk.version(); 407 } else { 408 configuredProtocolVersion = desiredProtocolVersion; 409 } 410 411 logger.atFine().log("%s negotiated protocol %s (ours=%s, theirs=%s)", 412 Transfer.this, 413 configuredProtocolVersion, 414 desiredProtocolVersion, 415 chunk.version()); 416 417 if (offset != chunk.initialOffset()) { 418 logger.atWarning().log( 419 "%s aborting due to unconfirmed non-zero offset transfer: %s", Transfer.this, chunk); 420 setStateTerminatingAndSendFinalChunk(Status.UNIMPLEMENTED); 421 return; 422 } 423 424 VersionedChunk.Builder startAckConfirmation = newChunk(Chunk.Type.START_ACK_CONFIRMATION); 425 prepareInitialChunk(startAckConfirmation); 426 sendChunk(startAckConfirmation.build()); 427 } 428 } 429 430 /** Recovering from an expired timeout. */ 431 class TimeoutRecovery extends ActiveState { 432 private final State nextState; 433 private int retries; 434 TimeoutRecovery(State nextState)435 TimeoutRecovery(State nextState) { 436 this.nextState = nextState; 437 } 438 439 @Override handleDataChunk(VersionedChunk chunk)440 public void handleDataChunk(VersionedChunk chunk) throws TransferAbortedException { 441 changeState(nextState).handleDataChunk(chunk); 442 } 443 444 @Override handleTimeout()445 public void handleTimeout() throws TransferAbortedException { 446 // If the transfer timed out, skip to the completed state. Don't send any more 447 // packets. 448 if (retries >= timeoutSettings.maxRetries()) { 449 logger.atFine().log("%s exhausted its %d retries", Transfer.this, retries); 450 changeState(new Completed(Status.DEADLINE_EXCEEDED)); 451 return; 452 } 453 454 if (lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) { 455 logger.atFine().log("%s exhausted its %d lifetime retries", Transfer.this, retries); 456 changeState(new Completed(Status.DEADLINE_EXCEEDED)); 457 return; 458 } 459 460 logger.atFiner().log("%s received no chunks for %d ms; retrying %d/%d", 461 Transfer.this, 462 timeoutSettings.timeoutMillis(), 463 retries, 464 timeoutSettings.maxRetries()); 465 sendChunk(getChunkForRetry()); 466 retries += 1; 467 lifetimeRetries += 1; 468 setNextChunkTimeout(); 469 } 470 } 471 472 /** 473 * Transfer completed. Do nothing if the transfer is terminated or cancelled. 474 */ 475 class Terminating extends ActiveState { 476 private final Status status; 477 private int retries; 478 Terminating(Status status)479 Terminating(Status status) { 480 this.status = status; 481 } 482 483 @Override handleDataChunk(VersionedChunk chunk)484 public void handleDataChunk(VersionedChunk chunk) { 485 if (chunk.type() == Chunk.Type.COMPLETION_ACK) { 486 changeState(new Completed(status)); 487 } 488 } 489 490 @Override handleTimeout()491 public void handleTimeout() throws TransferAbortedException { 492 if (retries >= timeoutSettings.maxRetries() 493 || lifetimeRetries >= timeoutSettings.maxLifetimeRetries()) { 494 // Unlike the standard `TimeoutRecovery` state, a `Terminating` transfer should 495 // not fail due to a timeout if no completion ACK is received. It should 496 // instead complete with its existing status. 497 logger.atFine().log( 498 "%s exhausted its %d retries (lifetime %d)", Transfer.this, retries, lifetimeRetries); 499 changeState(new Completed(status)); 500 return; 501 } 502 503 logger.atFiner().log("%s did not receive completion ack for %d ms; retrying %d/%d", 504 Transfer.this, 505 timeoutSettings.timeoutMillis(), 506 retries, 507 timeoutSettings.maxRetries()); 508 sendChunk(getChunkForRetry()); 509 retries += 1; 510 lifetimeRetries += 1; 511 setNextChunkTimeout(); 512 } 513 } 514 515 class Completed implements State { 516 /** 517 * Performs final cleanup of a completed transfer. No packets are sent to the 518 * server. 519 */ Completed(Status status)520 Completed(Status status) { 521 cleanUp(); 522 logger.atInfo().log("%s completed with status %s", Transfer.this, status); 523 if (status.ok()) { 524 setFutureResult(); 525 } else { 526 setException(new TransferError(Transfer.this, status)); 527 } 528 } 529 530 /** 531 * Finishes the transfer due to an exception. No packets are sent to the server. 532 */ Completed(TransferError exception)533 Completed(TransferError exception) { 534 cleanUp(); 535 logger.atWarning().withCause(exception).log("%s terminated with exception", Transfer.this); 536 setException(exception); 537 } 538 cleanUp()539 private void cleanUp() { 540 deadline = NO_TIMEOUT; 541 eventHandler.unregisterTransfer(Transfer.this); 542 } 543 544 @Override handleDataChunk(VersionedChunk chunk)545 public void handleDataChunk(VersionedChunk chunk) { 546 logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); 547 } 548 549 @Override handleFinalChunk(int statusCode)550 public void handleFinalChunk(int statusCode) { 551 logger.atFiner().log("%s [Completed state]: Received unexpected data chunk", Transfer.this); 552 } 553 554 @Override handleTimeout()555 public void handleTimeout() {} 556 557 @Override handleTermination()558 public void handleTermination() {} 559 560 @Override handleCancellation()561 public void handleCancellation() {} 562 } 563 } 564