• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from contextlib import asynccontextmanager
19import logging
20import os
21from typing import Optional
22
23from .common import Transport, AsyncPipeSink, SnoopingTransport, TransportSpecError
24from ..snoop import create_snooper
25
26# -----------------------------------------------------------------------------
27# Logging
28# -----------------------------------------------------------------------------
29logger = logging.getLogger(__name__)
30
31
32# -----------------------------------------------------------------------------
33def _wrap_transport(transport: Transport) -> Transport:
34    """
35    Automatically wrap a Transport instance when a wrapping class can be inferred
36    from the environment.
37    If no wrapping class is applicable, the transport argument is returned as-is.
38    """
39
40    # If BUMBLE_SNOOPER is set, try to automatically create a snooper.
41    if snooper_spec := os.getenv('BUMBLE_SNOOPER'):
42        try:
43            return SnoopingTransport.create_with(
44                transport, create_snooper(snooper_spec)
45            )
46        except Exception as exc:
47            logger.warning(f'Exception while creating snooper: {exc}')
48
49    return transport
50
51
52# -----------------------------------------------------------------------------
53async def open_transport(name: str) -> Transport:
54    """
55    Open a transport by name.
56    The name must be <type>:<metadata><parameters>
57    Where <parameters> depend on the type (and may be empty for some types), and
58    <metadata> is either omitted, or a ,-separated list of <key>=<value> pairs,
59    enclosed in [].
60    If there are not metadata or parameter, the : after the <type> may be omitted.
61    Examples:
62      * usb:0
63      * usb:[driver=rtk]0
64      * android-netsim
65
66    The supported types are:
67      * serial
68      * udp
69      * tcp-client
70      * tcp-server
71      * ws-client
72      * ws-server
73      * pty
74      * file
75      * vhci
76      * hci-socket
77      * usb
78      * pyusb
79      * android-emulator
80      * android-netsim
81    """
82
83    scheme, *tail = name.split(':', 1)
84    spec = tail[0] if tail else None
85    metadata = None
86    if spec:
87        # Metadata may precede the spec
88        if spec.startswith('['):
89            metadata_str, *tail = spec[1:].split(']')
90            spec = tail[0] if tail else None
91            metadata = dict([entry.split('=') for entry in metadata_str.split(',')])
92
93    transport = await _open_transport(scheme, spec)
94    if metadata:
95        transport.source.metadata = {  # type: ignore[attr-defined]
96            **metadata,
97            **getattr(transport.source, 'metadata', {}),
98        }
99        # pylint: disable=line-too-long
100        logger.debug(f'HCI metadata: {transport.source.metadata}')  # type: ignore[attr-defined]
101
102    return _wrap_transport(transport)
103
104
105# -----------------------------------------------------------------------------
106async def _open_transport(scheme: str, spec: Optional[str]) -> Transport:
107    # pylint: disable=import-outside-toplevel
108    # pylint: disable=too-many-return-statements
109
110    if scheme == 'serial' and spec:
111        from .serial import open_serial_transport
112
113        return await open_serial_transport(spec)
114
115    if scheme == 'udp' and spec:
116        from .udp import open_udp_transport
117
118        return await open_udp_transport(spec)
119
120    if scheme == 'tcp-client' and spec:
121        from .tcp_client import open_tcp_client_transport
122
123        return await open_tcp_client_transport(spec)
124
125    if scheme == 'tcp-server' and spec:
126        from .tcp_server import open_tcp_server_transport
127
128        return await open_tcp_server_transport(spec)
129
130    if scheme == 'ws-client' and spec:
131        from .ws_client import open_ws_client_transport
132
133        return await open_ws_client_transport(spec)
134
135    if scheme == 'ws-server' and spec:
136        from .ws_server import open_ws_server_transport
137
138        return await open_ws_server_transport(spec)
139
140    if scheme == 'pty':
141        from .pty import open_pty_transport
142
143        return await open_pty_transport(spec)
144
145    if scheme == 'file':
146        from .file import open_file_transport
147
148        assert spec is not None
149        return await open_file_transport(spec)
150
151    if scheme == 'vhci':
152        from .vhci import open_vhci_transport
153
154        return await open_vhci_transport(spec)
155
156    if scheme == 'hci-socket':
157        from .hci_socket import open_hci_socket_transport
158
159        return await open_hci_socket_transport(spec)
160
161    if scheme == 'usb':
162        from .usb import open_usb_transport
163
164        assert spec
165        return await open_usb_transport(spec)
166
167    if scheme == 'pyusb':
168        from .pyusb import open_pyusb_transport
169
170        assert spec
171        return await open_pyusb_transport(spec)
172
173    if scheme == 'android-emulator':
174        from .android_emulator import open_android_emulator_transport
175
176        return await open_android_emulator_transport(spec)
177
178    if scheme == 'android-netsim':
179        from .android_netsim import open_android_netsim_transport
180
181        return await open_android_netsim_transport(spec)
182
183    if scheme == 'unix':
184        from .unix import open_unix_client_transport
185
186        assert spec
187        return await open_unix_client_transport(spec)
188
189    raise TransportSpecError('unknown transport scheme')
190
191
192# -----------------------------------------------------------------------------
193async def open_transport_or_link(name: str) -> Transport:
194    """
195    Open a transport or a link relay.
196
197    Args:
198      name:
199        Name of the transport or link relay to open.
200        When the name starts with "link-relay:", open a link relay (see RemoteLink
201        for details on what the arguments are).
202        For other namespaces, see `open_transport`.
203
204    """
205    if name.startswith('link-relay:'):
206        logger.warning('Link Relay has been deprecated.')
207        from ..controller import Controller
208        from ..link import RemoteLink  # lazy import
209
210        link = RemoteLink(name[11:])
211        await link.wait_until_connected()
212        controller = Controller('remote', link=link)  # type:ignore[arg-type]
213
214        class LinkTransport(Transport):
215            async def close(self):
216                link.close()
217
218        return _wrap_transport(LinkTransport(controller, AsyncPipeSink(controller)))
219
220    return await open_transport(name)
221