// Copyright 2022 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. /** Client for the pw_transfer service, which transmits data over pw_rpc. */ import { BidirectionalStreamingCall, BidirectionalStreamingMethodStub, ServiceClient, } from 'pigweedjs/pw_rpc'; import {Status} from 'pigweedjs/pw_status'; import {Chunk} from 'pigweedjs/protos/pw_transfer/transfer_pb'; import { ReadTransfer, ProgressCallback, Transfer, WriteTransfer, } from './transfer'; type TransferDict = { [key: number]: Transfer; }; const DEFAULT_MAX_RETRIES = 3; const DEFAULT_RESPONSE_TIMEOUT_S = 2; const DEFAULT_INITIAL_RESPONSE_TIMEOUT = 4; /** * A manager for transmitting data through an RPC TransferService. * * This should be initialized with an active Manager over an RPC channel. Only * one instance of this class should exist for a configured RPC TransferService * -- the Manager supports multiple simultaneous transfers. * * When created, a Manager starts a separate thread in which transfer * communications and events are handled. */ export class Manager { // Ongoing transfers in the service by ID private readTransfers: TransferDict = {}; private writeTransfers: TransferDict = {}; // RPC streams for read and write transfers. These are shareable by // multiple transfers of the same type. private readStream?: BidirectionalStreamingCall; private writeStream?: BidirectionalStreamingCall; /** * Initializes a Manager on top of a TransferService. * * Args: * @param{ServiceClient} service: the pw_rpc transfer service * client * @param{number} defaultResponseTimeoutS: max time to wait between receiving * packets * @param{number} initialResponseTimeoutS: timeout for the first packet; may * be longer to account for transfer handler initialization * @param{number} maxRetries: number of times to retry after a timeout */ constructor( private service: ServiceClient, private defaultResponseTimeoutS = DEFAULT_RESPONSE_TIMEOUT_S, private initialResponseTimeoutS = DEFAULT_INITIAL_RESPONSE_TIMEOUT, private maxRetries = DEFAULT_MAX_RETRIES ) {} /** * Receives ("downloads") data from the server. * * @throws Throws an error when the transfer fails to complete. */ async read( resourceId: number, progressCallback?: ProgressCallback ): Promise { if (resourceId in this.readTransfers) { throw new Error( `Read transfer for resource ${resourceId} already exists` ); } const transfer = new ReadTransfer( resourceId, this.sendReadChunkCallback, this.defaultResponseTimeoutS, this.maxRetries, progressCallback ); this.startReadTransfer(transfer); const status = await transfer.done; delete this.readTransfers[transfer.id]; if (status !== Status.OK) { throw new TransferError(transfer.id, transfer.status); } return transfer.data; } /** Begins a new read transfer, opening the stream if it isn't. */ startReadTransfer(transfer: Transfer): void { this.readTransfers[transfer.id] = transfer; if (this.readStream === undefined) { this.openReadStream(); } console.debug(`Starting new read transfer ${transfer.id}`); transfer.begin(); } /** Transmits (uploads) data to the server. * * @param{number} resourceId: ID of the resource to which to write. * @param{Uint8Array} data: Data to send to the server. */ async write( resourceId: number, data: Uint8Array, progressCallback?: ProgressCallback ): Promise { const transfer = new WriteTransfer( resourceId, data, this.sendWriteChunkCallback, this.defaultResponseTimeoutS, this.initialResponseTimeoutS, this.maxRetries, progressCallback ); this.startWriteTransfer(transfer); const status = await transfer.done; delete this.writeTransfers[transfer.id]; if (transfer.status !== Status.OK) { throw new TransferError(transfer.id, transfer.status); } } sendReadChunkCallback = (chunk: Chunk) => { this.readStream!.send(chunk); }; sendWriteChunkCallback = (chunk: Chunk) => { this.writeStream!.send(chunk); }; /** Begins a new write transfer, opening the stream if it isn't */ startWriteTransfer(transfer: Transfer): void { this.writeTransfers[transfer.id] = transfer; if (!this.writeStream) { this.openWriteStream(); } console.debug(`Starting new write transfer ${transfer.id}`); transfer.begin(); } private openReadStream(): void { const readRpc = this.service.method( 'Read' )! as BidirectionalStreamingMethodStub; this.readStream = readRpc.invoke( (chunk: Chunk) => { this.handleChunk(this.readTransfers, chunk); }, () => {}, this.onReadError ); } private openWriteStream(): void { const writeRpc = this.service.method( 'Write' )! as BidirectionalStreamingMethodStub; this.writeStream = writeRpc.invoke( (chunk: Chunk) => { this.handleChunk(this.writeTransfers, chunk); }, () => {}, this.onWriteError ); } /** * Callback for an RPC error in the read stream. */ private onReadError = (status: Status) => { if (status === Status.FAILED_PRECONDITION) { // FAILED_PRECONDITION indicates that the stream packet was not // recognized as the stream is not open. This could occur if the // server resets during an active transfer. Re-open the stream to // allow pending transfers to continue. this.openReadStream(); return; } // Other errors are unrecoverable. Clear the stream and cancel any // pending transfers with an INTERNAL status as this is a system // error. this.readStream = undefined; for (const key in this.readTransfers) { const transfer = this.readTransfers[key]; transfer.finish(Status.INTERNAL); } this.readTransfers = {}; console.error(`Read stream shut down ${Status[status]}`); }; private onWriteError = (status: Status) => { if (status === Status.FAILED_PRECONDITION) { // FAILED_PRECONDITION indicates that the stream packet was not // recognized as the stream is not open. This could occur if the // server resets during an active transfer. Re-open the stream to // allow pending transfers to continue. this.openWriteStream(); } else { // Other errors are unrecoverable. Clear the stream and cancel any // pending transfers with an INTERNAL status as this is a system // error. this.writeStream = undefined; for (const key in this.writeTransfers) { const transfer = this.writeTransfers[key]; transfer.finish(Status.INTERNAL); } this.writeTransfers = {}; console.error(`Write stream shut down: ${Status[status]}`); } }; /** * Processes an incoming chunk from a stream. * * The chunk is dispatched to an active transfer based on its ID. If the * transfer indicates that it is complete, the provided completion callback * is invoked. */ private async handleChunk(transfers: TransferDict, chunk: Chunk) { const transfer = transfers[chunk.getTransferId()]; if (transfer === undefined) { console.error( `TransferManager received chunk for unknown transfer ${chunk.getTransferId()}` ); return; } transfer.handleChunk(chunk); } } /** * Exception raised when a transfer fails. * * Stores the ID of the failed transfer and the error that occured. */ class TransferError extends Error { id: number; status: Status; constructor(id: number, status: Status) { super(`Transfer ${id} failed with status ${Status[status]}`); this.status = status; this.id = id; } }