#!/usr/bin/env python # # Copyright (c) 2017, The OpenThread Authors. # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # 1. Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # 2. Redistributions in binary form must reproduce the above copyright # notice, this list of conditions and the following disclaimer in the # documentation and/or other materials provided with the distribution. # 3. Neither the name of the copyright holder nor the # names of its contributors may be used to endorse or promote products # derived from this software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # import sys import abc import logging import serial import time ABC = abc.ABC if sys.version_info >= (3, 4) else abc.ABCMeta('ABC', (), {}) logger = logging.getLogger(__name__) class RfShieldController(ABC): @abc.abstractmethod def shield(self): pass @abc.abstractmethod def unshield(self): pass @abc.abstractmethod def __enter__(self): pass @abc.abstractmethod def __exit__(self, *args, **kwargs): pass class RfSwitchController(RfShieldController): def __init__(self, channel, port): self._channel = channel self._port = port self._conn = None def shield(self): self._display_string('CLOSE {}'.format(self._channel)) self._write('CLOSE (@{})'.format(self._channel)) def unshield(self): self._display_string('OPEN {}'.format(self._channel)) self._write('OPEN (@{})'.format(self._channel)) def _write(self, data): return self._conn.write('{}\r\n'.format(data)) def _display_string(self, string): self._write('DIAGNOSTIC:DISPLAY "{}"'.format(string)) def __enter__(self): self._conn = serial.Serial(self._port, 9600) if not self._conn.isOpen(): self._conn.open() self._write('') time.sleep(1) return self def __exit__(self, *args, **kwargs): if self._conn: self._conn.close() self._conn = None CONTROLLERS = {'RF_SWITCH': RfSwitchController} def get_rf_shield_controller(shield_type, params): if shield_type in CONTROLLERS: return CONTROLLERS[shield_type](**params) logger.exception('Unknown RF shield controller type: {}'.format(shield_type))