• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import logging
19import asyncio
20import collections
21
22from .colors import color
23
24
25# -----------------------------------------------------------------------------
26# Logging
27# -----------------------------------------------------------------------------
28logger = logging.getLogger(__name__)
29
30
31# -----------------------------------------------------------------------------
32# Protocol Support
33# -----------------------------------------------------------------------------
34
35# -----------------------------------------------------------------------------
36class HfpProtocol:
37    def __init__(self, dlc):
38        self.dlc = dlc
39        self.buffer = ''
40        self.lines = collections.deque()
41        self.lines_available = asyncio.Event()
42
43        dlc.sink = self.feed
44
45    def feed(self, data):
46        # Convert the data to a string if needed
47        if isinstance(data, bytes):
48            data = data.decode('utf-8')
49
50        logger.debug(f'<<< Data received: {data}')
51
52        # Add to the buffer and look for lines
53        self.buffer += data
54        while (separator := self.buffer.find('\r')) >= 0:
55            line = self.buffer[:separator].strip()
56            self.buffer = self.buffer[separator + 1 :]
57            if len(line) > 0:
58                self.on_line(line)
59
60    def on_line(self, line):
61        self.lines.append(line)
62        self.lines_available.set()
63
64    def send_command_line(self, line):
65        logger.debug(color(f'>>> {line}', 'yellow'))
66        self.dlc.write(line + '\r')
67
68    def send_response_line(self, line):
69        logger.debug(color(f'>>> {line}', 'yellow'))
70        self.dlc.write('\r\n' + line + '\r\n')
71
72    async def next_line(self):
73        await self.lines_available.wait()
74        line = self.lines.popleft()
75        if not self.lines:
76            self.lines_available.clear()
77        logger.debug(color(f'<<< {line}', 'green'))
78        return line
79
80    async def initialize_service(self):
81        # Perform Service Level Connection Initialization
82        self.send_command_line('AT+BRSF=2072')  # Retrieve Supported Features
83        await (self.next_line())
84        await (self.next_line())
85
86        self.send_command_line('AT+CIND=?')
87        await (self.next_line())
88        await (self.next_line())
89
90        self.send_command_line('AT+CIND?')
91        await (self.next_line())
92        await (self.next_line())
93
94        self.send_command_line('AT+CMER=3,0,0,1')
95        await (self.next_line())
96