// Copyright 2022 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

import {
  BidirectionalStreamingCall,
  BidirectionalStreamingMethodStub,
  ServiceClient,
} from '@pigweed/pw_rpc';
import {Status} from '@pigweed/pw_status';
import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';

/** Immutable snapshot of how far a transfer has progressed. */
export class ProgressStats {
  /**
   * @param bytesSent Number of bytes sent so far.
   * @param bytesConfirmedReceived Number of bytes confirmed as received.
   * @param totalSizeBytes Total size of the transfer, when it is known.
   */
  constructor(
    readonly bytesSent: number,
    readonly bytesConfirmedReceived: number,
    readonly totalSizeBytes?: number
  ) {}

  /**
   * Percentage of the total size that has been confirmed received.
   *
   * Returns NaN when the total size is unknown. A total size of 0 is divided
   * through as-is, so the result is NaN (0 received) or Infinity.
   */
  get percentReceived(): number {
    if (this.totalSizeBytes === undefined) {
      return NaN;
    }
    return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
  }

  /** Formats the statistics as a single human-readable line. */
  toString(): string {
    const total =
      this.totalSizeBytes === undefined
        ? 'undefined'
        : this.totalSizeBytes.toString();
    const percent = this.percentReceived.toFixed(2);
    return (
      `${percent}% (${this.bytesSent} B sent, ` +
      `${this.bytesConfirmedReceived} B received of ${total} B)`
    );
  }
}

/** Callback invoked with fresh statistics whenever progress is reported. */
export type ProgressCallback = (stats: ProgressStats) => void;

/** A Timer which invokes a callback after a certain timeout. */
class Timer {
  // Handle of the pending setTimeout, or undefined when no timer is running.
  private task?: ReturnType<typeof setTimeout>;

  /**
   * @param timeoutS Timeout in seconds.
   * @param callback Function invoked when the timeout elapses.
   */
  constructor(
    readonly timeoutS: number,
    private readonly callback: () => void
  ) {}

  /**
   * Starts a new timer.
   *
   * If a timer is already running, it is stopped and a new timer started.
   * This can be used to implement watchdog-like behavior, where a callback
   * is invoked after some time without a kick.
   */
  start() {
    this.stop();
    // setTimeout takes milliseconds; timeoutS is in seconds.
    this.task = setTimeout(this.callback, this.timeoutS * 1000);
  }

  /** Terminates a running timer. Does nothing if no timer is running. */
  stop() {
    if (this.task !== undefined) {
      clearTimeout(this.task);
      this.task = undefined;
    }
  }
}

/**
 * A client-side data transfer through a Manager.
 *
 * Subclasses are responsible for implementing all of the logic for their type
 * of transfer, receiving messages from the server and sending the appropriate
 * messages in response.
 */
export abstract class Transfer {
  /** Status the transfer finished with; OK until `finish` sets it. */
  status: Status = Status.OK;
  /** Resolves with the final status once `finish` is called. */
  done: Promise<Status>;
  protected data = new Uint8Array();

  // Consecutive timeouts since the last chunk was received.
  private retries = 0;
  // Cleared (set to undefined) by `finish`, which disables further timeouts.
  private responseTimer?: Timer;
  private resolve?: (value: Status | PromiseLike<Status>) => void;

  /**
   * @param id Transfer ID placed in every chunk this transfer sends.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated before the
   *     transfer finishes with DEADLINE_EXCEEDED.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    public id: number,
    protected sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    private maxRetries: number,
    private progressCallback?: ProgressCallback
  ) {
    this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
    this.done = new Promise<Status>(resolve => {
      this.resolve = resolve;
    });
  }

  /** Returns the initial chunk to notify the server of the transfer. */
  protected abstract get initialChunk(): Chunk;

  /** Handles a chunk that contains or requests data. */
  protected abstract handleDataChunk(chunk: Chunk): void;

  /** Retries after a timeout occurs. */
  protected abstract retryAfterTimeout(): void;

  /**
   * Handles a timeout while waiting for a chunk.
   *
   * Declared as an arrow-function property so `this` stays bound when the
   * function is handed to the Timer as a bare callback.
   */
  private onTimeout = () => {
    this.retries += 1;
    if (this.retries > this.maxRetries) {
      this.finish(Status.DEADLINE_EXCEEDED);
      return;
    }

    console.debug(
      `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`
    );

    this.retryAfterTimeout();
    this.responseTimer?.start();
  };

