# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import serial_asyncio

from .common import Transport, StreamPacketSource, StreamPacketSink

# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
def _parse_serial_spec(spec):
    '''
    Split a serial transport parameter string into its components.

    Returns a (device, speed, rtscts, dsrdtr) tuple. Options that are not
    recognized are ignored, with a warning logged for each non-empty one.
    '''

    # str.split always yields at least one element, so a spec without any
    # comma simply produces an empty option list.
    device, *options = spec.split(',')

    speed = 1000000
    rtscts = False
    dsrdtr = False
    for option in options:
        if option == 'rtscts':
            rtscts = True
        elif option == 'dsrdtr':
            dsrdtr = True
        elif option.isdecimal():
            # isdecimal() only accepts characters that int() can convert;
            # isnumeric() would also accept e.g. fractions or superscripts,
            # for which int() raises ValueError.
            speed = int(option)
        elif option:
            logger.warning('ignoring unrecognized serial option: %r', option)

    return device, speed, rtscts, dsrdtr


# -----------------------------------------------------------------------------
async def open_serial_transport(spec):
    '''
    Open a serial port transport.
    The parameter string has this syntax:
    <device-path>[,<speed>][,rtscts][,dsrdtr]
    When <speed> is omitted, the default value of 1000000 is used
    When "rtscts" is specified, RTS/CTS hardware flow control is enabled
    When "dsrdtr" is specified, DSR/DTR hardware flow control is enabled
    Unrecognized options are ignored (a warning is logged).

    Examples:
    /dev/tty.usbmodem0006839912172
    /dev/tty.usbmodem0006839912172,1000000
    /dev/tty.usbmodem0006839912172,rtscts

    Returns a Transport built from the packet source created for the serial
    connection and a StreamPacketSink wrapping the serial transport.
    Must be awaited from within a running asyncio event loop.
    '''

    device, speed, rtscts, dsrdtr = _parse_serial_spec(spec)

    # The class itself serves as the protocol factory; the connection call
    # hands back the (transport, protocol) pair, the protocol being the
    # StreamPacketSource instance it created.
    serial_transport, packet_source = await serial_asyncio.create_serial_connection(
        asyncio.get_running_loop(),
        StreamPacketSource,
        device,
        baudrate=speed,
        rtscts=rtscts,
        dsrdtr=dsrdtr,
    )
    packet_sink = StreamPacketSink(serial_transport)

    return Transport(packet_source, packet_sink)