• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
5
6import cellular_logging
7import dbus, os, subprocess, time
8
9from autotest_lib.client.common_lib import error
10from autotest_lib.client.cros import flimflam_test_path
11from autotest_lib.client.cros.cellular import modem
12
# Module-level logger obtained from cellular_logging, set up with the
# tag 'mm_test'.
log = cellular_logging.SetupCellularLogging('mm_test')
14
15
class ModemManagerTest(object):
    """Wrapper for starting up ModemManager in an artificial testing
    environment, connected to a fake modem program and talking to a
    fake (tun) network device.

    Intended to be used as a context manager: entering starts fakenet,
    fakemodem and ModemManager (in that order); leaving stops them in
    the reverse order.  If any startup step fails, whatever was already
    started is stopped before the exception propagates.

    The test using this must ensure the setup of the fakegudev and
    fakemodem deps.
    """

    def __init__(self, autodir, modem_pattern_files):
        """Record configuration; no process is started until __enter__.

        Arguments:
        autodir -- directory containing deps/fakemodem and deps/fakegudev
        modem_pattern_files -- list of pattern files handed to fakemodem
        """
        self.autodir = autodir  # not great. Examine deps directly?
        self.modem_pattern_files = modem_pattern_files
        self.modemmanager = None
        self.fakemodem_process = None
        self.fakenet_process = None
        # Filled in while entering the context; defined here so that the
        # attributes always exist, even if startup fails part-way.
        self.fakemodem = None
        self.mm = None
        self.modem_object_path = None

    def _fakemodem_bin(self, program):
        """Return the path of a program shipped in the fakemodem dep."""
        return os.path.join(self.autodir, 'deps/fakemodem/bin', program)

    def _start_fake_network(self):
        """Start the fakenetwork program and return the fake interface name

        Start up the fakenet program, which uses the tun driver to create
        a network device.

        Returns the name of the fake network interface.
        Sets self.fakenet_process as a handle to the process.
        Raises error.TestFail if fakenet does not print an interface name.
        """
        self.fakenet_process = subprocess.Popen(
            self._fakemodem_bin('fakenet'),
            stdout=subprocess.PIPE)
        # fakenet announces the interface it created on its first line.
        netname = self.fakenet_process.stdout.readline().rstrip()
        if not netname:
            # EOF or a blank line: the helper died or misbehaved.
            raise error.TestFail("fakenet did not report an interface name")
        return netname

    def _start_fake_modem(self, patternfiles):
        """Start the fakemodem program and return the pty path to access it

        Start up the fakemodem program
        Argument:
        patternfiles -- List of files to read for command/response patterns

        Returns the device path of the pty that serves the fake modem, e.g.
        /dev/pts/4.
        Sets self.fakemodem_process as a handle to the process, and
        self.fakemodem as a DBus interface to it.
        Raises error.TestFail if fakemodem does not print a pty path.
        """
        scriptargs = ["--patternfile=" + x for x in patternfiles]
        self.fakemodem_process = subprocess.Popen(
            [self._fakemodem_bin('fakemodem')] + scriptargs,
            stdout=subprocess.PIPE)
        # fakemodem announces the pty it serves on its first line.
        ptyname = self.fakemodem_process.stdout.readline().rstrip()
        if not ptyname:
            # EOF or a blank line: the helper died or misbehaved.
            raise error.TestFail("fakemodem did not report a pty path")
        time.sleep(2)  # XXX
        self.fakemodem = dbus.Interface(
            dbus.SystemBus().get_object('org.chromium.FakeModem', '/'),
            'org.chromium.FakeModem')
        return ptyname

    def _start_modemmanager(self, netname, modemname):
        """Start modemmanager under the control of fake devices.

        Arguments:
        netname -- fake network interface name (e.g. tun0)
        modemname -- path to pty slave device of fake modem (e.g. /dev/pts/4)

        Returns a modem.ModemManager object created with
        provider='org.freedesktop'.
        Sets self.modemmanager as a handle to the process.
        Raises error.TestFail if the process has already exited after the
        initial 3 second wait.
        """
        id_props = ['property_ID_MM_CANDIDATE=1',
                    'property_ID_VENDOR_ID=04e8',  # Samsung USB VID
                    'property_ID_MODEL_ID=6872'  # Y3300 modem PID
                    ]
        tty_device = (['device_file=%s' % (modemname),
                       'name=%s' % (modemname[5:]),  # remove leading /dev/
                       'subsystem=tty',
                       'driver=fake',
                       'sysfs_path=/sys/devices/fake/tty',
                       'parent=/dev/fake-parent'] +
                      id_props)
        net_device = (['device_file=/dev/fakenet',
                       'name=%s' % (netname),
                       'subsystem=net',
                       'driver=fake',
                       'sysfs_path=/sys/devices/fake/net',
                       'parent=/dev/fake-parent'] +
                      id_props)
        parent_device = ['device_file=/dev/fake-parent',
                         'sysfs_path=/sys/devices/fake/parent',
                         'devtype=usb_device',
                         'subsystem=usb']
        # The child gets exactly this environment (nothing is inherited):
        # libfakegudev is preloaded and fed the three fake device
        # descriptions, joined with ':'.
        environment = {'FAKEGUDEV_DEVICES': ':'.join(tty_device +
                                                     net_device +
                                                     parent_device),
                       'FAKEGUDEV_BLOCK_REAL': 'true',
                       'G_DEBUG': 'fatal_criticals',
                       'LD_PRELOAD': os.path.join(self.autodir,
                                                  "deps/fakegudev/lib",
                                                  "libfakegudev.so")}
        self.modemmanager = subprocess.Popen(['/usr/sbin/modem-manager',
                                              '--debug',
                                              '--log-level=DEBUG',
                                              '--log-file=/tmp/mm-log'],
                                             env=environment)
        time.sleep(3)  # wait for DeviceAdded signal?
        self.modemmanager.poll()
        if self.modemmanager.returncode is not None:
            self.modemmanager = None
            raise error.TestFail("ModemManager quit early")

        # wait for MM to stabilize?
        return modem.ModemManager(provider='org.freedesktop')

    @staticmethod
    def _stop_process(process):
        """Terminate and reap process if it exists and is still running."""
        if not process:
            return
        process.poll()
        if process.returncode is None:
            process.terminate()
            process.wait()

    def _stop_fake_network(self):
        """Stop the fakenet process, if one is running."""
        self._stop_process(self.fakenet_process)

    def _stop_fake_modem(self):
        """Stop the fakemodem process, if one is running."""
        self._stop_process(self.fakemodem_process)

    def _stop_modemmanager(self):
        """Stop the ModemManager process, if one is running."""
        self._stop_process(self.modemmanager)

    def _stop_all(self):
        """Stop all child processes in reverse start order.

        A failure while stopping one process does not prevent the
        remaining ones from being stopped.
        """
        try:
            self._stop_modemmanager()
        finally:
            try:
                self._stop_fake_modem()
            finally:
                self._stop_fake_network()

    def __enter__(self):
        """Start fakenet, fakemodem and ModemManager; return self.

        Sets self.mm and self.modem_object_path.  If any step raises,
        the processes started so far are stopped before the exception
        propagates, because the with statement does not call __exit__
        when __enter__ fails.
        """
        started = False
        try:
            fakenetname = self._start_fake_network()
            fakemodemname = self._start_fake_modem(self.modem_pattern_files)
            self.mm = self._start_modemmanager(fakenetname, fakemodemname)
            # This would be better handled by listening for DeviceAdded, but
            # since we've blocked everything else and only supplied data for
            # one modem, it's going to be right
            self.modem_object_path = self.mm.path + '/Modems/0'
            started = True
        finally:
            if not started:
                self._stop_all()
        return self

    def __exit__(self, exception, value, traceback):
        """Stop all child processes; never suppresses an exception."""
        self._stop_all()
        return False
164