import os
import csv
import tempfile
from itertools import chain

from devlib.instrument import Instrument, MeasurementsCsv, CONTINUOUS
from devlib.exception import HostError
from devlib.utils.misc import unique

try:
    from daqpower.client import execute_command, Status
    from daqpower.config import DeviceConfiguration, ServerConfiguration
except ImportError as e:
    # daqpower is optional at import time: the names are stubbed out here and
    # the failure is reported when a DaqInstrument is actually instantiated.
    execute_command, Status = None, None
    # NOTE(review): ConfigurationError is only ever bound here (it is not
    # imported in the ``try`` branch above); kept so the module namespace is
    # unchanged.
    DeviceConfiguration, ServerConfiguration, ConfigurationError = None, None, None
    import_error_mesg = str(e)


class DaqInstrument(Instrument):
    """Instrument that collects measurements through a ``daqpower`` server.

    All communication with the server goes through :meth:`execute`, which
    forwards commands to ``daqpower.client.execute_command``. For every
    label, a ``power`` and a ``voltage`` channel are registered.
    """

    mode = CONTINUOUS

    def __init__(self, target, resistor_values,  # pylint: disable=R0914
                 labels=None,
                 host='localhost',
                 port=45677,
                 device_id='Dev1',
                 v_range=2.5,
                 dv_range=0.2,
                 sample_rate_hz=10000,
                 channel_map=(0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23),
                 ):
        """Validate the configuration and register the channels.

        :param target: passed to the ``Instrument`` base class.
        :param resistor_values: one entry per measured site; forwarded to
            ``DeviceConfiguration``.
        :param labels: site names, one per resistor value. Defaults to
            ``PORT_0``, ``PORT_1``, ...
        :param host: DAQ server host, forwarded to ``ServerConfiguration``.
        :param port: DAQ server port, forwarded to ``ServerConfiguration``.
        :param device_id: device that must be listed by the server's
            ``list_devices`` command (checked when the server answers ``OK``).
        :param v_range: forwarded to ``DeviceConfiguration``.
        :param dv_range: forwarded to ``DeviceConfiguration``.
        :param sample_rate_hz: forwarded to ``DeviceConfiguration`` as
            ``sampling_rate`` and stored on the instance.
        :param channel_map: forwarded to ``DeviceConfiguration``.

        :raises HostError: if ``daqpower`` could not be imported, or if the
            server answers ``list_devices`` with a status that is neither
            ``OK`` nor ``OKISH``.
        :raises ValueError: if ``labels`` and ``resistor_values`` differ in
            length, or if ``device_id`` is not reported by the server.
        """
        # pylint: disable=no-member
        super(DaqInstrument, self).__init__(target)
        self._need_reset = True
        if execute_command is None:
            raise HostError('Could not import "daqpower": {}'.format(import_error_mesg))
        if labels is None:
            labels = ['PORT_{}'.format(i) for i, _ in enumerate(resistor_values)]
        if len(labels) != len(resistor_values):
            raise ValueError('"labels" and "resistor_values" must be of the same length')
        self.server_config = ServerConfiguration(host=host,
                                                 port=port)
        result = self.execute('list_devices')
        if result.status == Status.OK:
            if device_id not in result.data:
                raise ValueError('Device "{}" is not found on the DAQ server.'.format(device_id))
        elif result.status != Status.OKISH:
            raise HostError('Problem querying DAQ server: {}'.format(result.message))

        self.device_config = DeviceConfiguration(device_id=device_id,
                                                 v_range=v_range,
                                                 dv_range=dv_range,
                                                 sampling_rate=sample_rate_hz,
                                                 resistor_values=resistor_values,
                                                 channel_map=channel_map,
                                                 labels=labels)
        self.sample_rate_hz = sample_rate_hz

        for label in labels:
            for kind in ['power', 'voltage']:
                self.add_channel(label, kind)

    def reset(self, sites=None, kinds=None, channels=None):
        """Select the active channels and (re)configure the DAQ server.

        Sends ``close`` followed by ``configure`` with the stored device
        configuration.

        :raises HostError: if the server does not answer ``configure``
            with status ``OK``.
        """
        super(DaqInstrument, self).reset(sites, kinds, channels)
        self.execute('close')
        result = self.execute('configure', config=self.device_config)
        if result.status != Status.OK:  # pylint: disable=no-member
            raise HostError(result.message)
        self._need_reset = False

    def start(self):
        """Send ``start`` to the server, calling :meth:`reset` first if needed."""
        if self._need_reset:
            self.reset()
        self.execute('start')

    def stop(self):
        """Send ``stop`` to the server and flag that a reset is required."""
        self.execute('stop')
        self._need_reset = True

    def get_data(self, outfile):  # pylint: disable=R0914
        """Fetch the raw traces and merge the active channels into one CSV.

        The server's ``get_data`` output is written into a fresh temporary
        directory (which this method does not remove). Files there are
        matched to sites by their base name; each file's first row is
        treated as the header naming that site's columns.

        :param outfile: path of the CSV file to write; its header row holds
            the labels of the active channels.
        :returns: ``MeasurementsCsv(outfile, self.active_channels)``.
        :raises HostError: if no trace file exists for an active site, or if
            a trace file has no header row.
        :raises ValueError: if an active channel's label is not among the
            columns found in the trace headers.
        """
        tempdir = tempfile.mkdtemp(prefix='daq-raw-')
        self.execute('get_data', output_directory=tempdir)
        raw_file_map = {}
        for entry in os.listdir(tempdir):
            site = os.path.splitext(entry)[0]
            raw_file_map[site] = os.path.join(tempdir, entry)

        active_sites = unique([c.site for c in self.active_channels])
        file_handles = []
        try:
            # One (reader, column count) pair per site, in the same order as
            # the column names accumulated in channel_order.
            sources = []
            channel_order = []
            for site in active_sites:
                try:
                    site_file = raw_file_map[site]
                except KeyError:
                    message = 'Could not get DAQ trace for {}; Obtained traces are in {}'
                    raise HostError(message.format(site, tempdir))
                # Binary mode, as the Python 2 csv module expects.
                fh = open(site_file, 'rb')
                # Register straight away so the handle is closed on any error.
                file_handles.append(fh)
                reader = csv.reader(fh)

                # The first row is the headers
                header = next(reader, None)
                if header is None:
                    message = 'DAQ trace for {} is empty; Obtained traces are in {}'
                    raise HostError(message.format(site, tempdir))
                channel_order.extend('{}_{}'.format(site, kind) for kind in header)
                sources.append((reader, len(header)))

            def _read_next_rows():
                # Build one combined row; every site contributes exactly as
                # many cells as its header has columns (None-padded when the
                # site has run out of data or the row is short) so that the
                # positions recorded in channel_order stay valid.
                per_site = []
                for reader, width in sources:
                    cells = list(next(reader, [])[:width])
                    cells.extend([None] * (width - len(cells)))
                    per_site.append(cells)
                return list(chain.from_iterable(per_site))

            field_names = [c.label for c in self.active_channels]
            with open(outfile, 'wb') as wfh:
                writer = csv.writer(wfh)
                writer.writerow(field_names)
                # Resolve each output column's position once, not per row.
                columns = [channel_order.index(f) for f in field_names]
                raw_row = _read_next_rows()
                # Stop once no site yields any non-empty cell.
                while any(raw_row):
                    writer.writerow([raw_row[i] for i in columns])
                    raw_row = _read_next_rows()

            return MeasurementsCsv(outfile, self.active_channels)
        finally:
            for fh in file_handles:
                fh.close()

    def teardown(self):
        """Send ``close`` to the server."""
        self.execute('close')

    def execute(self, command, **kwargs):
        """Run ``command`` on the configured server and return its result."""
        return execute_command(self.server_config, command, **kwargs)