  /** Sends an error chunk to the server and finishes the transfer. */
  protected sendError(error: Status): void {
    const chunk = new Chunk();
    chunk.setStatus(error);
    chunk.setTransferId(this.id);
    chunk.setType(Chunk.Type.TRANSFER_COMPLETION);
    this.sendChunk(chunk);
    this.finish(error);
  }

  /** Sends the initial chunk of the transfer and starts the response timer. */
  begin(): void {
    this.sendChunk(this.initialChunk);
    this.responseTimer?.start();
  }

  /**
   * Ends the transfer with the specified status.
   *
   * Stops and discards the response timer, records the status, reports
   * final progress when the status is OK, and resolves `done`.
   */
  finish(status: Status): void {
    this.responseTimer?.stop();
    this.responseTimer = undefined;
    this.status = status;

    if (status === Status.OK) {
      const totalSize = this.data.length;
      this.updateProgress(totalSize, totalSize, totalSize);
    }

    // The Promise executor runs synchronously in the constructor, so
    // `resolve` is always assigned by the time finish() can be called.
    this.resolve?.(this.status);
  }

  /** Invokes the provided progress callback, if any, with the progress. */
  updateProgress(
    bytesSent: number,
    bytesConfirmedReceived: number,
    totalSizeBytes?: number
  ): void {
    const stats = new ProgressStats(
      bytesSent,
      bytesConfirmedReceived,
      totalSizeBytes
    );
    console.debug(`Transfer ${this.id} progress: ${stats}`);

    if (this.progressCallback !== undefined) {
      this.progressCallback(stats);
    }
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * Handles terminating chunks (i.e. those with a status) and forwards
   * non-terminating chunks to handleDataChunk.
   */
  handleChunk(chunk: Chunk): void {
    this.responseTimer?.stop();
    this.retries = 0; // Received data from service, so reset the retries.

    console.debug(`Received chunk:(${chunk})`);

    // Status chunks are only used to terminate a transfer. They do not
    // contain any data that requires processing.
    if (chunk.hasStatus()) {
      this.finish(chunk.getStatus());
      return;
    }

    this.handleDataChunk(chunk);

    // Start the timeout for the server to send a chunk in response. This is a
    // no-op if handleDataChunk finished the transfer (the timer is cleared).
    this.responseTimer?.start();
  }
}

/**
 * A client <= server read transfer.
 *
 * Although typescript can effectively handle an unlimited transfer window, this
 * client sets a conservative window and chunk size to avoid overloading the
 * device. These are configurable in the constructor.
 */
export class ReadTransfer extends Transfer {
  private maxBytesToReceive: number;
  private maxChunkSize: number;
  private chunkDelayMicroS?: number; // Microseconds
  // Bytes the server still has to send, when it has reported that figure.
  private remainingTransferSize?: number;
  // Number of bytes received so far; also the offset expected next.
  private offset = 0;
  private pendingBytes: number;
  private windowEndOffset: number;

  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // For example, a divisor of 2 will extend the window when half of the
  // requested data has been received, a divisor of three will extend at a third
  // of the window, and so on.
  private static readonly EXTEND_WINDOW_DIVISOR = 2;

  /** All data received so far, in order. */
  data = new Uint8Array();

  /**
   * @param id Transfer ID.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Maximum number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   * @param maxBytesToReceive Size of the receive window requested from the
   *     server.
   * @param maxChunkSize Maximum chunk size requested from the server.
   * @param chunkDelayMicroS Optional minimum delay between chunks, in
   *     microseconds, requested from the server.
   */
  constructor(
    id: number,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
    maxBytesToReceive = 8192,
    maxChunkSize = 1024,
    chunkDelayMicroS?: number
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.maxBytesToReceive = maxBytesToReceive;
    this.maxChunkSize = maxChunkSize;
    this.chunkDelayMicroS = chunkDelayMicroS;
    this.pendingBytes = maxBytesToReceive;
    this.windowEndOffset = maxBytesToReceive;
  }

  protected get initialChunk(): Chunk {
    return this.transferParameters(Chunk.Type.TRANSFER_START);
  }

