# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
import time

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.tendo import peerd_config
from autotest_lib.client.cros import dbus_util

SERVICE_ID = 'test-service'


class peerd_MonitorsDBusConnections(test.test):
    """Test that peerd removes services when processes disconnect from DBus."""
    version = 1


    def _check_has_test_service(self, expect_service=True):
        """Assert that the test service is (or is not) exposed by peerd.

        Queries peerd's object manager over self._bus for service objects
        under the self peer's path and looks for one whose service ID is
        SERVICE_ID.

        @param expect_service: True if exactly one test service instance
                should be present, False if none should be.
        @raises error.TestFail: if more than one instance of the test
                service is found, or if presence of the service does not
                match |expect_service|.

        """
        services = dbus_util.get_objects_with_interface(
                peerd_config.SERVICE_NAME,
                peerd_config.OBJECT_MANAGER_PATH,
                peerd_config.DBUS_INTERFACE_SERVICE,
                path_prefix=peerd_config.DBUS_PATH_SELF,
                bus=self._bus)
        found_service = False
        # services is a map of object path to dicts of DBus interface to
        # properties exposed by that interface.  The object paths themselves
        # are not needed here, so only the per-object dicts are walked.
        for interfaces in services.values():
            for interface, properties in interfaces.items():
                if interface != peerd_config.DBUS_INTERFACE_SERVICE:
                    continue
                if (properties[peerd_config.SERVICE_PROPERTY_SERVICE_ID] !=
                        SERVICE_ID):
                    continue
                if found_service:
                    raise error.TestFail('Found multiple test service '
                                         'instances?')
                found_service = True
        # Report each mismatch direction with its own message so a failure
        # log says what actually went wrong.
        if expect_service and not found_service:
            raise error.TestFail('Expected to see test service, but did not.')
        if found_service and not expect_service:
            raise error.TestFail('Expected not to see test service, but did.')


    def run_once(self):
        """Expose a service, drop the DBus connection, and check cleanup.

        Restarts peerd, exposes the test service through the manager
        interface, verifies it is visible, then closes the DBus connection
        that exposed it and verifies the service is gone.

        @raises error.TestFail: if the service's presence does not match
                expectations at any step.

        """
        self._bus = dbus.SystemBus()
        config = peerd_config.PeerdConfig(verbosity_level=5)
        config.restart_with_config()
        # Nothing has been exposed yet on the freshly restarted daemon.
        self._check_has_test_service(expect_service=False)
        self._manager = dbus.Interface(
                self._bus.get_object(peerd_config.SERVICE_NAME,
                                     peerd_config.DBUS_PATH_MANAGER),
                peerd_config.DBUS_INTERFACE_MANAGER)
        self._manager.ExposeService(SERVICE_ID,
                                    dbus.Dictionary(signature='ss'),
                                    dbus.Dictionary(signature='sv'))
        # Python keeps the DBus connection sitting around unless we
        # explicitly close it.  The service should still be there.
        time.sleep(1)  # Peerd might take some time to publish the service.
        self._check_has_test_service()
        # Close our previous connection, open a new one.
        self._bus.close()
        self._bus = dbus.SystemBus()
        time.sleep(1)  # Peerd might take some time to remove the service.
        self._check_has_test_service(expect_service=False)