#!/usr/bin/python

"""Unit tests for the scheduler's drone_manager module.

The tests drive drone_manager.DroneManager against MockDrone instances (which
only record the calls made to them) and against real drone classes whose host
objects have been replaced with mocks.
"""

import cPickle
import os, unittest
# NOTE(review): presumably `common` has to be imported before any
# autotest_lib module so the package path is set up -- keep this order.
import common
from autotest_lib.client.bin import local_host
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.test_utils import mock
from autotest_lib.frontend import setup_django_lite_environment
from autotest_lib.scheduler import drone_manager, drone_utility, drones
from autotest_lib.scheduler import scheduler_config, drone_manager
from autotest_lib.scheduler import thread_lib
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.server.hosts import ssh_host


class MockDrone(drones._AbstractDrone):
    """Drone stand-in that records calls instead of executing them.

    Every queue_call()/call() and send_file_to() invocation is appended to
    an in-memory list so that tests can later assert on what was requested.
    """

    def __init__(self, name, active_processes=0, max_processes=10,
                 allowed_users=None):
        """Create a mock drone.

        @param name: Used as both the drone name and its hostname.
        @param active_processes: Value stored as self.active_processes.
        @param max_processes: Value stored as self.max_processes.
        @param allowed_users: Value stored as self.allowed_users.
        """
        super(MockDrone, self).__init__()
        self.name = name
        self.hostname = name
        self.active_processes = active_processes
        self.max_processes = max_processes
        self.allowed_users = allowed_users
        self._host = 'mock_drone'
        # maps method names to lists of tuples containing method arguments
        self._recorded_calls = {'queue_call': [],
                                'send_file_to': []}


    def queue_call(self, method, *args, **kwargs):
        """Record a queued call as a (method, args, kwargs) tuple."""
        self._recorded_calls['queue_call'].append((method, args, kwargs))


    def call(self, method, *args, **kwargs):
        """Record an immediate call exactly like a queued one."""
        # don't bother differentiating between call() and queue_call()
        return self.queue_call(method, *args, **kwargs)


    def send_file_to(self, drone, source_path, destination_path,
                     can_fail=False):
        """Record a file transfer request; can_fail is not recorded."""
        self._recorded_calls['send_file_to'].append(
                (drone, source_path, destination_path))


    # method for use by tests
    def _check_for_recorded_call(self, method_name, arguments):
        """Check whether `arguments` was recorded for `method_name`.

        Prints the recorded and the expected arguments when no match is
        found, to help diagnose the failing assertion in the caller.

        @param method_name: Key into the recorded calls dict.
        @param arguments: Tuple to look for in the recorded list.

        @return: True if the tuple was recorded, False otherwise.
        """
        recorded_arg_list = self._recorded_calls[method_name]
        was_called = arguments in recorded_arg_list
        if not was_called:
            # Single-argument print() emits the same text as the print
            # statement under Python 2 and also parses under Python 3.
            print('Recorded args: %s' % (recorded_arg_list,))
            print('Expected: %s' % (arguments,))
        return was_called


    def was_call_queued(self, method, *args, **kwargs):
        """Return True if queue_call()/call() saw exactly these arguments."""
        return self._check_for_recorded_call('queue_call',
                                             (method, args, kwargs))


    def was_file_sent(self, drone, source_path, destination_path):
        """Return True if send_file_to() saw exactly these arguments."""
        return self._check_for_recorded_call('send_file_to',
                                             (drone, source_path,
                                              destination_path))


