• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
import csv
import os
import shutil
import sys
import tempfile
from itertools import chain

from devlib.instrument import Instrument, MeasurementsCsv, CONTINUOUS
from devlib.exception import HostError
from devlib.utils.misc import unique
9
# "daqpower" is an optional dependency. If it cannot be imported, the names
# it would provide are bound to None and the failure text is kept in
# import_error_mesg so that DaqInstrument.__init__ can report it later.
import_error_mesg = None
try:
    from daqpower.client import execute_command, Status
    from daqpower.config import DeviceConfiguration, ServerConfiguration
except ImportError as e:  # "as" form is valid on Python 2.6+ and Python 3
    execute_command, Status = None, None
    # NOTE(review): ConfigurationError is only bound on this failure path; it
    # is not imported above -- confirm whether it should be.
    DeviceConfiguration, ServerConfiguration, ConfigurationError = None, None, None
    # str(e) rather than e.message: the .message attribute is gone in Python 3.
    import_error_mesg = str(e)
17
18
class DaqInstrument(Instrument):
    """Instrument that collects measurements from a DAQ server via ``daqpower``.

    A ``power`` and a ``voltage`` channel are registered for every label.
    All communication with the server goes through :meth:`execute`, which
    forwards commands to ``daqpower.client.execute_command``.
    """

    mode = CONTINUOUS

    def __init__(self, target, resistor_values,  # pylint: disable=R0914
                 labels=None,
                 host='localhost',
                 port=45677,
                 device_id='Dev1',
                 v_range=2.5,
                 dv_range=0.2,
                 sample_rate_hz=10000,
                 channel_map=(0, 1, 2, 3, 4, 5, 6, 7, 16, 17, 18, 19, 20, 21, 22, 23),
                 ):
        """Validate the arguments, query the server and register channels.

        :param target: passed through to the ``Instrument`` base class.
        :param resistor_values: sequence with one entry per measured port;
            forwarded to ``DeviceConfiguration``.
        :param labels: one name per entry of ``resistor_values``; defaults to
            ``PORT_0``, ``PORT_1``, ...
        :param host: DAQ server host, forwarded to ``ServerConfiguration``.
        :param port: DAQ server port, forwarded to ``ServerConfiguration``.
        :param device_id: device that must be listed by the server.
        :param v_range: forwarded to ``DeviceConfiguration``.
        :param dv_range: forwarded to ``DeviceConfiguration``.
        :param sample_rate_hz: forwarded to ``DeviceConfiguration`` as
            ``sampling_rate`` and stored on the instance.
        :param channel_map: forwarded to ``DeviceConfiguration``.

        :raises HostError: if ``daqpower`` could not be imported, or the
            server query fails.
        :raises ValueError: if ``labels`` and ``resistor_values`` differ in
            length, or ``device_id`` is not reported by the server.
        """
        # pylint: disable=no-member
        super(DaqInstrument, self).__init__(target)
        self._need_reset = True
        if execute_command is None:
            raise HostError('Could not import "daqpower": {}'.format(import_error_mesg))
        if labels is None:
            # range() instead of xrange() so this also runs on Python 3.
            labels = ['PORT_{}'.format(i) for i in range(len(resistor_values))]
        if len(labels) != len(resistor_values):
            raise ValueError('"labels" and "resistor_values" must be of the same length')
        self.server_config = ServerConfiguration(host=host,
                                                 port=port)
        result = self.execute('list_devices')
        if result.status == Status.OK:
            if device_id not in result.data:
                raise ValueError('Device "{}" is not found on the DAQ server.'.format(device_id))
        elif result.status != Status.OKISH:
            # Status.OKISH is tolerated without checking the device list.
            raise HostError('Problem querying DAQ server: {}'.format(result.message))

        self.device_config = DeviceConfiguration(device_id=device_id,
                                                 v_range=v_range,
                                                 dv_range=dv_range,
                                                 sampling_rate=sample_rate_hz,
                                                 resistor_values=resistor_values,
                                                 channel_map=channel_map,
                                                 labels=labels)
        self.sample_rate_hz = sample_rate_hz

        for label in labels:
            for kind in ['power', 'voltage']:
                self.add_channel(label, kind)

    def reset(self, sites=None, kinds=None, channels=None):
        """Select the active channels and (re)configure the DAQ server.

        Sends ``close`` followed by ``configure`` with the stored device
        configuration.

        :raises HostError: if the ``configure`` command does not return
            ``Status.OK``.
        """
        super(DaqInstrument, self).reset(sites, kinds, channels)
        self.execute('close')
        result = self.execute('configure', config=self.device_config)
        # Kept as "not ==" (rather than "!=") to preserve the original
        # comparison exactly; Status may not define __ne__ on Python 2.
        if not result.status == Status.OK:  # pylint: disable=no-member
            raise HostError(result.message)
        self._need_reset = False

    def start(self):
        """Send ``start`` to the server, calling :meth:`reset` first if needed."""
        if self._need_reset:
            self.reset()
        self.execute('start')

    def stop(self):
        """Send ``stop`` to the server and mark the instrument as needing a reset."""
        self.execute('stop')
        self._need_reset = True

    def get_data(self, outfile):  # pylint: disable=R0914
        """Fetch the raw per-site traces and merge them into ``outfile``.

        The server writes one CSV file per site into a temporary directory.
        The files for the active sites are read in lock-step and the columns
        of the active channels are written to ``outfile`` as a single CSV,
        with the channel labels as the header row.

        The temporary directory is removed once the merge succeeds. It is
        left in place if an error occurs, so the raw traces can be inspected.

        :param outfile: path of the merged CSV file to create.
        :returns: ``MeasurementsCsv`` wrapping ``outfile`` and the active
            channels.
        :raises HostError: if the trace for an active site is missing or
            empty.
        """
        tempdir = tempfile.mkdtemp(prefix='daq-raw-')
        self.execute('get_data', output_directory=tempdir)

        # Map site name (file name without extension) -> path of its raw file.
        raw_file_map = {}
        for entry in os.listdir(tempdir):
            site = os.path.splitext(entry)[0]
            raw_file_map[site] = os.path.join(tempdir, entry)

        active_sites = unique([c.site for c in self.active_channels])
        file_handles = []
        try:
            site_readers = {}
            for site in active_sites:
                try:
                    site_file = raw_file_map[site]
                except KeyError:
                    message = 'Could not get DAQ trace for {}; Obtained traces are in {}'
                    raise HostError(message.format(site, tempdir))
                fh = self._open_csv(site_file, 'r')
                # Track the handle right away so "finally" always closes it.
                file_handles.append(fh)
                site_readers[site] = csv.reader(fh)

            # The first row of every raw file holds that site's column names.
            channel_order = []
            readers_and_widths = []
            for site, reader in site_readers.items():
                try:
                    kinds = next(reader)
                except StopIteration:
                    message = 'DAQ trace for {} is empty; Obtained traces are in {}'
                    raise HostError(message.format(site, tempdir))
                channel_order.extend('{}_{}'.format(site, kind) for kind in kinds)
                readers_and_widths.append((reader, len(kinds)))

            def _read_next_rows():
                """Return the next row of every site joined into one list."""
                parts = []
                for reader, width in readers_and_widths:
                    try:
                        parts.extend(next(reader))
                    except StopIteration:
                        # This site ran out of samples: pad with one None per
                        # column so the other sites' columns stay aligned.
                        parts.extend([None] * width)
                return parts

            field_names = [c.label for c in self.active_channels]
            # Resolve each output column's position once instead of per row.
            column_indices = [channel_order.index(f) for f in field_names]

            with self._open_csv(outfile, 'w') as wfh:
                writer = csv.writer(wfh)
                writer.writerow(field_names)
                raw_row = _read_next_rows()
                # Stop at the first row in which no site produced a value.
                while any(raw_row):
                    writer.writerow([raw_row[i] for i in column_indices])
                    raw_row = _read_next_rows()
        finally:
            for fh in file_handles:
                fh.close()

        # Only reached on success: the raw traces are no longer needed.
        shutil.rmtree(tempdir, ignore_errors=True)
        return MeasurementsCsv(outfile, self.active_channels)

    def teardown(self):
        """Send ``close`` to the server."""
        self.execute('close')

    def execute(self, command, **kwargs):
        """Run ``command`` on the DAQ server and return the client's result."""
        return execute_command(self.server_config, command, **kwargs)

    @staticmethod
    def _open_csv(path, mode):
        """Open ``path`` the way the ``csv`` module expects on this interpreter.

        :param mode: ``'r'`` or ``'w'``.
        """
        if sys.version_info[0] >= 3:
            # Python 3: text mode with newline translation disabled.
            return open(path, mode, newline='')
        # Python 2: the csv module works on binary-mode files.
        return open(path, mode + 'b')
139
140