• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15import { Status } from 'pigweedjs/pw_status';
16import { Chunk } from 'pigweedjs/protos/pw_transfer/transfer_pb';
17
/** Immutable snapshot of how far a transfer has progressed. */
export class ProgressStats {
  /**
   * @param bytesSent Number of bytes sent so far.
   * @param bytesConfirmedReceived Number of bytes reported as received.
   * @param totalSizeBytes Total size of the transfer, when it is known.
   */
  constructor(
    readonly bytesSent: number,
    readonly bytesConfirmedReceived: number,
    readonly totalSizeBytes?: number,
  ) {}

  /**
   * Confirmed bytes as a percentage of the total size.
   *
   * Evaluates to NaN when the total size is not known.
   */
  get percentReceived(): number {
    return this.totalSizeBytes === undefined
      ? NaN
      : (this.bytesConfirmedReceived / this.totalSizeBytes) * 100;
  }

  /** Renders the snapshot as a single human-readable line. */
  toString(): string {
    const percentText = this.percentReceived.toFixed(2);
    // String() yields the literal text 'undefined' for an unknown total.
    const totalText = String(this.totalSizeBytes);
    const sentPart = `${this.bytesSent} B sent`;
    const receivedPart = `${this.bytesConfirmedReceived} B received of ${totalText} B`;
    return `${percentText}% (${sentPart}, ${receivedPart})`;
  }
}
44
/** Signature of a callback that receives a ProgressStats snapshot. */
export type ProgressCallback = (stats: ProgressStats) => void;
46
/** One-shot timer that runs a callback once a fixed timeout has elapsed. */
class Timer {
  private task?: ReturnType<typeof setTimeout>;

  /**
   * @param timeoutS Delay before the callback runs, in seconds.
   * @param callback Function invoked when the timer expires.
   */
  constructor(
    readonly timeoutS: number,
    private readonly callback: () => any,
  ) {}

  /**
   * Arms the timer, cancelling any timer that is already pending.
   *
   * Calling this repeatedly gives watchdog-style behavior: the callback only
   * runs once a full timeout passes without another call to start().
   */
  start() {
    this.stop();
    const delayMs = this.timeoutS * 1000;
    this.task = setTimeout(this.callback, delayMs);
  }

  /** Cancels the pending timer; does nothing when no timer is recorded. */
  stop() {
    const pending = this.task;
    if (pending === undefined) {
      return;
    }
    clearTimeout(pending);
    this.task = undefined;
  }
}
76
/**
 * Base class for a client-side data transfer run through a Manager.
 *
 * This class owns the pieces every transfer shares: the response timer, the
 * retry counter, progress reporting and the `done` promise. Subclasses supply
 * the direction-specific logic by implementing `initialChunk`,
 * `handleDataChunk` and `retryAfterTimeout`.
 */
export abstract class Transfer {
  /** Status of the transfer; overwritten by finish(). */
  status: Status = Status.OK;
  /** Resolves with the final status once finish() has run. */
  done: Promise<Status>;
  /** Payload of the transfer. */
  data = new Uint8Array(0);

  private retries = 0;
  private responseTimer?: Timer;
  private resolve?: (value: Status | PromiseLike<Status>) => void;

  /**
   * @param id Identifier placed in outgoing chunks for this transfer.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Number of consecutive timeouts tolerated before the
   *     transfer finishes with DEADLINE_EXCEEDED.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    public id: number,
    protected sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    private maxRetries: number,
    private progressCallback?: ProgressCallback,
  ) {
    this.done = new Promise<Status>((settle) => {
      this.resolve = settle;
    });
    this.responseTimer = new Timer(responseTimeoutS, () => this.onTimeout());
  }

  /** Returns the initial chunk to notify the server of the transfer. */
  protected abstract get initialChunk(): Chunk;

  /** Handles a chunk that contains or requests data. */
  protected abstract handleDataChunk(chunk: Chunk): void;

  /** Retries after a timeout occurs. */
  protected abstract retryAfterTimeout(): void;

  /**
   * Runs when the response timer expires without a chunk arriving.
   *
   * Counts the timeout, then either gives up with DEADLINE_EXCEEDED or asks
   * the subclass to retry and re-arms the timer.
   */
  private onTimeout(): void {
    this.retries += 1;
    if (this.retries > this.maxRetries) {
      this.finish(Status.DEADLINE_EXCEEDED);
      return;
    }

    const waitedS = this.responseTimer?.timeoutS;
    console.debug(
      `Received no responses for ${waitedS}; retrying ${this.retries}/${this.maxRetries}`,
    );

    this.retryAfterTimeout();
    this.responseTimer?.start();
  }

  /** Sends the initial chunk of the transfer and starts the response timer. */
  begin(): void {
    const firstChunk = this.initialChunk;
    this.sendChunk(firstChunk);
    this.responseTimer?.start();
  }