class DroneManager(unittest.TestCase):
    """Tests for drone_manager.DroneManager using MockDrone instances."""

    _DRONE_INSTALL_DIR = '/drone/install/dir'
    _DRONE_RESULTS_DIR = os.path.join(_DRONE_INSTALL_DIR, 'results')
    _RESULTS_DIR = '/results/dir'
    _SOURCE_PATH = 'source/path'
    _DESTINATION_PATH = 'destination/path'
    _WORKING_DIRECTORY = 'working/directory'
    _USERNAME = 'my_user'

    def setUp(self):
        """Build a DroneManager wired to one mock drone and a results drone."""
        self.god = mock.mock_god()
        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
                           self._DRONE_INSTALL_DIR)
        self.manager = drone_manager.DroneManager()
        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)

        # we don't want this to ever actually get called
        self.god.stub_function(drones, 'get_drone')
        # we don't want the DroneManager to go messing with global config
        def do_nothing():
            pass
        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)

        # set up some dummy drones
        self.mock_drone = MockDrone('mock_drone')
        self.manager._drones[self.mock_drone.name] = self.mock_drone
        self.results_drone = MockDrone('results_drone', 0, 10)
        self.manager._results_drone = self.results_drone

        self.mock_drone_process = drone_manager.Process(self.mock_drone.name, 0)


    def tearDown(self):
        """Undo every stub installed through self.god."""
        self.god.unstub_all()


    def _test_choose_drone_for_execution_helper(self, processes_info_list,
                                                requested_processes):
        """Enqueue one mock drone per entry and ask the manager to pick one.

        @param processes_info_list: List of (active_processes,
                max_processes) pairs; the list index becomes the drone name.
        @param requested_processes: Number of processes to request.

        @return: Whatever _choose_drone_for_execution() returns.
        """
        for index, process_info in enumerate(processes_info_list):
            active_processes, max_processes = process_info
            self.manager._enqueue_drone(
                    MockDrone(index, active_processes, max_processes,
                              allowed_users=None))

        return self.manager._choose_drone_for_execution(
                requested_processes, self._USERNAME, None)


    def test_choose_drone_for_execution(self):
        """Drones at 1/2 and 0/2: the drone at index 1 is expected."""
        drone = self._test_choose_drone_for_execution_helper([(1, 2), (0, 2)],
                                                             1)
        self.assertEqual(drone.name, 1)


    def test_choose_drone_for_execution_some_full(self):
        """Drones at 0/1 and 1/3, two requested: index 1 is expected."""
        drone = self._test_choose_drone_for_execution_helper([(0, 1), (1, 3)],
                                                             2)
        self.assertEqual(drone.name, 1)


    def test_choose_drone_for_execution_all_full(self):
        """Drones at 2/1 and 3/2 (both over max): index 1 is expected."""
        drone = self._test_choose_drone_for_execution_helper([(2, 1), (3, 2)],
                                                             1)
        self.assertEqual(drone.name, 1)


    def test_choose_drone_for_execution_all_full_same_percentage_capacity(self):
        """Drones at 5/3 and 10/6 (same ratio): index 1 is expected."""
        drone = self._test_choose_drone_for_execution_helper([(5, 3), (10, 6)],
                                                             1)
        self.assertEqual(drone.name, 1)


    def test_user_restrictions(self):
        """A drone restricted to another user must not be offered."""
        # this drone is restricted to a different user
        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
                                              allowed_users=['fakeuser']))
        # this drone is allowed but has lower capacity
        self.manager._enqueue_drone(MockDrone(2, max_processes=2,
                                              allowed_users=[self._USERNAME]))

        self.assertEqual(2,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 2)


    def test_user_restrictions_with_full_drone(self):
        """The only permitted drone is expected even when it is full."""
        # this drone is restricted to a different user
        self.manager._enqueue_drone(MockDrone(1, max_processes=10,
                                              allowed_users=['fakeuser']))
        # this drone is allowed but is full
        self.manager._enqueue_drone(MockDrone(2, active_processes=3,
                                              max_processes=2,
                                              allowed_users=[self._USERNAME]))

        self.assertEqual(0,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 2)


    def _setup_test_drone_restrictions(self, active_processes=0):
        """Enqueue drones named 1, 2 and 3 with max_processes 10, 5 and 2.

        @param active_processes: Active process count given to every drone.
        """
        self.manager._enqueue_drone(MockDrone(
                1, active_processes=active_processes, max_processes=10))
        self.manager._enqueue_drone(MockDrone(
                2, active_processes=active_processes, max_processes=5))
        self.manager._enqueue_drone(MockDrone(
                3, active_processes=active_processes, max_processes=2))


    def test_drone_restrictions_allow_any(self):
        """With no hostname restriction, drone 1 (max 10) is expected."""
        self._setup_test_drone_restrictions()
        self.assertEqual(10,
                         self.manager.max_runnable_processes(self._USERNAME,
                                                             None))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME, drone_hostnames_allowed=None)
        self.assertEqual(drone.name, 1)


    def test_drone_restrictions_under_capacity(self):
        """Restricted to drones 2 and 3, drone 2 (max 5) is expected."""
        self._setup_test_drone_restrictions()
        drone_hostnames_allowed = (2, 3)
        self.assertEqual(
                5, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)

        self.assertEqual(drone.name, 2)


    def test_drone_restrictions_over_capacity(self):
        """Allowed drones 2 and 3 are over max; drone 2 is still expected."""
        self._setup_test_drone_restrictions(active_processes=6)
        drone_hostnames_allowed = (2, 3)
        self.assertEqual(
                0, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                7, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)
        self.assertEqual(drone.name, 2)


    def test_drone_restrictions_allow_none(self):
        """An empty allowed-hostname tuple must yield no drone at all."""
        self._setup_test_drone_restrictions()
        drone_hostnames_allowed = ()
        self.assertEqual(
                0, self.manager.max_runnable_processes(self._USERNAME,
                                                       drone_hostnames_allowed))
        drone = self.manager._choose_drone_for_execution(
                1, username=self._USERNAME,
                drone_hostnames_allowed=drone_hostnames_allowed)
        self.assertEqual(drone, None)


    def test_initialize(self):
        """initialize() looks up drones and queues 'initialize' on them."""
        results_hostname = 'results_repo'
        results_install_dir = '/results/install'
        # NOTE(review): this override is not reverted by tearDown, which only
        # calls unstub_all() -- confirm whether it can affect other tests.
        global_config.global_config.override_config_value(
                scheduler_config.CONFIG_SECTION,
                'results_host_installation_directory', results_install_dir)

        (drones.get_drone.expect_call(self.mock_drone.name)
         .and_return(self.mock_drone))

        results_drone = MockDrone('results_drone')
        self.god.stub_function(results_drone, 'set_autotest_install_dir')
        drones.get_drone.expect_call(results_hostname).and_return(results_drone)
        results_drone.set_autotest_install_dir.expect_call(results_install_dir)

        self.manager.initialize(base_results_dir=self._RESULTS_DIR,
                                drone_hostnames=[self.mock_drone.name],
                                results_repository_hostname=results_hostname)

        self.assertTrue(self.mock_drone.was_call_queued(
                'initialize', self._DRONE_RESULTS_DIR + '/'))
        self.god.check_playback()


    def test_execute_command(self):
        """execute_command() queues the command with drone-side paths."""
        self.manager._enqueue_drone(self.mock_drone)

        pidfile_name = 'my_pidfile'
        log_file = 'log_file'

        pidfile_id = self.manager.execute_command(
                command=['test', drone_manager.WORKING_DIRECTORY],
                working_directory=self._WORKING_DIRECTORY,
                pidfile_name=pidfile_name,
                num_processes=1,
                log_file=log_file)

        full_working_directory = os.path.join(self._DRONE_RESULTS_DIR,
                                              self._WORKING_DIRECTORY)
        self.assertEqual(pidfile_id.path,
                         os.path.join(full_working_directory, pidfile_name))
        # The WORKING_DIRECTORY placeholder in the command is expected to be
        # replaced by the full working directory on the drone.
        self.assertTrue(self.mock_drone.was_call_queued(
                'execute_command', ['test', full_working_directory],
                full_working_directory,
                os.path.join(self._DRONE_RESULTS_DIR, log_file), pidfile_name))


    def test_attach_file_to_execution(self):
        """An attached file is written to the drone running the command."""
        self.manager._enqueue_drone(self.mock_drone)

        contents = 'my\ncontents'
        attached_path = self.manager.attach_file_to_execution(
                self._WORKING_DIRECTORY, contents)
        self.manager.execute_command(command=['test'],
                                     working_directory=self._WORKING_DIRECTORY,
                                     pidfile_name='mypidfile',
                                     num_processes=1,
                                     drone_hostnames_allowed=None)

        self.assertTrue(self.mock_drone.was_call_queued(
                'write_to_file',
                os.path.join(self._DRONE_RESULTS_DIR, attached_path),
                contents))


    def test_copy_results_on_drone(self):
        """copy_results_on_drone() queues a copy between drone-side paths."""
        self.manager.copy_results_on_drone(self.mock_drone_process,
                                           self._SOURCE_PATH,
                                           self._DESTINATION_PATH)
        self.assertTrue(self.mock_drone.was_call_queued(
                'copy_file_or_directory',
                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
                os.path.join(self._DRONE_RESULTS_DIR, self._DESTINATION_PATH)))


    def test_copy_to_results_repository(self):
        """Results are sent from the drone to the results drone."""
        # Stub instead of assigning so that tearDown's unstub_all() restores
        # the module-level flag and the change cannot leak into other tests.
        self.god.stub_with(drone_manager, 'ENABLE_ARCHIVING', True)
        self.manager._copy_to_results_repository(self.mock_drone_process,
                                                 self._SOURCE_PATH)
        self.assertTrue(self.mock_drone.was_file_sent(
                self.results_drone,
                os.path.join(self._DRONE_RESULTS_DIR, self._SOURCE_PATH),
                os.path.join(self._RESULTS_DIR, self._SOURCE_PATH)))


    def test_write_lines_to_file(self):
        """Lines are newline-joined and written to the right drone."""
        file_path = 'file/path'
        lines = ['line1', 'line2']
        written_data = 'line1\nline2\n'

        # write to results repository
        self.manager.write_lines_to_file(file_path, lines)
        self.assertTrue(self.results_drone.was_call_queued(
                'write_to_file', os.path.join(self._RESULTS_DIR, file_path),
                written_data))

        # write to a drone
        self.manager.write_lines_to_file(
                file_path, lines, paired_with_process=self.mock_drone_process)
        self.assertTrue(self.mock_drone.was_call_queued(
                'write_to_file',
                os.path.join(self._DRONE_RESULTS_DIR, file_path), written_data))


    def test_pidfile_expiration(self):
        """With max refreshes stubbed to 0, two drops clear the registry."""
        self.god.stub_with(self.manager, '_get_max_pidfile_refreshes',
                           lambda: 0)
        pidfile_id = self.manager.get_pidfile_id_from('tag', 'name')
        self.manager.register_pidfile(pidfile_id)
        self.manager._drop_old_pidfiles()
        self.manager._drop_old_pidfiles()
        self.assertFalse(self.manager._registered_pidfile_info)


