• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright 2020 The Pigweed Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License"); you may not
4// use this file except in compliance with the License. You may obtain a copy of
5// the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12// License for the specific language governing permissions and limitations under
13// the License.
14
15/* eslint-env browser */
16import {BehaviorSubject, Observable, Subject, Subscription} from 'rxjs';
17
18import DeviceTransport from './device_transport';
19
20const DEFAULT_SERIAL_OPTIONS: SerialOptions & {baudRate: number} = {
21  // Some versions of chrome use `baudrate` (linux)
22  baudrate: 115200,
23  // Some versions use `baudRate` (chromebook)
24  baudRate: 115200,
25  databits: 8,
26  parity: 'none',
27  stopbits: 1,
28};
29
30interface PortReadConnection {
31  chunks: Observable<Uint8Array>;
32  errors: Observable<Error>;
33}
34
35interface PortConnection extends PortReadConnection {
36  sendChunk: (chunk: Uint8Array) => Promise<void>;
37}
38
39export class DeviceLostError extends Error {
40  message = 'The device has been lost';
41}
42
43export class DeviceLockedError extends Error {
44  message =
45    "The device's port is locked. Try unplugging it" +
46    ' and plugging it back in.';
47}
48
49/**
50 * WebSerialTransport sends and receives UInt8Arrays to and
51 * from a serial device connected over USB.
52 */
53export class WebSerialTransport implements DeviceTransport {
54  chunks = new Subject<Uint8Array>();
55  errors = new Subject<Error>();
56  connected = new BehaviorSubject<boolean>(false);
57  private portConnections: Map<SerialPort, PortConnection> = new Map();
58  private activePortConnectionConnection: PortConnection | undefined;
59  private rxSubscriptions: Subscription[] = [];
60
61  constructor(
62    private serial: Serial = navigator.serial,
63    private filters: SerialPortFilter[] = [],
64    private serialOptions = DEFAULT_SERIAL_OPTIONS
65  ) {}
66
67  /**
68   * Send a UInt8Array chunk of data to the connected device.
69   * @param {Uint8Array} chunk The chunk to send
70   */
71  async sendChunk(chunk: Uint8Array): Promise<void> {
72    if (this.activePortConnectionConnection) {
73      return this.activePortConnectionConnection.sendChunk(chunk);
74    }
75    throw new Error('Device not connected');
76  }
77
78  /**
79   * Attempt to open a connection to a device. This includes
80   * asking the user to select a serial port and should only
81   * be called in response to user interaction.
82   */
83  async connect(): Promise<void> {
84    const port = await this.serial.requestPort({filters: this.filters});
85    await this.connectPort(port);
86  }
87
88  private disconnect() {
89    for (const subscription of this.rxSubscriptions) {
90      subscription.unsubscribe();
91    }
92    this.rxSubscriptions = [];
93
94    this.activePortConnectionConnection = undefined;
95    this.connected.next(false);
96  }
97
98  /**
99   * Connect to a given SerialPort. This involves no user interaction.
100   * and can be called whenever a port is available.
101   */
102  async connectPort(port: SerialPort): Promise<void> {
103    this.disconnect();
104
105    this.activePortConnectionConnection =
106      this.portConnections.get(port) ?? (await this.conectNewPort(port));
107
108    this.connected.next(true);
109
110    this.rxSubscriptions.push(
111      this.activePortConnectionConnection.chunks.subscribe(
112        (chunk: any) => {
113          this.chunks.next(chunk);
114        },
115        (err: any) => {
116          throw new Error(`Chunks observable had an unexpected error ${err}`);
117        },
118        () => {
119          this.connected.next(false);
120          this.portConnections.delete(port);
121          // Don't complete the chunks observable because then it would not
122          // be able to forward any future chunks.
123        }
124      )
125    );
126
127    this.rxSubscriptions.push(
128      this.activePortConnectionConnection.errors.subscribe((error: any) => {
129        this.errors.next(error);
130        if (error instanceof DeviceLostError) {
131          // The device has been lost
132          this.connected.next(false);
133        }
134      })
135    );
136  }
137
138  private async conectNewPort(port: SerialPort): Promise<PortConnection> {
139    await port.open(this.serialOptions);
140    const writer = port.writable.getWriter();
141
142    async function sendChunk(chunk: Uint8Array) {
143      await writer.ready;
144      await writer.write(chunk);
145    }
146
147    const {chunks, errors} = this.getChunks(port);
148
149    const connection: PortConnection = {sendChunk, chunks, errors};
150    this.portConnections.set(port, connection);
151    return connection;
152  }
153
154  private getChunks(port: SerialPort): PortReadConnection {
155    const chunks = new Subject<Uint8Array>();
156    const errors = new Subject<Error>();
157
158    async function read() {
159      if (!port.readable) {
160        throw new DeviceLostError();
161      }
162      if (port.readable.locked) {
163        throw new DeviceLockedError();
164      }
165      await port.readable.pipeTo(
166        new WritableStream({
167          write: chunk => {
168            chunks.next(chunk);
169          },
170          close: () => {
171            chunks.complete();
172            errors.complete();
173          },
174          abort: () => {
175            // Reconnect to the port.
176            connect();
177          },
178        })
179      );
180    }
181
182    function connect() {
183      read().catch(err => {
184        // Don't error the chunks observable since that stops it from
185        // reading any more packets, and we often want to continue
186        // despite an error. Instead, push errors to the 'errors'
187        // observable.
188        errors.next(err);
189      });
190    }
191
192    connect();
193
194    return {chunks, errors};
195  }
196}
197