#!/usr/bin/env python2 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """An implementation of the ModemManager1 DBUS interface. This modem mimics a GSM (eventually LTE & CDMA) modem and allows a user to test shill and UI behaviors when a supported SIM is inserted into the device. Invoked with the proper flags it can test that SMS messages are deliver to the UI. This program creates a virtual network interface to simulate the network interface of a modem. It depends on modemmanager-next to set the dbus permissions properly. TODO: * Use more appropriate values for many of the properties * Support all ModemManager1 interfaces * implement LTE modems * implement CDMA modems """ from optparse import OptionParser import logging import os import signal import string import subprocess import sys import time import dbus from dbus.exceptions import DBusException import dbus.mainloop.glib import dbus.service from dbus.types import Int32 from dbus.types import ObjectPath from dbus.types import Struct from dbus.types import UInt32 import glib import gobject import mm1 # Miscellaneous delays to simulate a modem DEFAULT_CONNECT_DELAY_MS = 1500 DEFAULT_CARRIER = 'att' class DBusObjectWithProperties(dbus.service.Object): """Implements the org.freedesktop.DBus.Properties interface. Implements the org.freedesktop.DBus.Properties interface, specifically the Get and GetAll methods. Class which inherit from this class must implement the InterfacesAndProperties function which will return a dictionary of all interfaces and the properties defined on those interfaces. """ def __init__(self, bus, path): dbus.service.Object.__init__(self, bus, path) @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='ss', out_signature='v') def Get(self, interface, property_name, *args, **kwargs): """Returns: The value of property_name on interface.""" logging.info('%s: Get %s, %s', self.path, interface, property_name) interfaces = self.InterfacesAndProperties() properties = interfaces.get(interface, None) if property_name in properties: return properties[property_name] raise dbus.exceptions.DBusException( mm1.MODEM_MANAGER_INTERFACE + '.UnknownProperty', 'Property %s not defined for interface %s' % (property_name, interface)) @dbus.service.method(dbus.PROPERTIES_IFACE, in_signature='s', out_signature='a{sv}') def GetAll(self, interface, *args, **kwargs): """Returns: A dictionary. The properties on interface.""" logging.info('%s: GetAll %s', self.path, interface) interfaces = self.InterfacesAndProperties() properties = interfaces.get(interface, None) if properties is not None: return properties raise dbus.exceptions.DBusException( mm1.MODEM_MANAGER_INTERFACE + '.UnknownInterface', 'Object does not implement the %s interface' % interface) def InterfacesAndProperties(self): """Subclasses must implement this function. Returns: A dictionary of interfaces where the values are dictionaries of dbus properties. """ pass class SIM(DBusObjectWithProperties): """SIM Object. Mock SIM Card and the typical information it might contain. SIM cards of different carriers can be created by providing the MCC, MNC, operator name, imsi, and msin. SIM objects are passed to the Modem during Modem initialization. """ DEFAULT_MCC = '310' DEFAULT_MNC = '090' DEFAULT_OPERATOR = 'AT&T' DEFAULT_MSIN = '1234567890' DEFAULT_IMSI = '888999111' MCC_LIST = { 'us': '310', 'de': '262', 'es': '214', 'fr': '208', 'gb': '234', 'it': '222', 'nl': '204', } CARRIERS = { 'att': ('us', '090', 'AT&T'), 'tmobile': ('us', '026', 'T-Mobile'), 'simyo': ('de', '03', 'simyo'), 'movistar': ('es', '07', 'Movistar'), 'sfr': ('fr', '10', 'SFR'), 'three': ('gb', '20', '3'), 'threeita': ('it', '99', '3ITA'), 'kpn': ('nl', '08', 'KPN') } def __init__(self, manager, mcc_country='us', mnc=DEFAULT_MNC, operator_name=DEFAULT_OPERATOR, msin=DEFAULT_MSIN, imsi=None, mcc=None, name='/Sim/0'): self.manager = manager self.name = name self.path = manager.path + name self.mcc = mcc or SIM.MCC_LIST.get(mcc_country, '000') self.mnc = mnc self.operator_name = operator_name self.msin = msin self.imsi = imsi or (self.mcc + self.mnc + SIM.DEFAULT_IMSI) DBusObjectWithProperties.__init__(self, manager.bus, self.path) @staticmethod def FromCarrier(carrier, manager): """Creates a SIM card object for a given carrier.""" args = SIM.CARRIERS.get(carrier, []) return SIM(manager, *args) def Properties(self): return { 'SimIdentifier': self.msin, 'Imsi': self.imsi, 'OperatorIdentifier': self.mcc + self.mnc, 'OperatorName': self.operator_name } def InterfacesAndProperties(self): return {mm1.SIM_INTERFACE: self.Properties()} class SMS(DBusObjectWithProperties): """SMS Object. Mock SMS message. """ def __init__(self, manager, name='/SMS/0', text='test', number='123', timestamp='12:00', smsc=''): self.manager = manager self.name = name self.path = manager.path + name self.text = text or 'test sms at %s' % name self.number = number self.timestamp = timestamp self.smsc = smsc DBusObjectWithProperties.__init__(self, manager.bus, self.path) def Properties(self): # TODO(jglasgow): State, Validity, Class, Storage are also defined return { 'Text': self.text, 'Number': self.number, 'Timestamp': self.timestamp, 'SMSC': self.smsc } def InterfacesAndProperties(self): return {mm1.SMS_INTERFACE: self.Properties()} class PseudoNetworkInterface(object): """A Pseudo network interface. This uses a pair of network interfaces and dnsmasq to simulate the network device normally associated with a modem. """ # Any interface that shill manages will get its own routing # table. Routes added to the main routing table with RTPROT_BOOT (the # default proto value) will be sent to the corresponding interface's # routing table. We want to prevent that in this case, so we use # proto 5, as shill currently ignores proto values greater than 4. ROUTE_PROTO = 'proto 5' def __init__(self, interface, base): self.interface = interface self.peer = self.interface + 'p' self.base = base self.lease_file = '/tmp/dnsmasq.%s.leases' % self.interface self.dnsmasq = None def __enter__(self): """Make usable with "with" statement.""" self.CreateInterface() return self def __exit__(self, exception, value, traceback): """Make usable with "with" statement.""" self.DestroyInterface() return False def CreateInterface(self): """Creates a virtual interface. Creates the virtual interface self.interface as well as a peer interface. Runs dnsmasq on the peer interface so that a DHCP service can offer ip addresses to the virtual interface. """ os.system('ip link add name %s type veth peer name %s' % ( self.interface, self.peer)) os.system('ifconfig %s %s.1/24' % (self.peer, self.base)) os.system('ifconfig %s up' % self.peer) os.system('ifconfig %s up' % self.interface) os.system('ip route add 255.255.255.255 dev %s %s' % (self.peer, self.ROUTE_PROTO)) os.close(os.open(self.lease_file, os.O_CREAT | os.O_TRUNC)) self.dnsmasq = subprocess.Popen( ['/usr/local/sbin/dnsmasq', '--pid-file', '-k', '--dhcp-leasefile=%s' % self.lease_file, '--dhcp-range=%s.2,%s.254' % (self.base, self.base), '--port=0', '--interface=%s' % self.peer, '--bind-interfaces' ]) # iptables default policy is to reject packets. Add ACCEPT as the # target for the virtual and peer interfaces. Note that this currently # only accepts v4 traffic. os.system('iptables -I INPUT -i %s -j ACCEPT' % self.peer) os.system('iptables -I INPUT -i %s -j ACCEPT' % self.interface) def DestroyInterface(self): """Destroys the virtual interface. Stops dnsmasq and cleans up all on disk state. """ if self.dnsmasq: self.dnsmasq.terminate() try: os.system('ip route del 255.255.255.255 %s' % self.ROUTE_PROTO) except: pass try: os.system('ip link del %s' % self.interface) except: pass os.system('iptables -D INPUT -i %s -j ACCEPT' % self.peer) os.system('iptables -D INPUT -i %s -j ACCEPT' % self.interface) if os.path.exists(self.lease_file): os.remove(self.lease_file) class Modem(DBusObjectWithProperties): """A Modem object that implements the ModemManager DBUS API.""" def __init__(self, manager, name='/Modem/0', device='pseudomodem0', mdn='0000001234', meid='A100000DCE2CA0', carrier='CrCarrier', esn='EDD1EDD1', sim=None): """Instantiates a Modem with some options. Args: manager: a ModemManager object. name: string, a dbus path name. device: string, the network device to use. mdn: string, the mobile directory number. meid: string, the mobile equipment id (CDMA only?). carrier: string, the name of the carrier. esn: string, the electronic serial number. sim: a SIM object. """ self.state = mm1.MM_MODEM_STATE_DISABLED self.manager = manager self.name = name self.path = manager.path + name self.device = device self.mdn = mdn self.meid = meid self.carrier = carrier self.operator_name = carrier self.operator_code = '123' self.esn = esn self.registration_state = mm1.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE self.sim = sim DBusObjectWithProperties.__init__(self, manager.bus, self.path) self.pseudo_interface = PseudoNetworkInterface(self.device, '192.168.7') self.smses = {} def __enter__(self): """Make usable with "with" statement.""" self.pseudo_interface.__enter__() # Add the device to the manager only after the pseudo # interface has been created. self.manager.Add(self) return self def __exit__(self, exception, value, traceback): """Make usable with "with" statement.""" self.manager.Remove(self) return self.pseudo_interface.__exit__(exception, value, traceback) def DiscardModem(self): """Discard this DBUS Object. Send a message that a modem has disappeared and deregister from DBUS. """ logging.info('DiscardModem') self.remove_from_connection() self.manager.Remove(self) def ModemProperties(self): """Return the properties of the modem object.""" properties = { # 'Sim': type='o' 'ModemCapabilities': UInt32(0), 'CurrentCapabilities': UInt32(0), 'MaxBearers': UInt32(2), 'MaxActiveBearers': UInt32(2), 'Manufacturer': 'Foo Electronics', 'Model': 'Super Foo Modem', 'Revision': '1.0', 'DeviceIdentifier': '123456789', 'Device': self.device, 'Driver': 'fake', 'Plugin': 'Foo Plugin', 'EquipmentIdentifier': self.meid, 'UnlockRequired': UInt32(0), #'UnlockRetries' type='a{uu}' mm1.MM_MODEM_PROPERTY_STATE: Int32(self.state), 'AccessTechnologies': UInt32(self.state), 'SignalQuality': Struct([UInt32(90), True], signature='ub'), 'OwnNumbers': ['6175551212'], 'SupportedModes': UInt32(0), 'AllowedModes': UInt32(0), 'PreferredMode': UInt32(0), 'SupportedBands': [UInt32(0)], 'Bands': [UInt32(0)] } if self.sim: properties['Sim'] = ObjectPath(self.sim.path) return properties def InterfacesAndProperties(self): """Return all supported interfaces and their properties.""" return { mm1.MODEM_INTERFACE: self.ModemProperties(), } def ChangeState(self, new_state, why=mm1.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN): logging.info('Change state from %s to %s', self.state, new_state) self.StateChanged(Int32(self.state), Int32(new_state), UInt32(why)) self.PropertiesChanged(mm1.MODEM_INTERFACE, {mm1.MM_MODEM_PROPERTY_STATE: Int32(new_state)}, []) self.state = new_state @dbus.service.method(mm1.MODEM_INTERFACE, in_signature='b', out_signature='') def Enable(self, on, *args, **kwargs): """Enables the Modem.""" logging.info('Modem: Enable %s', str(on)) if on: if self.state <= mm1.MM_MODEM_STATE_ENABLING: self.ChangeState(mm1.MM_MODEM_STATE_ENABLING) if self.state <= mm1.MM_MODEM_STATE_ENABLED: self.ChangeState(mm1.MM_MODEM_STATE_ENABLED) if self.state <= mm1.MM_MODEM_STATE_SEARCHING: self.ChangeState(mm1.MM_MODEM_STATE_SEARCHING) glib.timeout_add(250, self.OnRegistered) else: if self.state >= mm1.MM_MODEM_STATE_DISABLING: self.ChangeState(mm1.MM_MODEM_STATE_DISABLING) if self.state >= mm1.MM_MODEM_STATE_DISABLED: self.ChangeState(mm1.MM_MODEM_STATE_DISABLED) self.ChangeRegistrationState( mm1.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE) return None def ChangeRegistrationState(self, new_state): """Updates the registration state of the modem. Updates the registration state of the modem and broadcasts a DBUS signal. Args: new_state: the new registation state of the modem. """ if new_state != self.registration_state: self.registration_state = new_state self.PropertiesChanged( mm1.MODEM_MODEM3GPP_INTERFACE, {mm1.MM_MODEM3GPP_PROPERTY_REGISTRATION_STATE: UInt32(new_state)}, []) def OnRegistered(self): """Called when the Modem is Registered.""" if (self.state >= mm1.MM_MODEM_STATE_ENABLED and self.state <= mm1.MM_MODEM_STATE_REGISTERED): logging.info('Modem: Marking Registered') self.ChangeRegistrationState( mm1.MM_MODEM_3GPP_REGISTRATION_STATE_HOME) self.ChangeState(mm1.MM_MODEM_STATE_REGISTERED) @dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='', out_signature='a{sv}') def GetStatus(self, *args, **kwargs): """Gets the general modem status. Returns: A dictionary of properties. """ logging.info('Modem: GetStatus') properties = { 'state': UInt32(self.state), 'signal-quality': UInt32(99), 'bands': self.carrier, 'access-technology': UInt32(0), 'm3gpp-registration-state': UInt32(self.registration_state), 'm3gpp-operator-code': '123', 'm3gpp-operator-name': '123', 'cdma-cdma1x-registration-state': UInt32(99), 'cdma-evdo-registration-state': UInt32(99), 'cdma-sid': '123', 'cdma-nid': '123', } if self.state >= mm1.MM_MODEM_STATE_ENABLED: properties['carrier'] = 'Test Network' return properties @dbus.service.signal(mm1.MODEM_INTERFACE, signature='iiu') def StateChanged(self, old_state, new_state, why): pass @dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='a{sv}', out_signature='o', async_callbacks=('return_cb', 'raise_cb')) def Connect(self, unused_props, return_cb, raise_cb, **kwargs): """Connect the modem to the network. Args: unused_props: connection properties. See ModemManager documentation. return_cb: function to call to return result asynchronously. raise_cb: function to call to raise an error asynchronously. """ def ConnectDone(new, why): logging.info('Modem: ConnectDone %s -> %s because %s', str(self.state), str(new), str(why)) if self.state == mm1.MM_MODEM_STATE_CONNECTING: self.ChangeState(new, why) # TODO(jglasgow): implement a bearer object bearer_path = '/Bearer/0' return_cb(bearer_path) else: raise_cb(mm1.ConnectionUnknownError()) logging.info('Modem: Connect') if self.state != mm1.MM_MODEM_STATE_REGISTERED: logging.info( 'Modem: Connect fails on unregistered modem. State = %s', self.state) raise mm1.NoNetworkError() delay_ms = kwargs.get('connect_delay_ms', DEFAULT_CONNECT_DELAY_MS) time.sleep(delay_ms / 1000.0) self.ChangeState(mm1.MM_MODEM_STATE_CONNECTING) glib.timeout_add(50, lambda: ConnectDone( mm1.MM_MODEM_STATE_CONNECTED, mm1.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED)) @dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='o', async_callbacks=('return_cb', 'raise_cb')) def Disconnect(self, bearer, return_cb, raise_cb, **kwargs): """Disconnect the modem from the network.""" def DisconnectDone(old, new, why): logging.info('Modem: DisconnectDone %s -> %s because %s', str(old), str(new), str(why)) if self.state == mm1.MM_MODEM_STATE_DISCONNECTING: logging.info('Modem: State is DISCONNECTING, changing to %s', str(new)) self.ChangeState(new) return_cb() elif self.state == mm1.MM_MODEM_STATE_DISABLED: logging.info('Modem: State is DISABLED, not changing state') return_cb() else: raise_cb(mm1.ConnectionUnknownError()) logging.info('Modem: Disconnect') self.ChangeState(mm1.MM_MODEM_STATE_DISCONNECTING) glib.timeout_add( 500, lambda: DisconnectDone( self.state, mm1.MM_MODEM_STATE_REGISTERED, mm1.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED)) @dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as') def PropertiesChanged(self, interface, changed_properties, invalidated_properties): pass def AddSMS(self, sms): logging.info('Adding SMS %s to list', sms.path) self.smses[sms.path] = sms self.Added(self.path, True) @dbus.service.method(mm1.MODEM_MESSAGING_INTERFACE, in_signature='', out_signature='ao') def List(self, *args, **kwargs): logging.info('Modem.Messaging: List: %s', ', '.join(self.smses.keys())) return self.smses.keys() @dbus.service.method(mm1.MODEM_MESSAGING_INTERFACE, in_signature='o', out_signature='') def Delete(self, sms_path, *args, **kwargs): logging.info('Modem.Messaging: Delete %s', sms_path) del self.smses[sms_path] @dbus.service.signal(mm1.MODEM_MESSAGING_INTERFACE, signature='ob') def Added(self, sms_path, complete): pass class GSMModem(Modem): """A GSMModem implements the mm1.MODEM_MODEM3GPP_INTERFACE interface.""" def __init__(self, manager, imei='00112342342', **kwargs): self.imei = imei Modem.__init__(self, manager, **kwargs) @dbus.service.method(mm1.MODEM_MODEM3GPP_INTERFACE, in_signature='s', out_signature='') def Register(self, operator_id, *args, **kwargs): """Register the modem on the network.""" pass def Modem3GPPProperties(self): """Return the 3GPP Properties of the modem object.""" return { 'Imei': self.imei, mm1.MM_MODEM3GPP_PROPERTY_REGISTRATION_STATE: UInt32(self.registration_state), 'OperatorCode': self.operator_code, 'OperatorName': self.operator_name, 'EnabledFacilityLocks': UInt32(0) } def InterfacesAndProperties(self): """Return all supported interfaces and their properties.""" return { mm1.MODEM_INTERFACE: self.ModemProperties(), mm1.MODEM_MODEM3GPP_INTERFACE: self.Modem3GPPProperties() } @dbus.service.method(mm1.MODEM_MODEM3GPP_INTERFACE, in_signature='', out_signature='aa{sv}') def Scan(self, *args, **kwargs): """Scan for networks.""" raise mm1.CoreUnsupportedError() class ModemManager(dbus.service.Object): """Implements the org.freedesktop.DBus.ObjectManager interface.""" def __init__(self, bus, path): self.devices = [] self.bus = bus self.path = path dbus.service.Object.__init__(self, bus, path) def Add(self, device): """Adds a modem device to the list of devices that are managed.""" logging.info('ModemManager: add %s', device.name) self.devices.append(device) interfaces = device.InterfacesAndProperties() logging.info('Add: %s', interfaces) self.InterfacesAdded(device.path, interfaces) def Remove(self, device): """Removes a modem device from the list of managed devices.""" logging.info('ModemManager: remove %s', device.name) self.devices.remove(device) interfaces = device.InterfacesAndProperties().keys() self.InterfacesRemoved(device.path, interfaces) @dbus.service.method(mm1.OFDOM, out_signature='a{oa{sa{sv}}}') def GetManagedObjects(self): """Returns the list of managed objects and their properties.""" results = {} for device in self.devices: results[device.path] = device.InterfacesAndProperties() logging.info('GetManagedObjects: %s', ', '.join(results.keys())) return results @dbus.service.signal(mm1.OFDOM, signature='oa{sa{sv}}') def InterfacesAdded(self, object_path, interfaces_and_properties): pass @dbus.service.signal(mm1.OFDOM, signature='oas') def InterfacesRemoved(self, object_path, interfaces): pass def main(): usage = """ Run pseudo_modem to simulate a GSM modem using the modemmanager-next DBUS interfaces. This can be used for the following: - to simpilify the verification process of UI features that use of overseas SIM cards - to test shill on a virtual machine without a physical modem - to test that Chrome property displays SMS messages To use on a test image you use test_that to run network_3GModemControl which will cause pseudo_modem.py to be installed in /usr/local/autotests/cros/cellular. Then stop modemmanager and start the pseudo modem with the commands: stop modemmanager /usr/local/autotest/cros/cellular/pseudo_modem.py When done, use Control-C to stop the process and restart modem manager: start modemmanager Additional help documentation is available by invoking pseudo_modem.py --help. SMS testing can be accomnplished by supplying the -s flag to simulate the receipt of a number of SMS messages. The message text can be specified with the --text option on the command line, or read from a file by using the --file option. If the messages are located in a file, then each line corresponds to a single SMS message. Chrome should display the SMS messages as soon as a user logs in to the Chromebook, or if the user is already logged in, then shortly after the pseudo modem is recognized by shill. """ parser = OptionParser(usage=usage) parser.add_option('-c', '--carrier', dest='carrier_name', metavar='', help=' := %s' % ' | '.join( SIM.CARRIERS.keys())) parser.add_option('-s', '--smscount', dest='sms_count', default=0, metavar='', help=' := integer') parser.add_option('-l', '--logfile', dest='logfile', default='', metavar='', help=' := filename for logging output') parser.add_option('-t', '--text', dest='sms_text', default=None, metavar='', help=' := text for sms messages') parser.add_option('-f', '--file', dest='filename', default=None, metavar='', help=' := file with text for sms messages') (options, args) = parser.parse_args() kwargs = {} if options.logfile: kwargs['filename'] = options.logfile logging.basicConfig(format='%(asctime)-15s %(message)s', level=logging.DEBUG, **kwargs) if not options.carrier_name: options.carrier_name = DEFAULT_CARRIER dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) bus = dbus.SystemBus() name = dbus.service.BusName(mm1.MODEM_MANAGER_INTERFACE, bus) manager = ModemManager(bus, mm1.OMM) sim_card = SIM.FromCarrier(string.lower(options.carrier_name), manager) with GSMModem(manager, sim=sim_card) as modem: if options.filename: f = open(options.filename, 'r') for index, line in enumerate(f.readlines()): line = line.strip() if line: sms = SMS(manager, name='/SMS/%s' % index, text=line) modem.AddSMS(sms) else: for index in xrange(int(options.sms_count)): sms = SMS(manager, name='/SMS/%s' % index, text=options.sms_text) modem.AddSMS(sms) mainloop = gobject.MainLoop() def SignalHandler(signum, frame): logging.info('Signal handler called with signal: %s', signum) mainloop.quit() signal.signal(signal.SIGTERM, SignalHandler) mainloop.run() if __name__ == '__main__': main()