#pylint: disable-msg=C0111

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Scheduler library classes."""

import collections
import logging

import common

from autotest_lib.frontend import setup_django_environment

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server.cros.dynamic_suite import constants

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_job_timer_name = 'chromeos/autotest/scheduler/job_query_durations/%s'


class AFEJobQueryManager(object):
    """Query manager for AFE Jobs."""

    # A subquery to only get inactive hostless jobs.
    hostless_query = 'host_id IS NULL AND meta_host IS NULL'


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_pending_queue_entries')
    def get_pending_queue_entries(self, only_hostless=False):
        """
        Fetch a list of new host queue entries.

        The ordering of this list is important, as every new agent
        we schedule can potentially contribute to the process count
        on the drone, which has a static limit. The sort order
        prioritizes jobs as follows:
        1. High priority jobs: Based on the afe_job's priority.
        2. With hosts and metahosts: This will only happen if we don't
            activate the hqe after assigning a host to it in
            schedule_new_jobs.
        3. With hosts but without metahosts: When tests are scheduled
            through the frontend, the owner of the job has chosen
            a host for it.
        4. Without hosts but with metahosts: This is the common case of
            a new test that needs a DUT. We assign a host and set it to
            active so it shouldn't show up in case 2 on the next tick.
        5. Without hosts and without metahosts: Hostless suite jobs that
            will result in new jobs that fall under category 4.

        A note about the ordering of cases 3 and 4:
        Prioritizing one case above the other leads to earlier acquisition
        of the following resources: 1. process slots on the drone 2. machines.
        - When a user schedules a job through the afe they choose a specific
          host for it. Jobs with metahosts can utilize any host that satisfies
          the metahost criterion. This means that if we had scheduled 4 before
          3, there is a good chance that a job which could've used another
          host will now use the host assigned to a metahost-less job. Given
          the availability of machines in pool:suites, this almost guarantees
          starvation for jobs scheduled through the frontend.
        - Scheduling 4 before 3 also has its pros, however: a suite has the
          concept of a timeout, whereas users can wait. If we hit the process
          count on the drone, a suite can time out waiting on the test, but a
          user job generally has a much longer timeout and relatively
          harmless consequences.
        The current ordering was chosen because it is more likely that we
        will run out of machines in pool:suites than processes on the drone.

        @returns A list of HQEs ordered according to sort_order.
        """
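        # Illustrative example (hypothetical values): with equal priorities,
        # an HQE with host_id=7, meta_host=NULL (case 3) sorts ahead of one
        # with host_id=NULL, meta_host=5 (case 4), because MySQL's
        # ISNULL(host_id) evaluates to 0 for the former and 1 for the latter.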
        sort_order = ('afe_jobs.priority DESC, '
                      'ISNULL(host_id), '
                      'ISNULL(meta_host), '
                      'parent_job_id, '
                      'job_id')
        # Don't execute jobs that should be executed by a shard in the global
        # scheduler.
        # This won't prevent the shard scheduler from running this, as the
        # shard db doesn't have an entry in afe_shards_labels.
        query = ('NOT complete AND NOT active AND status="Queued" '
                 'AND NOT aborted AND afe_shards_labels.id IS NULL')

        # TODO(jakobjuelich, beeps): Optimize this query. Details:
        # Compressed output of EXPLAIN <query>:
        # +------------------------+--------+-------------------------+-------+
        # | table                  | type   | key                     | rows  |
        # +------------------------+--------+-------------------------+-------+
        # | afe_host_queue_entries | ref    | host_queue_entry_status | 30536 |
        # | afe_shards_labels      | ref    | shard_label_id_fk       |     1 |
        # | afe_jobs               | eq_ref | PRIMARY                 |     1 |
        # +------------------------+--------+-------------------------+-------+
        # This shows the first part of the query fetches a lot of objects that
        # are then filtered. The joins are comparably fast: there's usually
        # just one shard mapping or none, and it can be answered fully using
        # an index (shard_label_id_fk); a similar thing applies to the job.
        #
        # This works for now, but once O(#Jobs in shard) << O(#Jobs in Queued),
        # it might be more efficient to filter on the meta_host first, instead
        # of the status.
        if only_hostless:
            query = '%s AND (%s)' % (query, self.hostless_query)
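        # Illustrative combined clause: with only_hostless=True the where
        # clause above gains ' AND (host_id IS NULL AND meta_host IS NULL)'.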
        return list(scheduler_models.HostQueueEntry.fetch(
            joins=('INNER JOIN afe_jobs ON (job_id=afe_jobs.id) '
                   'LEFT JOIN afe_shards_labels ON ('
                   'meta_host=afe_shards_labels.label_id)'),
            where=query, order_by=sort_order))


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_prioritized_special_tasks')
    def get_prioritized_special_tasks(self, only_tasks_with_leased_hosts=False):
        """
        Returns all queued SpecialTasks prioritized in the order: repair,
        cleanup, verify, reset, provision.

        @param only_tasks_with_leased_hosts: If true, this method only returns
            tasks with leased hosts.

        @return: A list of afe.models.SpecialTasks sorted according to
            priority.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # Exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry.
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                               'afe_special_tasks.queue_entry_id)'])
        if only_tasks_with_leased_hosts:
            queued_tasks = queued_tasks.filter(host__leased=True)

        # Reorder tasks by priority.
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY,
                               models.SpecialTask.Task.RESET,
                               models.SpecialTask.Task.PROVISION]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
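        # Note: sorted() is stable, so tasks of the same type keep their
        # original queryset order relative to one another.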
        return sorted(queued_tasks, key=task_priority_key)


    @classmethod
    def get_overlapping_jobs(cls):
        """A helper method to get all active jobs using the same host.

        @return: A list of dictionaries with the hqe id, job_id and host_id
            of the currently overlapping jobs.
        """
        # Filter all active hqes and standalone special tasks to make sure
        # a host isn't being used by two jobs at the same time. An incomplete
        # standalone special task can share a host with an active hqe; an
        # example of this is the cleanup scheduled in gathering.
        hqe_hosts = list(models.HostQueueEntry.objects.filter(
                active=1, complete=0, host_id__isnull=False).values_list(
                'host_id', flat=True))
        special_task_hosts = list(models.SpecialTask.objects.filter(
                is_active=1, is_complete=0, host_id__isnull=False,
                queue_entry_id__isnull=True).values_list('host_id', flat=True))
        host_counts = collections.Counter(
                hqe_hosts + special_task_hosts).most_common()
        multiple_hosts = [count[0] for count in host_counts if count[1] > 1]
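        # e.g. Counter([7, 7, 9]).most_common() returns [(7, 2), (9, 1)],
        # so multiple_hosts would be [7] (illustrative host ids).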
        return list(models.HostQueueEntry.objects.filter(
                host_id__in=multiple_hosts, active=True).values(
                        'id', 'job_id', 'host_id'))


    @metrics.SecondsTimerDecorator(
            _job_timer_name % 'get_suite_host_assignment')
    def get_suite_host_assignment(self):
        """A helper method to get how many hosts each suite is holding.

        @return: Two dictionaries (suite_host_num, hosts_to_suites).
                 suite_host_num maps a suite job id to the number of hosts
                 held by its child jobs.
                 hosts_to_suites contains the hosts currently held by
                 any suite, and maps each host id to its parent_job_id.
        """
        query = models.HostQueueEntry.objects.filter(
                host_id__isnull=False, complete=0, active=1,
                job__parent_job_id__isnull=False)
        suite_host_num = {}
        hosts_to_suites = {}
        for hqe in query:
            host_id = hqe.host_id
            parent_job_id = hqe.job.parent_job_id
            count = suite_host_num.get(parent_job_id, 0)
            suite_host_num[parent_job_id] = count + 1
            hosts_to_suites[host_id] = parent_job_id
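        # Illustrative return shape: if suite job 100 has child jobs holding
        # hosts 7 and 8, this returns ({100: 2}, {7: 100, 8: 100}).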
        return suite_host_num, hosts_to_suites


    @metrics.SecondsTimerDecorator(_job_timer_name % 'get_min_duts_of_suites')
    def get_min_duts_of_suites(self, suite_job_ids):
        """Load the suite_min_duts job keyval for a set of suites.

        @param suite_job_ids: A set of suite job ids.

        @return: A dictionary where the key is a suite job id and the value
                 is the value of 'suite_min_duts'.
        """
        query = models.JobKeyval.objects.filter(
                job_id__in=suite_job_ids,
                key=constants.SUITE_MIN_DUTS_KEY, value__isnull=False)
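        # e.g. if suite job 100 stored suite_min_duts='4' as a keyval, this
        # returns {100: 4} (illustrative values).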
        return dict((keyval.job_id, int(keyval.value)) for keyval in query)


_host_timer_name = 'chromeos/autotest/scheduler/host_query_durations/%s'


class AFEHostQueryManager(object):
    """Query manager for AFE Hosts."""

    def __init__(self):
        """Create an AFEHostQueryManager.

        Obtains a connection to the database with the afe_hosts table
        from scheduler_lib.ConnectionManager.
        """
        self._db = scheduler_lib.ConnectionManager().get_connection()


    def _process_many2many_dict(self, rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
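        # e.g. rows [(1, 10), (1, 11), (2, 10)] yield {1: {10, 11}, 2: {10}};
        # with flip=True they yield {10: {1, 2}, 11: {1}}.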
        return result


    def _get_sql_id_list(self, id_list):
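        # e.g. [1, 2, 3] -> '1,2,3', suitable for an SQL IN (...) clause.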
        return ','.join(str(item_id) for item_id in id_list)


    def _get_many2many_dict(self, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= self._get_sql_id_list(id_list)
        rows = self._db.execute(query)
        return self._process_many2many_dict(rows, flip)


    def _get_ready_hosts(self):
        # We don't lose anything by re-doing these checks
        # even though we release hosts on the same conditions.
        # In the future we might have multiple clients that
        # release_hosts and/or lock them independent of the
        # scheduler tick.
        hosts = scheduler_models.Host.fetch(
            where="NOT afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_acl_groups')
    def _get_job_acl_groups(self, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    def _get_job_ineligible_hosts(self, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_job_dependencies')
    def _get_job_dependencies(self, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return self._get_many2many_dict(query, job_ids)


    @classmethod
    def find_unused_healty_hosts(cls):
        """Get hosts that are currently unused and in the READY state.

        @return: A list of host objects, one for each unused healthy host.
        """
        # Avoid any host with a currently active queue entry against it.
        hqe_join = ('LEFT JOIN afe_host_queue_entries AS active_hqe '
                    'ON (afe_hosts.id = active_hqe.host_id AND '
                    'active_hqe.active)')

        # Avoid any host with a new special task against it. There are 2 cases
        # when an inactive but incomplete special task will not use the host
        # this tick: 1. when the host is locked; 2. when an active hqe already
        # has special tasks for the same host. In both these cases this host
        # will not be in the ready hosts list anyway. In all other cases,
        # an incomplete special task will grab the host before a new job does
        # by assigning an agent to it.
        special_task_join = ('LEFT JOIN afe_special_tasks AS new_tasks '
                             'ON (afe_hosts.id = new_tasks.host_id AND '
                             'new_tasks.is_complete=0)')

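        # The IS NULL checks below turn these LEFT JOINs into anti-joins:
        # only hosts with no active hqe and no incomplete special task
        # survive the filter.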
        return scheduler_models.Host.fetch(
            joins='%s %s' % (hqe_join, special_task_join),
            where="active_hqe.host_id IS NULL AND new_tasks.host_id IS NULL "
                  "AND afe_hosts.leased "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                          "OR afe_hosts.status = 'Ready')")


    @metrics.SecondsTimerDecorator(_host_timer_name % 'set_leased')
    def set_leased(self, leased_value, **kwargs):
        """Modify the leased bit on the hosts matched by kwargs.

        @param leased_value: The True/False value to store in the leased
            column of the matching hosts.
        @param kwargs: The args to use in finding matching hosts.
        """
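        # e.g. set_leased(False, id__in=[7, 8]) clears the leased bit on
        # hosts 7 and 8 (illustrative ids).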
        logging.info('Setting leased = %s for the hosts that match %s',
                     leased_value, kwargs)
        models.Host.objects.filter(**kwargs).update(leased=leased_value)


    @metrics.SecondsTimerDecorator(_host_timer_name % 'get_labels')
    def _get_labels(self, job_dependencies):
        """
        Calculate a dict mapping label id to label object so that we don't
        round trip to the database every time we need a label.

        @param job_dependencies: A dict mapping an integer job id to a list of
            integer label ids, i.e. {job_id: [label_id]}.
        @return: A dict mapping an integer label id to a scheduler model label
            object, i.e. {label_id: label_object}.
        """
        id_to_label = dict()
        # Pull all the labels on hosts we might look at...
        host_labels = scheduler_models.Label.fetch(
                where="id IN (SELECT label_id FROM afe_hosts_labels)")
        id_to_label.update([(label.id, label) for label in host_labels])
        # ...and pull all the labels on jobs we might look at.
        job_label_set = set()
        for job_deps in job_dependencies.values():
            job_label_set.update(job_deps)
        # On the rare/impossible chance that no jobs have any labels, we
        # can skip this.
        if job_label_set:
            job_string_label_list = ','.join([str(x) for x in job_label_set])
            job_labels = scheduler_models.Label.fetch(
                    where="id IN (%s)" % job_string_label_list)
            id_to_label.update([(label.id, label) for label in job_labels])
        return id_to_label


    def refresh(self, pending_queue_entries):
        """Update the query manager.

        Cache information about a list of queue entries and eligible hosts
        from the database so clients can avoid expensive round trips during
        host acquisition.

        @param pending_queue_entries: A list of queue entries about which we
            need information.
        """
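        # A usage sketch (illustrative; the variable names are hypothetical):
        #   host_query_manager.refresh(
        #           job_query_manager.get_pending_queue_entries())
        # is typically done once per scheduler tick, before host acquisition.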
        self._hosts_available = self._get_ready_hosts()
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)
        self._labels = self._get_labels(self._job_dependencies)