  /**
   * Ends the transfer with the specified status.
   *
   * Stops and discards the response timer, records the status, reports full
   * progress when the status is OK, and resolves `done`.
   */
  protected finish(status: Status): void {
    const timer = this.responseTimer;
    this.responseTimer = undefined;
    timer?.stop();

    this.status = status;

    if (status === Status.OK) {
      const completeSize = this.data.length;
      this.updateProgress(completeSize, completeSize, completeSize);
    }

    this.resolve!(this.status);
  }

  /** Ends the transfer without sending a completion chunk. */
  abort(status: Status): void {
    this.finish(status);
  }

  /** Sends a completion chunk carrying `status`, then ends the transfer. */
  terminate(status: Status): void {
    const completion = new Chunk();
    completion.setTransferId(this.id);
    completion.setType(Chunk.Type.COMPLETION);
    completion.setStatus(status);
    this.sendChunk(completion);
    this.abort(status);
  }

  /**
   * Logs the current progress and passes it to the progress callback, if one
   * was provided.
   */
  updateProgress(
    bytesSent: number,
    bytesConfirmedReceived: number,
    totalSizeBytes?: number,
  ): void {
    const snapshot = new ProgressStats(
      bytesSent,
      bytesConfirmedReceived,
      totalSizeBytes,
    );
    console.debug(`Transfer ${this.id} progress: ${snapshot}`);

    if (this.progressCallback === undefined) {
      return;
    }
    this.progressCallback(snapshot);
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * Stops the response timer and resets the retry count. Chunks that carry a
   * status end the transfer; all other chunks are forwarded to
   * handleDataChunk, after which the response timer is restarted.
   */
  handleChunk(chunk: Chunk): void {
    this.responseTimer?.stop();
    // A chunk arrived from the service, so earlier timeouts no longer count.
    this.retries = 0;

    console.debug(`Received chunk:(${chunk})`);

    if (!chunk.hasStatus()) {
      this.handleDataChunk(chunk);

      // Wait for the server's next chunk. This is a no-op when the transfer
      // finished while handling the chunk, because finish() clears the timer.
      this.responseTimer?.start();
      return;
    }

    // Status chunks only terminate a transfer; they carry no data to process.
    this.finish(chunk.getStatus());
  }
}
209
/**
 * A client <= server read transfer.
 *
 * Although typescript can effectively handle an unlimited transfer window, this
 * client sets a conservative window and chunk size to avoid overloading the
 * device. These are configurable in the constructor.
 */
export class ReadTransfer extends Transfer {
  private maxBytesToReceive: number;
  private maxChunkSize: number;
  private chunkDelayMicroS?: number; // Microseconds
  private remainingTransferSize?: number;
  private offset = 0;
  private pendingBytes: number;
  private windowEndOffset: number;

  // The fractional position within a window at which a receive transfer should
  // extend its window size to minimize the amount of time the transmitter
  // spends blocked.
  //
  // For example, a divisor of 2 will extend the window when half of the
  // requested data has been received, a divisor of three will extend at a third
  // of the window, and so on.
  private static readonly EXTEND_WINDOW_DIVISOR = 2;

  /**
   * @param id Identifier placed in outgoing chunks for this transfer.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param maxRetries Consecutive timeouts tolerated before giving up.
   * @param progressCallback Optional callback invoked on progress updates.
   * @param maxBytesToReceive Size of the receive window requested in each
   *     transfer parameters chunk.
   * @param maxChunkSize Maximum chunk size requested from the server.
   * @param chunkDelayMicroS Minimum delay between chunks to request from the
   *     server, in microseconds. Omitted from the request when undefined or 0.
   */
  constructor(
    id: number,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
    maxBytesToReceive = 8192,
    maxChunkSize = 1024,
    chunkDelayMicroS?: number,
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.maxBytesToReceive = maxBytesToReceive;
    this.maxChunkSize = maxChunkSize;
    this.chunkDelayMicroS = chunkDelayMicroS;
    this.pendingBytes = maxBytesToReceive;
    this.windowEndOffset = maxBytesToReceive;
  }

  /** The START chunk carrying the initial transfer parameters. */
  protected get initialChunk(): any {
    return this.transferParameters(Chunk.Type.START);
  }

