#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import socket
import time
import uuid

import httplib2
from googleapiclient import errors

from host_controller import invocation_thread
from host_controller.tradefed import remote_operation
from host_controller.tfc import command_attempt


class HostController(object):
    """The class that relays commands between a TradeFed host and clusters.

    Command tasks are leased from TFC for the devices that are currently
    available on the host, and each leased task is executed on its own
    InvocationThread.

    Attributes:
        _remote_client: The RemoteClient which runs commands.
        _tfc_client: The TfcClient from which the command tasks are leased.
        _hostname: A string, the name of the TradeFed host.
        _cluster_ids: A list of strings, the cluster IDs for leasing tasks.
                      The first element is passed to the TFC client
                      separately from the remaining ones.
        _invocation_threads: The list of running InvocationThread.
    """

    def __init__(self, remote_client, tfc_client, hostname, cluster_ids):
        """Initializes the attributes.

        Args:
            remote_client: The RemoteClient which runs commands.
            tfc_client: The TfcClient from which the tasks are leased.
            hostname: A string, the name of the TradeFed host.
            cluster_ids: A list of strings, the cluster IDs for leasing
                         tasks.
        """
        self._remote_client = remote_client
        self._tfc_client = tfc_client
        self._hostname = hostname
        self._cluster_ids = cluster_ids
        self._invocation_threads = []

    @property
    def hostname(self):
        """Returns the name of the host."""
        return self._hostname

    def _JoinInvocationThreads(self):
        """Removes terminated threads from _invocation_threads."""
        alive_threads = []
        for inv_thread in self._invocation_threads:
            # A zero timeout makes join() return immediately; it only
            # reaps a thread that has already finished.
            inv_thread.join(0)
            if inv_thread.is_alive():
                alive_threads.append(inv_thread)
        self._invocation_threads = alive_threads

    def _CreateInvocationThread(self, task):
        """Creates an invocation thread from a command task.

        The thread is created but not started.

        Args:
            task: The CommandTask object.

        Returns:
            An InvocationThread.
        """
        # Every attempt gets a freshly generated random ID.
        attempt_id = uuid.uuid4()
        # NOTE(review): the first serial is used for the attempt, so this
        # raises IndexError when task.device_serials is empty -- confirm
        # that a leased task always carries at least one device.
        attempt = command_attempt.CommandAttempt(
            task.task_id, attempt_id,
            self._hostname, task.device_serials[0])
        inv_thread = invocation_thread.InvocationThread(
            self._remote_client, self._tfc_client, attempt,
            task.command_line.split(), task.device_serials)
        return inv_thread

    def ListDevices(self):
        """Lists present devices on the host.

        Devices for which IsStub() is true are filtered out.

        Returns:
            A list of DeviceInfo.
        """
        devices = self._remote_client.ListDevices()
        return [dev for dev in devices if not dev.IsStub()]

    def ListAvailableDevices(self):
        """Lists available devices for command tasks.

        A device is returned when it is present, reports IsAvailable(), and
        its serial is not held by a running invocation thread.

        Returns:
            A list of DeviceInfo.
        """
        # Drop finished threads first so that their devices are released.
        self._JoinInvocationThreads()
        allocated_serials = set()
        for inv_thread in self._invocation_threads:
            allocated_serials.update(inv_thread.device_serials)

        present_devices = self.ListDevices()
        return [dev for dev in present_devices if
                dev.IsAvailable() and
                dev.device_serial not in allocated_serials]

    def LeaseCommandTasks(self):
        """Leases command tasks and creates threads to execute them.

        Nothing is leased when no device is available. Each leased task is
        started on its own daemon thread, which is then tracked in
        _invocation_threads.

        Returns:
            A list of CommandTask. The leased command tasks.
        """
        available_devices = self.ListAvailableDevices()
        if not available_devices:
            return []

        # NOTE(review): raises IndexError when _cluster_ids is empty, and
        # Run() does not catch IndexError -- confirm that callers always
        # supply at least one cluster ID.
        tasks = self._tfc_client.LeaseHostTasks(
            self._cluster_ids[0], self._cluster_ids[1:],
            self._hostname, available_devices)
        for task in tasks:
            inv_thread = self._CreateInvocationThread(task)
            inv_thread.daemon = True
            inv_thread.start()
            self._invocation_threads.append(inv_thread)
        return tasks

    def Run(self, poll_interval):
        """Starts polling TFC for tasks.

        Loops forever. The socket, remote operation and HTTP errors listed
        below are logged and the loop continues; any other exception
        propagates to the caller.

        Args:
            poll_interval: The poll interval in seconds.
        """
        while True:
            try:
                self.LeaseCommandTasks()
            except (socket.error,
                    remote_operation.RemoteOperationException,
                    httplib2.HttpLib2Error,
                    errors.HttpError) as e:
                logging.exception(e)
            # Sleep after every attempt, whether it succeeded or failed.
            time.sleep(poll_interval)