• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2022 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser */
16import { BehaviorSubject, Observable, Subject, Subscription } from 'rxjs';
17import DeviceTransport from './device_transport';
18import type {
19  SerialPort,
20  Serial,
21  SerialOptions,
22  Navigator,
23  SerialPortFilter,
24} from '../types/serial';
25
/** Baud rate shared by both spellings of the baud-rate option below. */
const DEFAULT_BAUD_RATE = 115200;

/**
 * Serial settings used when the transport is constructed without explicit
 * options. The baud rate is supplied under two spellings because different
 * Chrome builds read different keys.
 */
const DEFAULT_SERIAL_OPTIONS: SerialOptions & { baudRate: number } = {
  // Spelling read by some versions of Chrome (linux).
  baudrate: DEFAULT_BAUD_RATE,
  // Spelling read by other versions (chromebook).
  baudRate: DEFAULT_BAUD_RATE,
  databits: 8,
  parity: 'none',
  stopbits: 1,
};
35
/** The receive side of an opened serial port, exposed as two streams. */
interface PortReadConnection {
  /** Emits failures raised while reading from the port. */
  errors: Observable<Error>;
  /** Emits each block of bytes read from the port. */
  chunks: Observable<Uint8Array>;
}
40
/**
 * A fully opened serial port: the receive streams of `PortReadConnection`
 * plus a way to write bytes back to the same port.
 */
interface PortConnection extends PortReadConnection {
  /** Writes one block of bytes to the port; settles when the write does. */
  sendChunk: (chunk: Uint8Array) => Promise<void>;
}
44
/**
 * Raised when the serial device is no longer reachable (for example, its
 * readable stream has gone away).
 */
