1// Copyright 2022 The Pigweed Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); you may not 4// use this file except in compliance with the License. You may obtain a copy of 5// the License at 6// 7// https://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12// License for the specific language governing permissions and limitations under 13// the License. 14 15import { Status } from 'pigweedjs/pw_status'; 16import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb'; 17 18export class ProgressStats { 19 constructor( 20 readonly bytesSent: number, 21 readonly bytesConfirmedReceived: number, 22 readonly totalSizeBytes?: number, 23 ) {} 24 25 get percentReceived(): number { 26 if (this.totalSizeBytes === undefined) { 27 return NaN; 28 } 29 return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100; 30 } 31 32 toString(): string { 33 const total = 34 this.totalSizeBytes === undefined 35 ? 'undefined' 36 : this.totalSizeBytes.toString(); 37 const percent = this.percentReceived.toFixed(2); 38 return ( 39 `${percent}% (${this.bytesSent} B sent, ` + 40 `${this.bytesConfirmedReceived} B received of ${total} B)` 41 ); 42 } 43} 44 45export type ProgressCallback = (stats: ProgressStats) => void; 46 47/** A Timer which invokes a callback after a certain timeout. */ 48class Timer { 49 private task?: ReturnType<typeof setTimeout>; 50 51 constructor( 52 readonly timeoutS: number, 53 private readonly callback: () => any, 54 ) {} 55 56 /** 57 * Starts a new timer. 58 * 59 * If a timer is already running, it is stopped and a new timer started. 60 * This can be used to implement watchdog-like behavior, where a callback 61 * is invoked after some time without a kick. 62 */ 63 start() { 64 this.stop(); 65 this.task = setTimeout(this.callback, this.timeoutS * 1000); 66 } 67 68 /** Terminates a running timer. */ 69 stop() { 70 if (this.task !== undefined) { 71 clearTimeout(this.task); 72 this.task = undefined; 73 } 74 } 75} 76 77/** 78 * A client-side data transfer through a Manager. 79 * 80 * Subclasses are responsible for implementing all of the logic for their type 81 * of transfer, receiving messages from the server and sending the appropriate 82 * messages in response. 83 */ 84export abstract class Transfer { 85 status: Status = Status.OK; 86 done: Promise<Status>; 87 data = new Uint8Array(0); 88 89 private retries = 0; 90 private responseTimer?: Timer; 91 private resolve?: (value: Status | PromiseLike<Status>) => void; 92 93 constructor( 94 public id: number, 95 protected sendChunk: (chunk: Chunk) => void, 96 responseTimeoutS: number, 97 private maxRetries: number, 98 private progressCallback?: ProgressCallback, 99 ) { 100 this.responseTimer = new Timer(responseTimeoutS, this.onTimeout); 101 this.done = new Promise<Status>((resolve) => { 102 this.resolve = resolve!; 103 }); 104 } 105 106 /** Returns the initial chunk to notify the server of the transfer. */ 107 protected abstract get initialChunk(): Chunk; 108 109 /** Handles a chunk that contains or requests data. */ 110 protected abstract handleDataChunk(chunk: Chunk): void; 111 112 /** Retries after a timeout occurs. */ 113 protected abstract retryAfterTimeout(): void; 114 115 /** Handles a timeout while waiting for a chunk. */ 116 private onTimeout = () => { 117 this.retries += 1; 118 if (this.retries > this.maxRetries) { 119 this.finish(Status.DEADLINE_EXCEEDED); 120 return; 121 } 122 123 console.debug( 124 `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`, 125 ); 126 127 this.retryAfterTimeout(); 128 this.responseTimer?.start(); 129 }; 130 131 /** Sends the initial chunk of the transfer. */ 132 begin(): void { 133 this.sendChunk(this.initialChunk as any); 134 this.responseTimer?.start(); 135 } 136 137 /** Ends the transfer with the specified status */ 138 protected finish(status: Status): void { 139 this.responseTimer?.stop(); 140 this.responseTimer = undefined; 141 this.status = status; 142 143 if (status === Status.OK) { 144 const totalSize = this.data.length; 145 this.updateProgress(totalSize, totalSize, totalSize); 146 } 147 148 this.resolve!(this.status); 149 } 150 151 /** Ends the transfer without sending a completion chunk */ 152 abort(status: Status): void { 153 this.finish(status); 154 } 155 156 /** Ends the transfer and sends a completion chunk */ 157 terminate(status: Status): void { 158 const chunk = new Chunk(); 159 chunk.setStatus(status); 160 chunk.setTransferId(this.id); 161 chunk.setType(Chunk.Type.COMPLETION); 162 this.sendChunk(chunk); 163 this.abort(status); 164 } 165 166 /** Invokes the provided progress callback, if any, with the progress */ 167 updateProgress( 168 bytesSent: number, 169 bytesConfirmedReceived: number, 170 totalSizeBytes?: number, 171 ): void { 172 const stats = new ProgressStats( 173 bytesSent, 174 bytesConfirmedReceived, 175 totalSizeBytes, 176 ); 177 console.debug(`Transfer ${this.id} progress: ${stats}`); 178 179 if (this.progressCallback !== undefined) { 180 this.progressCallback(stats); 181 } 182 } 183 184 /** 185 * Processes an incoming chunk from the server. 186 * 187 * Handles terminating chunks (i.e. those with a status) and forwards 188 * non-terminating chunks to handle_data_chunk. 189 */ 190 handleChunk(chunk: Chunk): void { 191 this.responseTimer?.stop(); 192 this.retries = 0; // Received data from service, so reset the retries. 193 194 console.debug(`Received chunk:(${chunk})`); 195 196 // Status chunks are only used to terminate a transfer. They do not 197 // contain any data that requires processing. 198 if (chunk.hasStatus()) { 199 this.finish(chunk.getStatus()); 200 return; 201 } 202 203 this.handleDataChunk(chunk); 204 205 // Start the timeout for the server to send a chunk in response. 206 this.responseTimer?.start(); 207 } 208} 209 210/** 211 * A client <= server read transfer. 212 * 213 * Although typescript can effectively handle an unlimited transfer window, this 214 * client sets a conservative window and chunk size to avoid overloading the 215 * device. These are configurable in the constructor. 216 */ 217export class ReadTransfer extends Transfer { 218 private maxBytesToReceive: number; 219 private maxChunkSize: number; 220 private chunkDelayMicroS?: number; // Microseconds 221 private remainingTransferSize?: number; 222 private offset = 0; 223 private pendingBytes: number; 224 private windowEndOffset: number; 225 226 // The fractional position within a window at which a receive transfer should 227 // extend its window size to minimize the amount of time the transmitter 228 // spends blocked. 229 // 230 // For example, a divisor of 2 will extend the window when half of the 231 // requested data has been received, a divisor of three will extend at a third 232 // of the window, and so on. 233 private static EXTEND_WINDOW_DIVISOR = 2; 234 235 constructor( 236 id: number, 237 sendChunk: (chunk: Chunk) => void, 238 responseTimeoutS: number, 239 maxRetries: number, 240 progressCallback?: ProgressCallback, 241 maxBytesToReceive = 8192, 242 maxChunkSize = 1024, 243 chunkDelayMicroS?: number, 244 ) { 245 super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback); 246 this.maxBytesToReceive = maxBytesToReceive; 247 this.maxChunkSize = maxChunkSize; 248 this.chunkDelayMicroS = chunkDelayMicroS; 249 this.pendingBytes = maxBytesToReceive; 250 this.windowEndOffset = maxBytesToReceive; 251 } 252 253 protected get initialChunk(): any { 254 return this.transferParameters(Chunk.Type.START); 255 } 256 257 /** Builds an updated transfer parameters chunk to send the server. */ 258 private transferParameters(type: any): Chunk { 259 this.pendingBytes = this.maxBytesToReceive; 260 this.windowEndOffset = this.offset + this.maxBytesToReceive; 261 262 const chunk = new Chunk(); 263 chunk.setTransferId(this.id); 264 chunk.setPendingBytes(this.pendingBytes); 265 chunk.setMaxChunkSizeBytes(this.maxChunkSize); 266 chunk.setOffset(this.offset); 267 chunk.setWindowEndOffset(this.windowEndOffset); 268 chunk.setType(type); 269 270 if (this.chunkDelayMicroS !== 0) { 271 chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!); 272 } 273 return chunk; 274 } 275 276 /** 277 * Processes an incoming chunk from the server. 278 * 279 * In a read transfer, the client receives data chunks from the server. 280 * Once all pending data is received, the transfer parameters are updated. 281 */ 282 protected handleDataChunk(chunk: Chunk): void { 283 if (chunk.getOffset() != this.offset) { 284 // Initially, the transfer service only supports in-order transfers. 285 // If data is received out of order, request that the server 286 // retransmit from the previous offset. 287 this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); 288 return; 289 } 290 291 const oldData = this.data; 292 const chunkData = chunk.getData() as Uint8Array; 293 this.data = new Uint8Array(chunkData.length + oldData.length); 294 this.data.set(oldData); 295 this.data.set(chunkData, oldData.length); 296 297 this.pendingBytes -= chunk.getData().length; 298 this.offset += chunk.getData().length; 299 300 if (chunk.hasRemainingBytes()) { 301 if (chunk.getRemainingBytes() === 0) { 302 // No more data to read. Acknowledge receipt and finish. 303 this.terminate(Status.OK); 304 return; 305 } 306 307 this.remainingTransferSize = chunk.getRemainingBytes(); 308 } else if (this.remainingTransferSize !== undefined) { 309 // Update the remaining transfer size, if it is known. 310 this.remainingTransferSize -= chunk.getData().length; 311 312 if (this.remainingTransferSize <= 0) { 313 this.remainingTransferSize = undefined; 314 } 315 } 316 317 if (chunk.getWindowEndOffset() !== 0) { 318 if (chunk.getWindowEndOffset() < this.offset) { 319 console.error( 320 `Transfer ${ 321 this.id 322 }: transmitter sent invalid earlier end offset ${chunk.getWindowEndOffset()} (receiver offset ${ 323 this.offset 324 })`, 325 ); 326 this.terminate(Status.INTERNAL); 327 return; 328 } 329 330 if (chunk.getWindowEndOffset() < this.offset) { 331 console.error( 332 `Transfer ${ 333 this.id 334 }: transmitter sent invalid later end offset ${chunk.getWindowEndOffset()} (receiver end offset ${ 335 this.windowEndOffset 336 })`, 337 ); 338 this.terminate(Status.INTERNAL); 339 return; 340 } 341 342 this.windowEndOffset = chunk.getWindowEndOffset(); 343 this.pendingBytes -= chunk.getWindowEndOffset() - this.offset; 344 } 345 346 const remainingWindowSize = this.windowEndOffset - this.offset; 347 const extendWindow = 348 remainingWindowSize <= 349 this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR; 350 351 const totalSize = 352 this.remainingTransferSize === undefined 353 ? undefined 354 : this.remainingTransferSize + this.offset; 355 this.updateProgress(this.offset, this.offset, totalSize); 356 357 if (this.pendingBytes === 0) { 358 // All pending data was received. Send out a new parameters chunk 359 // for the next block. 360 this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); 361 } else if (extendWindow) { 362 this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE)); 363 } 364 } 365 366 protected retryAfterTimeout(): void { 367 this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT)); 368 } 369} 370 371/** 372 * A client => server write transfer. 373 */ 374export class WriteTransfer extends Transfer { 375 private windowId = 0; 376 offset = 0; 377 maxChunkSize = 0; 378 chunkDelayMicroS?: number; 379 windowEndOffset = 0; 380 lastChunk: Chunk; 381 382 constructor( 383 id: number, 384 data: Uint8Array, 385 sendChunk: (chunk: Chunk) => void, 386 responseTimeoutS: number, 387 initialResponseTimeoutS: number, 388 maxRetries: number, 389 progressCallback?: ProgressCallback, 390 ) { 391 super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback); 392 this.data = data; 393 this.lastChunk = this.initialChunk; 394 } 395 396 protected get initialChunk(): any { 397 // TODO(frolv): The session ID should not be set here but assigned by the 398 // server during an initial handshake. 399 const chunk = new Chunk(); 400 chunk.setTransferId(this.id); 401 chunk.setResourceId(this.id); 402 chunk.setType(Chunk.Type.START); 403 return chunk; 404 } 405 406 /** 407 * Processes an incoming chunk from the server. 408 * 409 * In a write transfer, the server only sends transfer parameter updates 410 * to the client. When a message is received, update local parameters and 411 * send data accordingly. 412 */ 413 protected handleDataChunk(chunk: Chunk): void { 414 this.windowId += 1; 415 const initialWindowId = this.windowId; 416 417 if (!this.handleParametersUpdate(chunk)) { 418 return; 419 } 420 421 const bytesAknowledged = chunk.getOffset(); 422 423 let writeChunk: Chunk; 424 // eslint-disable-next-line no-constant-condition 425 while (true) { 426 writeChunk = this.nextChunk(); 427 this.offset += writeChunk.getData().length; 428 const sentRequestedBytes = this.offset === this.windowEndOffset; 429 430 this.updateProgress(this.offset, bytesAknowledged, this.data.length); 431 this.sendChunk(writeChunk); 432 433 if (sentRequestedBytes) { 434 break; 435 } 436 } 437 438 this.lastChunk = writeChunk; 439 } 440 441 /** Updates transfer state base on a transfer parameters update. */ 442 private handleParametersUpdate(chunk: Chunk): boolean { 443 let retransmit = true; 444 if (chunk.hasType()) { 445 retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT; 446 } 447 448 if (chunk.getOffset() > this.data.length) { 449 // Bad offset; terminate the transfer. 450 console.error( 451 `Transfer ${ 452 this.id 453 }: server requested invalid offset ${chunk.getOffset()} (size ${ 454 this.data.length 455 })`, 456 ); 457 458 this.terminate(Status.OUT_OF_RANGE); 459 return false; 460 } 461 462 if (chunk.getPendingBytes() === 0) { 463 console.error( 464 `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`, 465 ); 466 this.terminate(Status.INTERNAL); 467 return false; 468 } 469 470 if (retransmit) { 471 // Check whether the client has sent a previous data offset, which 472 // indicates that some chunks were lost in transmission. 473 if (chunk.getOffset() < this.offset) { 474 console.debug( 475 `Write transfer ${ 476 this.id 477 } rolling back to offset ${chunk.getOffset()} from ${this.offset}`, 478 ); 479 } 480 481 this.offset = chunk.getOffset(); 482 483 // Retransmit is the default behavior for older versions of the 484 // transfer protocol. The window_end_offset field is not guaranteed 485 // to be set in these version, so it must be calculated. 486 const maxBytesToSend = Math.min( 487 chunk.getPendingBytes(), 488 this.data.length - this.offset, 489 ); 490 this.windowEndOffset = this.offset + maxBytesToSend; 491 } else { 492 // Extend the window to the new end offset specified by the server. 493 this.windowEndOffset = Math.min( 494 chunk.getWindowEndOffset(), 495 this.data.length, 496 ); 497 } 498 499 if (chunk.hasMaxChunkSizeBytes()) { 500 this.maxChunkSize = chunk.getMaxChunkSizeBytes(); 501 } 502 503 if (chunk.hasMinDelayMicroseconds()) { 504 this.chunkDelayMicroS = chunk.getMinDelayMicroseconds(); 505 } 506 return true; 507 } 508 509 /** Returns the next Chunk message to send in the data transfer. */ 510 private nextChunk(): Chunk { 511 const chunk = new Chunk(); 512 chunk.setTransferId(this.id); 513 chunk.setOffset(this.offset); 514 chunk.setType(Chunk.Type.DATA); 515 516 const maxBytesInChunk = Math.min( 517 this.maxChunkSize, 518 this.windowEndOffset - this.offset, 519 ); 520 521 chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk)); 522 523 // Mark the final chunk of the transfer. 524 if (this.data.length - this.offset <= maxBytesInChunk) { 525 chunk.setRemainingBytes(0); 526 } 527 return chunk; 528 } 529 530 protected retryAfterTimeout(): void { 531 this.sendChunk(this.lastChunk); 532 } 533} 534