• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import {
16  BidirectionalStreamingCall,
17  BidirectionalStreamingMethodStub,
18  ServiceClient,
19} from '@pigweed/pw_rpc';
20import {Status} from '@pigweed/pw_status';
21import {Chunk} from 'transfer_proto_tspb/transfer_proto_tspb_pb/pw_transfer/transfer_pb';
22
23export class ProgressStats {
24  constructor(
25    readonly bytesSent: number,
26    readonly bytesConfirmedReceived: number,
27    readonly totalSizeBytes?: number
28  ) {}
29
30  get percentReceived(): number {
31    if (this.totalSizeBytes === undefined) {
32      return NaN;
33    }
34    return (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
35  }
36
37  toString(): string {
38    const total =
39      this.totalSizeBytes === undefined
40        ? 'undefined'
41        : this.totalSizeBytes.toString();
42    const percent = this.percentReceived.toFixed(2);
43    return (
44      `${percent}% (${this.bytesSent} B sent, ` +
45      `${this.bytesConfirmedReceived} B received of ${total} B)`
46    );
47  }
48}
49
50export type ProgressCallback = (stats: ProgressStats) => void;
51
52/** A Timer which invokes a callback after a certain timeout. */
53class Timer {
54  private task?: ReturnType<typeof setTimeout>;
55
56  constructor(
57    readonly timeoutS: number,
58    private readonly callback: () => any
59  ) {}
60
61  /**
62   * Starts a new timer.
63   *
64   * If a timer is already running, it is stopped and a new timer started.
65   * This can be used to implement watchdog-like behavior, where a callback
66   * is invoked after some time without a kick.
67   */
68  start() {
69    this.stop();
70    this.task = setTimeout(this.callback, this.timeoutS * 1000);
71  }
72
73  /** Terminates a running timer. */
74  stop() {
75    if (this.task !== undefined) {
76      clearTimeout(this.task);
77      this.task = undefined;
78    }
79  }
80}
81
82/**
83 * A client-side data transfer through a Manager.
84 *
85 * Subclasses are responsible for implementing all of the logic for their type
86 * of transfer, receiving messages from the server and sending the appropriate
87 * messages in response.
88 */
89export abstract class Transfer {
90  status: Status = Status.OK;
91  done: Promise<Status>;
92  protected data = new Uint8Array();
93
94  private retries = 0;
95  private responseTimer?: Timer;
96  private resolve?: (value: Status | PromiseLike<Status>) => void;
97
98  constructor(
99    public id: number,
100    protected sendChunk: (chunk: Chunk) => void,
101    responseTimeoutS: number,
102    private maxRetries: number,
103    private progressCallback?: ProgressCallback
104  ) {
105    this.responseTimer = new Timer(responseTimeoutS, this.onTimeout);
106    this.done = new Promise<Status>(resolve => {
107      this.resolve = resolve!;
108    });
109  }
110
111  /** Returns the initial chunk to notify the server of the transfer. */
112  protected abstract get initialChunk(): Chunk;
113
114  /** Handles a chunk that contains or requests data. */
115  protected abstract handleDataChunk(chunk: Chunk): void;
116
117  /** Retries after a timeout occurs. */
118  protected abstract retryAfterTimeout(): void;
119
120  /** Handles a timeout while waiting for a chunk. */
121  private onTimeout = () => {
122    this.retries += 1;
123    if (this.retries > this.maxRetries) {
124      this.finish(Status.DEADLINE_EXCEEDED);
125      return;
126    }
127
128    console.debug(
129      `Received no responses for ${this.responseTimer?.timeoutS}; retrying ${this.retries}/${this.maxRetries}`
130    );
131
132    this.retryAfterTimeout();
133    this.responseTimer?.start();
134  };
135
136  /** Sends an error chunk to the server and finishes the transfer. */
137  protected sendError(error: Status): void {
138    const chunk = new Chunk();
139    chunk.setStatus(error);
140    chunk.setTransferId(this.id);
141    chunk.setType(Chunk.Type.TRANSFER_COMPLETION);
142    this.sendChunk(chunk);
143    this.finish(error);
144  }
145
146  /** Sends the initial chunk of the transfer. */
147  begin(): void {
148    this.sendChunk(this.initialChunk);
149    this.responseTimer?.start();
150  }
151
152  /** Ends the transfer with the specified status. */
153  finish(status: Status): void {
154    this.responseTimer?.stop();
155    this.responseTimer = undefined;
156    this.status = status;
157
158    if (status === Status.OK) {
159      const totalSize = this.data.length;
160      this.updateProgress(totalSize, totalSize, totalSize);
161    }
162
163    this.resolve!(this.status);
164  }
165
166  /** Invokes the provided progress callback, if any, with the progress */
167  updateProgress(
168    bytesSent: number,
169    bytesConfirmedReceived: number,
170    totalSizeBytes?: number
171  ): void {
172    const stats = new ProgressStats(
173      bytesSent,
174      bytesConfirmedReceived,
175      totalSizeBytes
176    );
177    console.debug(`Transfer ${this.id} progress: ${stats}`);
178
179    if (this.progressCallback !== undefined) {
180      this.progressCallback(stats);
181    }
182  }
183
184  /**
185   *  Processes an incoming chunk from the server.
186   *
187   *  Handles terminating chunks (i.e. those with a status) and forwards
188   *  non-terminating chunks to handle_data_chunk.
189   */
190  handleChunk(chunk: Chunk): void {
191    this.responseTimer?.stop();
192    this.retries = 0; // Received data from service, so reset the retries.
193
194    console.debug(`Received chunk:(${chunk})`);
195
196    // Status chunks are only used to terminate a transfer. They do not
197    // contain any data that requires processing.
198    if (chunk.hasStatus()) {
199      this.finish(chunk.getStatus());
200      return;
201    }
202
203    this.handleDataChunk(chunk);
204
205    // Start the timeout for the server to send a chunk in response.
206    this.responseTimer?.start();
207  }
208}
209
210/**
211 * A client <= server read transfer.
212 *
213 * Although typescript can effectively handle an unlimited transfer window, this
214 * client sets a conservative window and chunk size to avoid overloading the
215 * device. These are configurable in the constructor.
216 */
217export class ReadTransfer extends Transfer {
218  private maxBytesToReceive: number;
219  private maxChunkSize: number;
220  private chunkDelayMicroS?: number; // Microseconds
221  private remainingTransferSize?: number;
222  private offset = 0;
223  private pendingBytes: number;
224  private windowEndOffset: number;
225
226  // The fractional position within a window at which a receive transfer should
227  // extend its window size to minimize the amount of time the transmitter
228  // spends blocked.
229  //
230  // For example, a divisor of 2 will extend the window when half of the
231  // requested data has been received, a divisor of three will extend at a third
232  // of the window, and so on.
233  private static EXTEND_WINDOW_DIVISOR = 2;
234
235  data = new Uint8Array();
236
237  constructor(
238    id: number,
239    sendChunk: (chunk: Chunk) => void,
240    responseTimeoutS: number,
241    maxRetries: number,
242    progressCallback?: ProgressCallback,
243    maxBytesToReceive = 8192,
244    maxChunkSize = 1024,
245    chunkDelayMicroS?: number
246  ) {
247    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
248    this.maxBytesToReceive = maxBytesToReceive;
249    this.maxChunkSize = maxChunkSize;
250    this.chunkDelayMicroS = chunkDelayMicroS;
251    this.pendingBytes = maxBytesToReceive;
252    this.windowEndOffset = maxBytesToReceive;
253  }
254
255  protected get initialChunk(): Chunk {
256    return this.transferParameters(Chunk.Type.TRANSFER_START);
257  }
258
259  /** Builds an updated transfer parameters chunk to send the server. */
260  private transferParameters(type: Chunk.TypeMap[keyof Chunk.TypeMap]): Chunk {
261    this.pendingBytes = this.maxBytesToReceive;
262    this.windowEndOffset = this.offset + this.maxBytesToReceive;
263
264    const chunk = new Chunk();
265    chunk.setTransferId(this.id);
266    chunk.setPendingBytes(this.pendingBytes);
267    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
268    chunk.setOffset(this.offset);
269    chunk.setWindowEndOffset(this.windowEndOffset);
270    chunk.setType(type);
271
272    if (this.chunkDelayMicroS !== 0) {
273      chunk.setMinDelayMicroseconds(this.chunkDelayMicroS!);
274    }
275    return chunk;
276  }
277
278  /**
279   * Processes an incoming chunk from the server.
280   *
281   * In a read transfer, the client receives data chunks from the server.
282   * Once all pending data is received, the transfer parameters are updated.
283   */
284  protected handleDataChunk(chunk: Chunk): void {
285    if (chunk.getOffset() != this.offset) {
286      // Initially, the transfer service only supports in-order transfers.
287      // If data is received out of order, request that the server
288      // retransmit from the previous offset.
289      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
290      return;
291    }
292
293    const oldData = this.data;
294    const chunkData = chunk.getData() as Uint8Array;
295    this.data = new Uint8Array(chunkData.length + oldData.length);
296    this.data.set(oldData);
297    this.data.set(chunkData, oldData.length);
298
299    this.pendingBytes -= chunk.getData().length;
300    this.offset += chunk.getData().length;
301
302    if (chunk.hasRemainingBytes()) {
303      if (chunk.getRemainingBytes() === 0) {
304        // No more data to read. Acknowledge receipt and finish.
305        const endChunk = new Chunk();
306        endChunk.setTransferId(this.id);
307        endChunk.setStatus(Status.OK);
308        endChunk.setType(Chunk.Type.TRANSFER_COMPLETION);
309        this.sendChunk(endChunk);
310        this.finish(Status.OK);
311        return;
312      }
313
314      this.remainingTransferSize = chunk.getRemainingBytes();
315    } else if (this.remainingTransferSize !== undefined) {
316      // Update the remaining transfer size, if it is known.
317      this.remainingTransferSize -= chunk.getData().length;
318
319      if (this.remainingTransferSize <= 0) {
320        this.remainingTransferSize = undefined;
321      }
322    }
323
324    if (chunk.getWindowEndOffset() !== 0) {
325      if (chunk.getWindowEndOffset() < this.offset) {
326        console.error(
327          `Transfer ${
328            this.id
329          }: transmitter sent invalid earlier end offset ${chunk.getWindowEndOffset()} (receiver offset ${
330            this.offset
331          })`
332        );
333        this.sendError(Status.INTERNAL);
334        return;
335      }
336
337      if (chunk.getWindowEndOffset() < this.offset) {
338        console.error(
339          `Transfer ${
340            this.id
341          }: transmitter sent invalid later end offset ${chunk.getWindowEndOffset()} (receiver end offset ${
342            this.windowEndOffset
343          })`
344        );
345        this.sendError(Status.INTERNAL);
346        return;
347      }
348
349      this.windowEndOffset = chunk.getWindowEndOffset();
350      this.pendingBytes -= chunk.getWindowEndOffset() - this.offset;
351    }
352
353    const remainingWindowSize = this.windowEndOffset - this.offset;
354    const extendWindow =
355      remainingWindowSize <=
356      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;
357
358    const totalSize =
359      this.remainingTransferSize === undefined
360        ? undefined
361        : this.remainingTransferSize + this.offset;
362    this.updateProgress(this.offset, this.offset, totalSize);
363
364    if (this.pendingBytes === 0) {
365      // All pending data was received. Send out a new parameters chunk
366      // for the next block.
367      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
368    } else if (extendWindow) {
369      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
370    }
371  }
372
373  protected retryAfterTimeout(): void {
374    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
375  }
376}
377
378/**
379 * A client => server write transfer.
380 */
381export class WriteTransfer extends Transfer {
382  readonly data: Uint8Array;
383  private windowId = 0;
384  offset = 0;
385  maxChunkSize = 0;
386  chunkDelayMicroS?: number;
387  windowEndOffset = 0;
388  lastChunk: Chunk;
389
390  constructor(
391    id: number,
392    data: Uint8Array,
393    sendChunk: (chunk: Chunk) => void,
394    responseTimeoutS: number,
395    initialResponseTimeoutS: number,
396    maxRetries: number,
397    progressCallback?: ProgressCallback
398  ) {
399    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
400    this.data = data;
401    this.lastChunk = this.initialChunk;
402  }
403
404  protected get initialChunk(): Chunk {
405    const chunk = new Chunk();
406    chunk.setTransferId(this.id);
407    chunk.setType(Chunk.Type.TRANSFER_START);
408    return chunk;
409  }
410
411  /**
412   * Processes an incoming chunk from the server.
413   *
414   * In a write transfer, the server only sends transfer parameter updates
415   * to the client. When a message is received, update local parameters and
416   * send data accordingly.
417   */
418  protected handleDataChunk(chunk: Chunk): void {
419    this.windowId += 1;
420    const initialWindowId = this.windowId;
421
422    if (!this.handleParametersUpdate(chunk)) {
423      return;
424    }
425
426    const bytesAknowledged = chunk.getOffset();
427
428    let writeChunk: Chunk;
429    while (true) {
430      writeChunk = this.nextChunk();
431      this.offset += writeChunk.getData().length;
432      const sentRequestedBytes = this.offset === this.windowEndOffset;
433
434      this.updateProgress(this.offset, bytesAknowledged, this.data.length);
435      this.sendChunk(writeChunk);
436
437      if (sentRequestedBytes) {
438        break;
439      }
440    }
441
442    this.lastChunk = writeChunk;
443  }
444
445  /** Updates transfer state base on a transfer parameters update. */
446  private handleParametersUpdate(chunk: Chunk): boolean {
447    let retransmit = true;
448    if (chunk.hasType()) {
449      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
450    }
451
452    if (chunk.getOffset() > this.data.length) {
453      // Bad offset; terminate the transfer.
454      console.error(
455        `Transfer ${
456          this.id
457        }: server requested invalid offset ${chunk.getOffset()} (size ${
458          this.data.length
459        })`
460      );
461
462      this.sendError(Status.OUT_OF_RANGE);
463      return false;
464    }
465
466    if (chunk.getPendingBytes() === 0) {
467      console.error(
468        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`
469      );
470      this.sendError(Status.INTERNAL);
471      return false;
472    }
473
474    if (retransmit) {
475      // Check whether the client has sent a previous data offset, which
476      // indicates that some chunks were lost in transmission.
477      if (chunk.getOffset() < this.offset) {
478        console.debug(
479          `Write transfer ${
480            this.id
481          } rolling back to offset ${chunk.getOffset()} from ${this.offset}`
482        );
483      }
484
485      this.offset = chunk.getOffset();
486
487      // Retransmit is the default behavior for older versions of the
488      // transfer protocol. The window_end_offset field is not guaranteed
489      // to be set in these version, so it must be calculated.
490      const maxBytesToSend = Math.min(
491        chunk.getPendingBytes(),
492        this.data.length - this.offset
493      );
494      this.windowEndOffset = this.offset + maxBytesToSend;
495    } else {
496      // Extend the window to the new end offset specified by the server.
497      this.windowEndOffset = Math.min(
498        chunk.getWindowEndOffset(),
499        this.data.length
500      );
501    }
502
503    if (chunk.hasMaxChunkSizeBytes()) {
504      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
505    }
506
507    if (chunk.hasMinDelayMicroseconds()) {
508      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
509    }
510    return true;
511  }
512
513  /** Returns the next Chunk message to send in the data transfer. */
514  private nextChunk(): Chunk {
515    const chunk = new Chunk();
516    chunk.setTransferId(this.id);
517    chunk.setOffset(this.offset);
518    chunk.setType(Chunk.Type.TRANSFER_DATA);
519
520    const maxBytesInChunk = Math.min(
521      this.maxChunkSize,
522      this.windowEndOffset - this.offset
523    );
524
525    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));
526
527    // Mark the final chunk of the transfer.
528    if (this.data.length - this.offset <= maxBytesInChunk) {
529      chunk.setRemainingBytes(0);
530    }
531    return chunk;
532  }
533
534  protected retryAfterTimeout(): void {
535    this.sendChunk(this.lastChunk);
536  }
537}
538