  /**
   * Builds an updated transfer parameters chunk to send the server.
   *
   * As a side effect this resets the pending byte count to a full window and
   * moves the window end to `offset + maxBytesToReceive`.
   */
  private transferParameters(type: any): Chunk {
    this.pendingBytes = this.maxBytesToReceive;
    this.windowEndOffset = this.offset + this.maxBytesToReceive;

    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setPendingBytes(this.pendingBytes);
    chunk.setMaxChunkSizeBytes(this.maxChunkSize);
    chunk.setOffset(this.offset);
    chunk.setWindowEndOffset(this.windowEndOffset);
    chunk.setType(type);

    // Only request a delay when one was actually configured. The delay is
    // optional, so an undefined value must not be passed to the setter.
    const delay = this.chunkDelayMicroS;
    if (delay !== undefined && delay !== 0) {
      chunk.setMinDelayMicroseconds(delay);
    }
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a read transfer, the client receives data chunks from the server.
   * Once all pending data is received, the transfer parameters are updated.
   */
  protected handleDataChunk(chunk: Chunk): void {
    if (chunk.getOffset() !== this.offset) {
      // Initially, the transfer service only supports in-order transfers.
      // If data is received out of order, request that the server
      // retransmit from the previous offset.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
      return;
    }

    // Append the chunk's payload to the data received so far.
    const oldData = this.data;
    const chunkData = chunk.getData() as Uint8Array;
    this.data = new Uint8Array(chunkData.length + oldData.length);
    this.data.set(oldData);
    this.data.set(chunkData, oldData.length);

    this.pendingBytes -= chunkData.length;
    this.offset += chunkData.length;

    if (chunk.hasRemainingBytes()) {
      const remainingBytes = chunk.getRemainingBytes();
      if (remainingBytes === 0) {
        // No more data to read. Acknowledge receipt and finish.
        this.terminate(Status.OK);
        return;
      }

      this.remainingTransferSize = remainingBytes;
    } else if (this.remainingTransferSize !== undefined) {
      // Update the remaining transfer size, if it is known.
      this.remainingTransferSize -= chunkData.length;

      if (this.remainingTransferSize <= 0) {
        this.remainingTransferSize = undefined;
      }
    }

    const chunkWindowEnd = chunk.getWindowEndOffset();
    if (chunkWindowEnd !== 0) {
      // The transmitter's window end may not lie before the data received.
      if (chunkWindowEnd < this.offset) {
        console.error(
          `Transfer ${
            this.id
          }: transmitter sent invalid earlier end offset ${chunkWindowEnd} (receiver offset ${
            this.offset
          })`,
        );
        this.terminate(Status.INTERNAL);
        return;
      }

      // Nor may it lie beyond the window end this receiver requested.
      if (chunkWindowEnd > this.windowEndOffset) {
        console.error(
          `Transfer ${
            this.id
          }: transmitter sent invalid later end offset ${chunkWindowEnd} (receiver end offset ${
            this.windowEndOffset
          })`,
        );
        this.terminate(Status.INTERNAL);
        return;
      }

      // Adopt the transmitter's window end and reduce the pending byte count
      // by the distance from the current offset to that end.
      this.windowEndOffset = chunkWindowEnd;
      this.pendingBytes -= chunkWindowEnd - this.offset;
    }

    const remainingWindowSize = this.windowEndOffset - this.offset;
    const extendWindow =
      remainingWindowSize <=
      this.maxBytesToReceive / ReadTransfer.EXTEND_WINDOW_DIVISOR;

    const totalSize =
      this.remainingTransferSize === undefined
        ? undefined
        : this.remainingTransferSize + this.offset;
    this.updateProgress(this.offset, this.offset, totalSize);

    if (this.pendingBytes === 0) {
      // All pending data was received. Send out a new parameters chunk
      // for the next block.
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
    } else if (extendWindow) {
      this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_CONTINUE));
    }
  }

  /** Re-requests data from the current offset after a response timeout. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.transferParameters(Chunk.Type.PARAMETERS_RETRANSMIT));
  }
}
370
/**
 * A client => server write transfer.
 *
 * The server drives the transfer with transfer parameter updates; each update
 * causes this client to send data chunks until the requested window is filled.
 */
export class WriteTransfer extends Transfer {
  // Incremented once for every parameters chunk handled.
  private windowId = 0;
  offset = 0;
  maxChunkSize = 0;
  chunkDelayMicroS?: number;
  windowEndOffset = 0;
  lastChunk: Chunk;

  /**
   * @param id Identifier placed in outgoing chunks for this transfer.
   * @param data Bytes to write to the server.
   * @param sendChunk Function used to send a chunk to the server.
   * @param responseTimeoutS Seconds to wait for a chunk before retrying.
   * @param initialResponseTimeoutS Accepted for interface compatibility; not
   *     referenced by this constructor.
   * @param maxRetries Consecutive timeouts tolerated before giving up.
   * @param progressCallback Optional callback invoked on progress updates.
   */
  constructor(
    id: number,
    data: Uint8Array,
    sendChunk: (chunk: Chunk) => void,
    responseTimeoutS: number,
    initialResponseTimeoutS: number,
    maxRetries: number,
    progressCallback?: ProgressCallback,
  ) {
    super(id, sendChunk, responseTimeoutS, maxRetries, progressCallback);
    this.data = data;
    this.lastChunk = this.initialChunk;
  }