export class DeviceLostError extends Error {
  /**
   * @param message Optional override for the default description.
   */
  constructor(message = 'The device has been lost') {
    // Route the text through Error itself so `message` is a normal,
    // non-enumerable error property instead of a shadowing class field.
    super(message);
    // Without this the error reports itself as a generic "Error" in logs.
    this.name = 'DeviceLostError';
    // Keeps `instanceof DeviceLostError` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
48
/**
 * Raised when the port's readable stream is already locked by another
 * reader, so this transport cannot start reading from it.
 */
export class DeviceLockedError extends Error {
  /**
   * @param message Optional override for the default description.
   */
  constructor(
    message = "The device's port is locked. Try unplugging it" +
      ' and plugging it back in.',
  ) {
    // Route the text through Error itself so `message` is a normal,
    // non-enumerable error property instead of a shadowing class field.
    super(message);
    // Without this the error reports itself as a generic "Error" in logs.
    this.name = 'DeviceLockedError';
    // Keeps `instanceof DeviceLockedError` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
54
/**
 * WebSerialTransport sends and receives UInt8Arrays to and
 * from a serial device connected over USB.
 *
 * Bytes read from a connected port are forwarded to `chunks`. Failures are
 * forwarded to `errors` instead of erroring `chunks`, so reading can continue
 * after a problem. `connected` carries the current connection state.
 */
export class WebSerialTransport implements DeviceTransport {
  /** Bytes received from connected ports. Never completed by this class. */
  chunks = new Subject<Uint8Array>();
  /** Failures from reading, writing, or closing. */
  errors = new Subject<Error>();
  /** Current connection state; starts out `false`. */
  connected = new BehaviorSubject<boolean>(false);
  /** Opened ports, so a port is only opened and piped once. */
  private portConnections: Map<SerialPort, PortConnection> = new Map();
  /** Connection that `sendChunk` writes to. */
  private activePortConnectionConnection: PortConnection | undefined;
  /** Subscriptions that forward per-port streams into `chunks`/`errors`. */
  private rxSubscriptions: Subscription[] = [];
  /** Writer of the most recently opened port. */
  private writer: WritableStreamDefaultWriter<Uint8Array> | undefined;
  /** Aborts the read pipe of the most recently opened port. */
  private abortController: AbortController | undefined;

  /**
   * @param serial Web Serial entry point; defaults to `navigator.serial`.
   * @param filters Filters passed to the port chooser in `connect()`.
   * @param serialOptions Options passed to `port.open()`.
   */
  constructor(
    private serial: Serial = (navigator as unknown as Navigator).serial,
    private filters: SerialPortFilter[] = [],
    private serialOptions = DEFAULT_SERIAL_OPTIONS,
  ) {}

  /**
   * Send a UInt8Array chunk of data to the connected device.
   * @param chunk The chunk to send
   * @throws Error (as a rejection) when no port is currently active.
   */
  async sendChunk(chunk: Uint8Array): Promise<void> {
    if (this.activePortConnectionConnection) {
      return this.activePortConnectionConnection.sendChunk(chunk);
    }
    throw new Error('Device not connected');
  }

  /**
   * Attempt to open a connection to a device. This includes
   * asking the user to select a serial port and should only
   * be called in response to user interaction.
   */
  async connect(): Promise<void> {
    const port = await this.serial.requestPort({ filters: this.filters });
    await this.connectPort(port);
  }

  /**
   * Stop forwarding data, forget all known ports, abort the read pipe and
   * close the writer. A failure while closing the writer is reported on
   * `errors` rather than thrown. Safe to call more than once.
   */
  async disconnect(): Promise<void> {
    for (const subscription of this.rxSubscriptions) {
      subscription.unsubscribe();
    }
    this.rxSubscriptions = [];

    this.activePortConnectionConnection = undefined;
    this.portConnections.clear();

    // Detach the handles before using them so a repeated disconnect() does
    // not try to close an already-closed writer and emit a spurious error.
    const abortController = this.abortController;
    const writer = this.writer;
    this.abortController = undefined;
    this.writer = undefined;

    abortController?.abort();

    try {
      await writer?.close();
    } catch (err) {
      this.errors.next(err instanceof Error ? err : new Error(String(err)));
    }
    this.connected.next(false);
  }

  /**
   * Connect to a given SerialPort. This involves no user interaction
   * and can be called whenever a port is available.
   *
   * Calling this again for a port that is already open only re-activates it;
   * it does not subscribe a second time, so chunks are never delivered twice.
   */
  async connectPort(port: SerialPort): Promise<void> {
    const cached = this.portConnections.get(port);
    if (cached) {
      // A cached entry means forwarding for this port is already in place:
      // entries are only added right before the subscriptions below and are
      // removed by disconnect() or when the port's chunk stream completes.
      this.activePortConnectionConnection = cached;
      this.connected.next(true);
      return;
    }

    const connection = await this.connectNewPort(port);
    this.activePortConnectionConnection = connection;

    this.connected.next(true);

    this.rxSubscriptions.push(
      connection.chunks.subscribe({
        next: (chunk) => {
          this.chunks.next(chunk);
        },
        error: (err: unknown) => {
          // Throwing inside an observer cannot be caught by anyone; report
          // through the errors stream like every other failure.
          this.errors.next(
            new Error(`Chunks observable had an unexpected error ${err}`),
          );
        },
        complete: () => {
          this.connected.next(false);
          this.portConnections.delete(port);
          // Don't complete the chunks observable because then it would not
          // be able to forward any future chunks.
        },
      }),
      connection.errors.subscribe((error) => {
        this.errors.next(error);
        if (error instanceof DeviceLostError) {
          // The device has been lost
          this.connected.next(false);
        }
      }),
    );
  }

  /**
   * Open `port`, take its writer, start reading from it, and remember the
   * resulting connection so the port is not opened again.
   */
  private async connectNewPort(port: SerialPort): Promise<PortConnection> {
    await port.open(this.serialOptions);
    const writer = port.writable.getWriter();
    this.writer = writer;

    const sendChunk = async (chunk: Uint8Array): Promise<void> => {
      // Wait for the writer to be ready before queueing the write.
      await writer.ready;
      await writer.write(chunk);
    };

    const { chunks, errors } = this.getChunks(port);

    const connection: PortConnection = { sendChunk, chunks, errors };
    this.portConnections.set(port, connection);
    return connection;
  }

  /**
   * Start piping the port's readable stream into a fresh pair of subjects.
   * The pipe can be stopped through `this.abortController`.
   */
  private getChunks(port: SerialPort): PortReadConnection {
    const chunks = new Subject<Uint8Array>();
    const errors = new Subject<Error>();
    const abortController = new AbortController();
    this.abortController = abortController;

    async function read(): Promise<void> {
      if (!port.readable) {
        throw new DeviceLostError();
      }
      if (port.readable.locked) {
        throw new DeviceLockedError();
      }
      await port.readable.pipeTo(
        new WritableStream({
          write: (chunk) => {
            chunks.next(chunk);
          },
          close: () => {
            chunks.complete();
            errors.complete();
          },
        }),
        { signal: abortController.signal },
      );
    }

    // Don't error the chunks observable since that stops it from
    // reading any more packets, and we often want to continue
    // despite an error. Instead, push errors to the 'errors'
    // observable.
    read().catch((err) => {
      errors.next(err);
    });

    return { chunks, errors };
  }
}
211