• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from blueberry.tests.gd.cert.event_stream import EventStream
19from blueberry.tests.gd.cert.event_stream import IEventStream
20from blueberry.tests.gd.cert.captures import HciCaptures
21from blueberry.tests.gd.cert.closable import Closable
22from blueberry.tests.gd.cert.closable import safeClose
23from blueberry.tests.gd.cert.truth import assertThat
24from blueberry.facade.hci import acl_manager_facade_pb2 as acl_manager_facade
25from blueberry.utils import bluetooth
26import hci_packets as hci
27
28
class PyAclManagerAclConnection(IEventStream, Closable):
    """A single ACL connection driven through the ACL manager facade.

    The object holds two event streams: the connection event stream handed in
    by the caller, and an ACL data stream opened in the constructor through
    ``FetchAclData`` for this connection's handle. Both are closed by close().
    """

    def __init__(self, acl_manager, remote_addr, handle, event_stream):
        """Store the connection details and open the ACL data stream.

        Args:
            acl_manager: facade object used for the FetchAclData, ConnectionCommand and SendAclData calls.
            remote_addr: address of the remote device, stored unchanged.
            handle: connection handle placed in every facade request made by this object.
            event_stream: connection event stream; it is closed by close().
        """
        self.acl_manager = acl_manager
        self.handle = handle
        self.remote_addr = remote_addr
        self.connection_event_stream = event_stream
        # Defined up front so that reading the attribute before
        # wait_for_disconnection_complete() yields None rather than AttributeError.
        self.disconnect_reason = None
        self.acl_stream = EventStream(self.acl_manager.FetchAclData(acl_manager_facade.HandleMsg(handle=self.handle)))

    def disconnect(self, reason):
        """Send an HCI Disconnect command for this connection's handle.

        Args:
            reason: value passed as the ``reason`` field of the Disconnect packet.
        """
        packet_bytes = hci.Disconnect(connection_handle=self.handle, reason=reason).serialize()
        self.acl_manager.ConnectionCommand(acl_manager_facade.ConnectionCommandMsg(packet=packet_bytes))

    def close(self):
        """Close the connection event stream and the ACL data stream."""
        safeClose(self.connection_event_stream)
        safeClose(self.acl_stream)

    def wait_for_disconnection_complete(self):
        """Assert that a disconnection-complete event is emitted and record its reason.

        The reason carried by the captured event is stored in ``self.disconnect_reason``.
        """
        disconnection_complete = HciCaptures.DisconnectionCompleteCapture()
        assertThat(self.connection_event_stream).emits(disconnection_complete)
        self.disconnect_reason = disconnection_complete.get().reason

    def send(self, data):
        """Send ``data`` as ACL payload on this connection.

        Args:
            data: payload; converted with ``bytes(data)`` before being sent.
        """
        self.acl_manager.SendAclData(acl_manager_facade.AclData(handle=self.handle, payload=bytes(data)))

    def get_event_queue(self):
        """Return the event queue of the ACL data stream."""
        return self.acl_stream.get_event_queue()
56
57
class PyAclManager:
    """Helper that sets up ACL connections through a device's ACL manager facade.

    At most one pending incoming and one pending outgoing connection stream
    are tracked at a time. Completing a connection hands the pending stream
    over to the returned PyAclManagerAclConnection.
    """

    def __init__(self, device):
        """Bind to ``device.hci_acl_manager``; no connection is pending initially.

        Args:
            device: object exposing the ACL manager facade as ``hci_acl_manager``.
        """
        self.acl_manager = device.hci_acl_manager
        self.incoming_connection_event_stream = None
        self.outgoing_connection_event_stream = None

    def close(self):
        """Close any connection event streams still held by this manager."""
        safeClose(self.incoming_connection_event_stream)
        safeClose(self.outgoing_connection_event_stream)

    def listen_for_an_incoming_connection(self):
        """Start fetching incoming connection events.

        Asserts that no incoming connection is already pending.
        """
        assertThat(self.incoming_connection_event_stream).isNone()
        self.incoming_connection_event_stream = EventStream(
            self.acl_manager.FetchIncomingConnection(empty_proto.Empty()))

    def initiate_connection(self, remote_addr):
        """Request an outgoing connection to ``remote_addr``.

        Asserts that no outgoing connection is already pending.

        Args:
            remote_addr: address of the remote device; ``bytes`` is used as-is,
                anything else is encoded with ``encode('utf-8')``.
        """
        assertThat(self.outgoing_connection_event_stream).isNone()
        remote_addr_bytes = remote_addr if isinstance(remote_addr, bytes) else remote_addr.encode('utf-8')
        self.outgoing_connection_event_stream = EventStream(
            self.acl_manager.CreateConnection(acl_manager_facade.ConnectionMsg(address=remote_addr_bytes)))

    def complete_connection(self, event_stream):
        """Assert a connection-complete event on ``event_stream`` and wrap the connection.

        Args:
            event_stream: stream on which the connection-complete event is expected.

        Returns:
            A PyAclManagerAclConnection built from the event's connection handle
            and ``repr()`` of its address, using ``event_stream`` as its
            connection event stream.
        """
        connection_complete = HciCaptures.ConnectionCompleteCapture()
        assertThat(event_stream).emits(connection_complete)
        complete = connection_complete.get()
        handle = complete.connection_handle
        address = repr(complete.bd_addr)
        return PyAclManagerAclConnection(self.acl_manager, address, handle, event_stream)

    def complete_incoming_connection(self):
        """Complete the pending incoming connection and return it.

        Asserts that listen_for_an_incoming_connection() was called first.
        """
        assertThat(self.incoming_connection_event_stream).isNotNone()
        connection = self.complete_connection(self.incoming_connection_event_stream)
        # Release the stream only after the connection object exists. If
        # completion raises, the manager still references the stream and
        # close() can clean it up instead of leaking it.
        self.incoming_connection_event_stream = None
        return connection

    def complete_outgoing_connection(self):
        """Complete the pending outgoing connection and return it.

        Asserts that initiate_connection() was called first.
        """
        assertThat(self.outgoing_connection_event_stream).isNotNone()
        connection = self.complete_connection(self.outgoing_connection_event_stream)
        # Release the stream only after the connection object exists. If
        # completion raises, the manager still references the stream and
        # close() can clean it up instead of leaking it.
        self.outgoing_connection_event_stream = None
        return connection
99