  /**
   * Builds an updated transfer parameters chunk to send the server.
   *
   * As a side effect, resets the pending byte count to a full window and
   * moves the window end to `offset + maxBytesToReceive`.
   */
  private transferParameters(type: Chunk.TypeMap[keyof Chunk.TypeMap]): Chunk {
    this.pendingBytes = this.maxBytesToReceive;
    this.windowEndOffset = this.offset + this.maxBytesToReceive;

    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setPendingBytes(this.pendingBytes);
    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
    chunk.setOffset(this.offset);
    chunk.setWindowEndOffset(this.windowEndOffset);
    chunk.setType(type);

    // Only request a delay when one was actually configured. The delay is
    // optional, so it must be checked against undefined as well as 0;
    // otherwise undefined would be handed to the setter.
    if (this.chunkDelayMicroS !== undefined && this.chunkDelayMicroS !== 0) {
      chunk.setMinDelayMicroseconds(this.chunkDelayMicroS);
    }
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a read transfer, the client receives data chunks from the server.
   * Once all pending data is received, the transfer parameters are updated.
   */
  protected handleDataChunk(chunk: Chunk): void {
    if (chunk.getOffset() !== this.offset) {
      // Initially, the transfer service only supports in-order transfers.
      // If data is received out of order, request that the server
      // retransmit from the previous offset.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
      return;
    }

    // Append the chunk's payload to the data received so far.
    const oldData = this.data;
    const chunkData = chunk.getData() as Uint8Array;
    const chunkSize = chunkData.length;
    this.data = new Uint8Array(chunkSize + oldData.length);
    this.data.set(oldData);
    this.data.set(chunkData, oldData.length);

    this.pendingBytes -= chunkSize;
    this.offset += chunkSize;

    if (chunk.hasRemainingBytes()) {
      if (chunk.getRemainingBytes() === 0) {
        // No more data to read. Acknowledge receipt and finish.
        const endChunk = new Chunk();
        endChunk.setTransferId(this.id);
        endChunk.setStatus(Status.OK);
        endChunk.setType(Chunk.Type.TRANSFER_COMPLETION);
        this.sendChunk(endChunk);
        this.finish(Status.OK);
        return;
      }

      this.remainingTransferSize = chunk.getRemainingBytes();
    } else if (this.remainingTransferSize !== undefined) {
      // Update the remaining transfer size, if it is known.
      this.remainingTransferSize -= chunkSize;

      if (this.remainingTransferSize <= 0) {
        this.remainingTransferSize = undefined;
      }
    }

    const chunkWindowEnd = chunk.getWindowEndOffset();
    if (chunkWindowEnd !== 0) {
      // The transmitter's window may not end before data already received...
      if (chunkWindowEnd < this.offset) {
        console.error(
          `Transfer ${
            this.id
          }: transmitter sent invalid earlier end offset ${chunkWindowEnd} (receiver offset ${
            this.offset
          })`
        );
        this.sendError(Status.INTERNAL);
        return;
      }

      // ...nor may it extend past the window this receiver granted.
      if (chunkWindowEnd > this.windowEndOffset) {
        console.error(
          `Transfer ${
            this.id
          }: transmitter sent invalid later end offset ${chunkWindowEnd} (receiver end offset ${
            this.windowEndOffset
          })`
        );
        this.sendError(Status.INTERNAL);
        return;
      }

      this.windowEndOffset = chunkWindowEnd;
      this.pendingBytes -= chunkWindowEnd - this.offset;
    }

    const remainingWindowSize = this.windowEndOffset - this.offset;
    const extendWindow =
      remainingWindowSize <=
      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;

    const totalSize =
      this.remainingTransferSize === undefined
        ? undefined
        : this.remainingTransferSize + this.offset;
    this.updateProgress(this.offset, this.offset, totalSize);

    if (this.pendingBytes === 0) {
      // All pending data was received. Send out a new parameters chunk
      // for the next block.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
    } else if (extendWindow) {
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
    }
  }

  /** Re-sends the transfer parameters, asking the server to retransmit. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
  }
}

/**
 * A client => server write transfer.
 */
export class WriteTransfer extends Transfer {
  /** The complete payload to write to the server. */
  readonly data: Uint8Array;
  // Incremented once per data chunk handled; not read elsewhere in this class.
  private windowId = 0;
  /** Offset in `data` of the next byte to send. */
  offset = 0;
  /** Maximum data bytes per chunk; 0 until the server supplies a value. */
  maxChunkSize = 0;
  /** Delay (microseconds) supplied by the server; only recorded here. */
  chunkDelayMicroS?: number;
  /** Offset up to which the server has allowed data to be sent. */
  windowEndOffset = 0;
  /** Most recently sent chunk, re-sent after a timeout. */
  lastChunk: Chunk;

  /**
   * @param id Transfer ID.
   * @param data The payload to write.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param initialResponseTimeoutS Accepted for interface compatibility; not
   *     used by this constructor.
   * @param maxRetries Maximum number of consecutive timeouts tolerated.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    id: number,
    data: Uint8Array,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    initialResponseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.data = data;
    this.lastChunk = this.initialChunk;
  }

  protected get initialChunk(): Chunk {
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setType(Chunk.Type.TRANSFER_START);
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a write transfer, the server only sends transfer parameter updates
   * to the client. When a message is received, update local parameters and
   * send data accordingly.
   */
  protected handleDataChunk(chunk: Chunk): void {
    this.windowId += 1;

    if (!this.handleParametersUpdate(chunk)) {
      return;
    }

    const bytesAcknowledged = chunk.getOffset();

    let writeChunk: Chunk;
    while (true) {
      writeChunk = this.nextChunk();
      const bytesInChunk = writeChunk.getData().length;

      // An empty chunk before the window end means no progress can ever be
      // made (e.g. maxChunkSize is still 0, or the window ends behind the
      // current offset). Abort instead of spinning forever and blocking the
      // event loop.
      if (bytesInChunk === 0 && this.offset !== this.windowEndOffset) {
        console.error(
          `Transfer ${this.id}: unable to send data at offset ${this.offset} ` +
            `(window end offset ${this.windowEndOffset}, max chunk size ` +
            `${this.maxChunkSize}); aborting`
        );
        this.sendError(Status.INTERNAL);
        return;
      }

      this.offset += bytesInChunk;
      const sentRequestedBytes = this.offset === this.windowEndOffset;

      this.updateProgress(this.offset, bytesAcknowledged, this.data.length);
      this.sendChunk(writeChunk);

      if (sentRequestedBytes) {
        break;
      }
    }

    this.lastChunk = writeChunk;
  }

  /**
   * Updates transfer state based on a transfer parameters update.
   *
   * @returns true if the parameters were applied; false if they were invalid,
   *     in which case an error has been sent and the transfer finished.
   */
  private handleParametersUpdate(chunk: Chunk): boolean {
    // A chunk without a type is treated as a retransmit request.
    let retransmit = true;
    if (chunk.hasType()) {
      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
    }

    if (chunk.getOffset() > this.data.length) {
      // Bad offset; terminate the transfer.
      console.error(
        `Transfer ${
          this.id
        }: server requested invalid offset ${chunk.getOffset()} (size ${
          this.data.length
        })`
      );

      this.sendError(Status.OUT_OF_RANGE);
      return false;
    }

    if (chunk.getPendingBytes() === 0) {
      console.error(
        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`
      );
      this.sendError(Status.INTERNAL);
      return false;
    }

    if (retransmit) {
      // Check whether the client has sent a previous data offset, which
      // indicates that some chunks were lost in transmission.
      if (chunk.getOffset() < this.offset) {
        console.debug(
          `Write transfer ${
            this.id
          } rolling back to offset ${chunk.getOffset()} from ${this.offset}`
        );
      }

      this.offset = chunk.getOffset();

      // Retransmit is the default behavior for older versions of the
      // transfer protocol. The window_end_offset field is not guaranteed
      // to be set in these versions, so it must be calculated.
      const maxBytesToSend = Math.min(
        chunk.getPendingBytes(),
        this.data.length - this.offset
      );
      this.windowEndOffset = this.offset + maxBytesToSend;
    } else {
      // Extend the window to the new end offset specified by the server.
      this.windowEndOffset = Math.min(
        chunk.getWindowEndOffset(),
        this.data.length
      );
    }

    if (chunk.hasMaxChunkSizeBytes()) {
      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
    }

    if (chunk.hasMinDelayMicroseconds()) {
      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
    }
    return true;
  }

  /**
   * Returns the next Chunk message to send in the data transfer.
   *
   * Does not advance `offset`; the caller does that after inspecting the
   * chunk's data length.
   */
  private nextChunk(): Chunk {
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setOffset(this.offset);
    chunk.setType(Chunk.Type.TRANSFER_DATA);

    // Limited by both the chunk size and the space left in the window.
    const maxBytesInChunk = Math.min(
      this.maxChunkSize,
      this.windowEndOffset - this.offset
    );

    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));

    // Mark the final chunk of the transfer.
    if (this.data.length - this.offset <= maxBytesInChunk) {
      chunk.setRemainingBytes(0);
    }
    return chunk;
  }

  /** Re-sends the most recently sent chunk. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.lastChunk);
  }
}