  /** The START chunk announcing this transfer to the server. */
  protected get initialChunk(): any {
    // TODO(frolv): The session ID should not be set here but assigned by the
    // server during an initial handshake.
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setResourceId(this.id);
    chunk.setType(Chunk.Type.START);
    return chunk;
  }

  /**
   * Processes an incoming chunk from the server.
   *
   * In a write transfer, the server only sends transfer parameter updates
   * to the client. When a message is received, update local parameters and
   * send data accordingly.
   */
  protected handleDataChunk(chunk: Chunk): void {
    this.windowId += 1;

    if (!this.handleParametersUpdate(chunk)) {
      return;
    }

    const bytesAcknowledged = chunk.getOffset();

    // At least one chunk is always sent, so that an empty or fully-sent
    // transfer still emits its final (remaining_bytes = 0) chunk.
    let writeChunk: Chunk;
    let keepSending: boolean;
    do {
      writeChunk = this.nextChunk();
      const chunkLength = writeChunk.getData().length;
      this.offset += chunkLength;

      // Stop once the window is filled. Also stop after a chunk that carried
      // no data: the offset cannot advance in that case (e.g. the maximum
      // chunk size is still 0, or the window end lies before the offset), so
      // continuing would loop forever sending empty chunks.
      keepSending = chunkLength > 0 && this.offset !== this.windowEndOffset;

      this.updateProgress(this.offset, bytesAcknowledged, this.data.length);
      this.sendChunk(writeChunk);
    } while (keepSending);

    this.lastChunk = writeChunk;
  }

  /**
   * Updates transfer state based on a transfer parameters update.
   *
   * @returns false if the update was invalid and the transfer was terminated,
   *     true if data should now be sent.
   */
  private handleParametersUpdate(chunk: Chunk): boolean {
    // Chunks without a type are treated as retransmit requests.
    let retransmit = true;
    if (chunk.hasType()) {
      retransmit = chunk.getType() === Chunk.Type.PARAMETERS_RETRANSMIT;
    }

    if (chunk.getOffset() > this.data.length) {
      // Bad offset; terminate the transfer.
      console.error(
        `Transfer ${
          this.id
        }: server requested invalid offset ${chunk.getOffset()} (size ${
          this.data.length
        })`,
      );

      this.terminate(Status.OUT_OF_RANGE);
      return false;
    }

    if (chunk.getPendingBytes() === 0) {
      console.error(
        `Transfer ${this.id}: service requested 0 bytes (invalid); aborting`,
      );
      this.terminate(Status.INTERNAL);
      return false;
    }

    if (retransmit) {
      // Check whether the client has sent a previous data offset, which
      // indicates that some chunks were lost in transmission.
      if (chunk.getOffset() < this.offset) {
        console.debug(
          `Write transfer ${
            this.id
          } rolling back to offset ${chunk.getOffset()} from ${this.offset}`,
        );
      }

      this.offset = chunk.getOffset();

      // Retransmit is the default behavior for older versions of the
      // transfer protocol. The window_end_offset field is not guaranteed
      // to be set in these versions, so it must be calculated.
      const maxBytesToSend = Math.min(
        chunk.getPendingBytes(),
        this.data.length - this.offset,
      );
      this.windowEndOffset = this.offset + maxBytesToSend;
    } else {
      // Extend the window to the new end offset specified by the server,
      // capped at the end of the data.
      this.windowEndOffset = Math.min(
        chunk.getWindowEndOffset(),
        this.data.length,
      );
    }

    if (chunk.hasMaxChunkSizeBytes()) {
      this.maxChunkSize = chunk.getMaxChunkSizeBytes();
    }

    if (chunk.hasMinDelayMicroseconds()) {
      this.chunkDelayMicroS = chunk.getMinDelayMicroseconds();
    }
    return true;
  }

  /**
   * Returns the next Chunk message to send in the data transfer.
   *
   * The payload starts at the current offset and is limited by both the
   * maximum chunk size and the space left in the current window.
   */
  private nextChunk(): Chunk {
    const chunk = new Chunk();
    chunk.setTransferId(this.id);
    chunk.setOffset(this.offset);
    chunk.setType(Chunk.Type.DATA);

    const maxBytesInChunk = Math.min(
      this.maxChunkSize,
      this.windowEndOffset - this.offset,
    );

    chunk.setData(this.data.slice(this.offset, this.offset + maxBytesInChunk));

    // Mark the final chunk of the transfer.
    if (this.data.length - this.offset <= maxBytesInChunk) {
      chunk.setRemainingBytes(0);
    }
    return chunk;
  }

  /** Resends the most recently sent chunk after a response timeout. */
  protected retryAfterTimeout(): void {
    this.sendChunk(this.lastChunk);
  }
}
534