1#! /usr/bin/env python 2# encoding: utf-8 3""" 4Example of a AT command protocol. 5 6https://en.wikipedia.org/wiki/Hayes_command_set 7http://www.itu.int/rec/T-REC-V.250-200307-I/en 8""" 9from __future__ import print_function 10 11import sys 12sys.path.insert(0, '..') 13 14import logging 15import serial 16import serial.threaded 17import threading 18 19try: 20 import queue 21except ImportError: 22 import Queue as queue 23 24 25class ATException(Exception): 26 pass 27 28 29class ATProtocol(serial.threaded.LineReader): 30 31 TERMINATOR = b'\r\n' 32 33 def __init__(self): 34 super(ATProtocol, self).__init__() 35 self.alive = True 36 self.responses = queue.Queue() 37 self.events = queue.Queue() 38 self._event_thread = threading.Thread(target=self._run_event) 39 self._event_thread.daemon = True 40 self._event_thread.name = 'at-event' 41 self._event_thread.start() 42 self.lock = threading.Lock() 43 44 def stop(self): 45 """ 46 Stop the event processing thread, abort pending commands, if any. 47 """ 48 self.alive = False 49 self.events.put(None) 50 self.responses.put('<exit>') 51 52 def _run_event(self): 53 """ 54 Process events in a separate thread so that input thread is not 55 blocked. 56 """ 57 while self.alive: 58 try: 59 self.handle_event(self.events.get()) 60 except: 61 logging.exception('_run_event') 62 63 def handle_line(self, line): 64 """ 65 Handle input from serial port, check for events. 66 """ 67 if line.startswith('+'): 68 self.events.put(line) 69 else: 70 self.responses.put(line) 71 72 def handle_event(self, event): 73 """ 74 Spontaneous message received. 75 """ 76 print('event received:', event) 77 78 def command(self, command, response='OK', timeout=5): 79 """ 80 Set an AT command and wait for the response. 81 """ 82 with self.lock: # ensure that just one thread is sending commands at once 83 self.write_line(command) 84 lines = [] 85 while True: 86 try: 87 line = self.responses.get(timeout=timeout) 88 #~ print("%s -> %r" % (command, line)) 89 if line == response: 90 return lines 91 else: 92 lines.append(line) 93 except queue.Empty: 94 raise ATException('AT command timeout ({!r})'.format(command)) 95 96 97# test 98if __name__ == '__main__': 99 import time 100 101 class PAN1322(ATProtocol): 102 """ 103 Example communication with PAN1322 BT module. 104 105 Some commands do not respond with OK but with a '+...' line. This is 106 implemented via command_with_event_response and handle_event, because 107 '+...' lines are also used for real events. 108 """ 109 110 def __init__(self): 111 super(PAN1322, self).__init__() 112 self.event_responses = queue.Queue() 113 self._awaiting_response_for = None 114 115 def connection_made(self, transport): 116 super(PAN1322, self).connection_made(transport) 117 # our adapter enables the module with RTS=low 118 self.transport.serial.rts = False 119 time.sleep(0.3) 120 self.transport.serial.reset_input_buffer() 121 122 def handle_event(self, event): 123 """Handle events and command responses starting with '+...'""" 124 if event.startswith('+RRBDRES') and self._awaiting_response_for.startswith('AT+JRBD'): 125 rev = event[9:9 + 12] 126 mac = ':'.join('{:02X}'.format(ord(x)) for x in rev.decode('hex')[::-1]) 127 self.event_responses.put(mac) 128 else: 129 logging.warning('unhandled event: {!r}'.format(event)) 130 131 def command_with_event_response(self, command): 132 """Send a command that responds with '+...' line""" 133 with self.lock: # ensure that just one thread is sending commands at once 134 self._awaiting_response_for = command 135 self.transport.write(b'{}\r\n'.format(command.encode(self.ENCODING, self.UNICODE_HANDLING))) 136 response = self.event_responses.get() 137 self._awaiting_response_for = None 138 return response 139 140 # - - - example commands 141 142 def reset(self): 143 self.command("AT+JRES", response='ROK') # SW-Reset BT module 144 145 def get_mac_address(self): 146 # requests hardware / calibration info as event 147 return self.command_with_event_response("AT+JRBD") 148 149 ser = serial.serial_for_url('spy://COM1', baudrate=115200, timeout=1) 150 #~ ser = serial.Serial('COM1', baudrate=115200, timeout=1) 151 with serial.threaded.ReaderThread(ser, PAN1322) as bt_module: 152 bt_module.reset() 153 print("reset OK") 154 print("MAC address is", bt_module.get_mac_address()) 155