class ThreadedDroneTest(unittest.TestCase):
    """Refresh tests against a real drone class backed by a mock host.

    Subclasses may override _DRONE_CLASS, _DRONE_HOST and create_drone() to
    run the same tests against a different kind of drone.
    """

    _DRONE_INSTALL_DIR = '/drone/install/dir'
    _RESULTS_DIR = '/results/dir'
    _DRONE_CLASS = drones._RemoteDrone
    _DRONE_HOST = ssh_host.SSHHost


    def create_drone(self, drone_hostname, mock_hostname,
                     timestamp_remote_calls=False):
        """Create and initialize a Remote Drone.

        @param drone_hostname: Hostname passed to the drone constructor and
                expected by the stubbed create_host().
        @param mock_hostname: Name given to the mock host object.
        @param timestamp_remote_calls: Forwarded to the drone constructor.

        @return: A remote drone instance.
        """
        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
        self.god.stub_function(drones.drone_utility, 'create_host')
        drones.drone_utility.create_host.expect_call(drone_hostname).and_return(
                mock_host)
        mock_host.is_up.expect_call().and_return(True)
        return self._DRONE_CLASS(drone_hostname,
                                 timestamp_remote_calls=timestamp_remote_calls)


    def create_fake_pidfile_info(self, tag='tag', name='name'):
        """Register a pidfile id built from tag/name with the manager.

        @return: The manager's _registered_pidfile_info mapping.
        """
        pidfile_id = self.manager.get_pidfile_id_from(tag, name)
        self.manager.register_pidfile(pidfile_id)
        return self.manager._registered_pidfile_info


    def setUp(self):
        """Build a DroneManager with a results drone but no worker drones."""
        self.god = mock.mock_god()
        self.god.stub_with(drones, 'AUTOTEST_INSTALL_DIR',
                           self._DRONE_INSTALL_DIR)
        self.manager = drone_manager.DroneManager()
        self.god.stub_with(self.manager, '_results_dir', self._RESULTS_DIR)

        # we don't want this to ever actually get called
        self.god.stub_function(drones, 'get_drone')
        # we don't want the DroneManager to go messing with global config
        def do_nothing():
            pass
        self.god.stub_with(self.manager, 'refresh_drone_configs', do_nothing)

        self.results_drone = MockDrone('results_drone', 0, 10)
        self.manager._results_drone = self.results_drone
        self.drone_utility_path = 'mock-drone-utility-path'
        self.mock_return = {'results': ['mock results'],
                            'warnings': []}


    def tearDown(self):
        """Undo every stub installed through self.god."""
        self.god.unstub_all()


    def test_trigger_refresh(self):
        """Test drone manager trigger refresh."""
        self.god.stub_with(self._DRONE_CLASS, '_drone_utility_path',
                           self.drone_utility_path)
        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
        self.manager._drones[mock_drone.hostname] = mock_drone

        # Create some fake pidfiles and confirm that a refresh call is
        # executed on each drone host, with the same pidfile paths. Then
        # check that each drone gets a key in the returned results dictionary.
        # Note that range(0, 1) registers a single pidfile.
        for i in range(0, 1):
            pidfile_info = self.create_fake_pidfile_info(
                    'tag%s' % i, 'name%s' % i)
        pidfile_paths = [pidfile.path for pidfile in pidfile_info.keys()]
        refresh_call = drone_utility.call('refresh', pidfile_paths)
        expected_results = {}
        mock_result = utils.CmdResult(
                stdout=cPickle.dumps(self.mock_return))
        for drone in self.manager.get_drones():
            drone._host.run.expect_call(
                    'python %s' % self.drone_utility_path,
                    stdin=cPickle.dumps([refresh_call]), stdout_tee=None,
                    connect_timeout=mock.is_instance_comparator(int)
                    ).and_return(mock_result)
            expected_results[drone] = self.mock_return['results']
        self.manager.trigger_refresh()
        # assertEqual reports both values on failure, unlike assertTrue(==).
        self.assertEqual(self.manager._refresh_task_queue.get_results(),
                         expected_results)
        self.god.check_playback()


    def test_sync_refresh(self):
        """Test drone manager sync refresh."""

        mock_drone = self.create_drone('fakedrone1', 'fakehost1')
        self.manager._drones[mock_drone.hostname] = mock_drone

        # Insert some drone_utility results into the results queue, then
        # check that get_results returns it in the right format, and that
        # the rest of sync_refresh populates the right datastructures for
        # correct handling of agents. Also confirm that this method of
        # syncing is sufficient for the monitor to pick up the exit status
        # of the process in the same way it would in handle_agents.
        pidfile_path = 'results/hosts/host_id/job_id-name/.autoserv_execute'
        pidfiles = {pidfile_path: '123\n12\n0\n'}
        drone_utility_results = {
                'pidfiles': pidfiles,
                'autoserv_processes':{},
                'all_processes':{},
                'parse_processes':{},
                'pidfiles_second_read':pidfiles,
        }
        # Our manager instance isn't the drone manager singleton that the
        # pidfile_monitor will use by default, because setUp doesn't call
        # drone_manager.instance().
        self.god.stub_with(drone_manager, '_the_instance', self.manager)
        monitor = pidfile_monitor.PidfileRunMonitor()
        monitor.pidfile_id = drone_manager.PidfileId(pidfile_path)
        self.manager.register_pidfile(monitor.pidfile_id)
        self.assertTrue(monitor._state.exit_status is None)

        self.manager._refresh_task_queue.results_queue.put(
                thread_lib.ThreadedTaskQueue.result(
                    mock_drone, [drone_utility_results]))
        self.manager.sync_refresh()
        pidfiles = self.manager._pidfiles
        pidfile_id = pidfiles.keys()[0]
        pidfile_contents = pidfiles[pidfile_id]

        # One assertion per field so a failure names the value that is wrong.
        self.assertEqual(pidfile_id.path, pidfile_path)
        self.assertEqual(pidfile_contents.process.pid, 123)
        self.assertEqual(pidfile_contents.process.hostname,
                         mock_drone.hostname)
        self.assertEqual(pidfile_contents.exit_status, 12)
        self.assertEqual(pidfile_contents.num_tests_failed, 0)
        self.assertEqual(monitor.exit_code(), 12)
        self.god.check_playback()


class ThreadedLocalhostDroneTest(ThreadedDroneTest):
    """Runs the ThreadedDroneTest tests against a local drone."""

    _DRONE_CLASS = drones._LocalDrone
    _DRONE_HOST = local_host.LocalHost


    def create_drone(self, drone_hostname, mock_hostname,
                     timestamp_remote_calls=False):
        """Create and initialize a Local Drone backed by a mock host.

        @param drone_hostname: Unused here; kept so the signature matches
                the base class.
        @param mock_hostname: Name given to the mock host object.
        @param timestamp_remote_calls: Forwarded to the drone constructor.

        @return: A local drone instance whose _host is the mock host.
        """
        mock_host = self.god.create_mock_class(self._DRONE_HOST, mock_hostname)
        self.god.stub_function(drones.drone_utility, 'create_host')
        local_drone = self._DRONE_CLASS(
                timestamp_remote_calls=timestamp_remote_calls)
        self.god.stub_with(local_drone, '_host', mock_host)
        return local_drone


if __name__ == '__main__':
    unittest.main()