# pylint: disable=missing-docstring

""" This is the module for everything related to the AgentTask.

The AgentTask imposes an interface through which the scheduler can monitor
a process. Examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick calls handle_agents, which in turn calls the tick of
    each Agent in the Dispatcher's queue. This leads to the Agent polling
    its AgentTask. The scheduler will keep polling a task through the
    associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an HQE changes state. Examples of these
    are the QueueTask, which is created when an HQE goes into the Starting
    state, and the FinalReparseTask, which is created when the HQE goes into
    Parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry map
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

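The call order in the outline above can be traced with a stripped-down
stand-in. This is a sketch only: FakeMonitor and SketchTask are hypothetical
names, the scripted exit codes replace a real autoserv process, and the
epilog/cleanup steps are reduced to entries in a call log.

```python
# Hypothetical stand-ins; they only mirror the poll/start/tick/finished order.
class FakeMonitor(object):
    # Hands back scripted exit codes; None means "still running".
    def __init__(self, exit_codes):
        self._exit_codes = list(exit_codes)

    def exit_code(self):
        return self._exit_codes.pop(0)


class SketchTask(object):
    def __init__(self, exit_codes):
        self.started = False
        self.done = False
        self.success = None
        self.monitor = None
        self.calls = []
        self._exit_codes = exit_codes

    def poll(self):
        # The first poll starts the task; every poll ticks until done.
        if not self.started:
            self.start()
        if not self.done:
            self.tick()

    def start(self):
        self.calls.append('prolog')
        self.monitor = FakeMonitor(self._exit_codes)  # stands in for run
        self.calls.append('run')
        self.started = True

    def tick(self):
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return
        self.finished(exit_code == 0)

    def finished(self, success):
        self.done = True
        self.success = success
        self.calls.append('epilog')
        self.calls.append('cleanup')


task = SketchTask([None, None, 0])
polls = 0
while not task.done:
    task.poll()
    polls += 1
```

Note that in this sketch, as in AgentTask.poll, the poll that starts the task
also performs the first tick.
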
The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen before
              the actual task. Different per Task. Examples of things that might
              happen in a prolog:
                  - state of Host, HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc to drone manager.
          This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished we call finished with true/false
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    detect that its task has finished.

            - epilog: Is generally where we set status of the Host/HQE again,
                      requeue any other task that needs to run after this one
                      and perform cleanup. Just like the prolog, this step is
                      different per task.

                      - cleanup: Sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note this is not to be confused with the
                                  special task called cleanup).

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      e.g. if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
AgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
            |--->FinalReparseTask

"""

import logging
import os
import time
import urllib

import common

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager
from autotest_lib.scheduler import email_manager
from autotest_lib.scheduler import pidfile_monitor
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class AgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = []
            self.queue_entry_ids = []
            self.hostnames = {}
            for entry in queue_entries:
                if entry.host is not None:
                    self.host_ids.append(entry.host.id)
                    self.queue_entry_ids.append(entry.id)
                    self.hostnames[entry.host.id] = entry.host.hostname
                else:
                    logging.debug(
                            'No host is found for host_queue_entry_id: %r',
                            entry.id)
                    raise scheduler_lib.NoHostIdError(
                            'Failed to schedule a job whose '
                            'host_queue_entry_id=%r due to no host_id.'
                            % entry.id)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metrics.Counter(
                'chromeos/autotest/errors/scheduler/no_paired_results'
            ).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the hosts are split between
                # restricted and unrestricted subnets. No drone can serve
                # both in that case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If host is not in restricted subnet, use the unrestricted drones only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("AgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                # The original code raised an exception here. In an effort to
                # prevent downtime we instead abort the job and send out an
                # email notifying us that this has occurred.
                error_message = ('%s attempting to start entry with invalid '
                                 'status %s: %s. Aborting Job: %s.'
                                 % (class_name, entry.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Queue Entry Status',
                    error_message)
                entry.job.request_abort()
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                # The original code raised an exception here. In an effort to
                # prevent downtime we instead abort the job and send out an
                # email notifying us that this has occurred.
                error_message = ('%s attempting to start on queue entry with '
                                 'invalid host status %s: %s. Aborting Job: %s'
                                 % (class_name, entry.host.status, entry,
                                    entry.job))
                logging.error(error_message)
                email_manager.manager.enqueue_notify_email(
                    'Job Aborted - Invalid Host Status', error_message)
                entry.job.request_abort()


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(
                self._working_directory(), keyval_contents,
                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None
    _COUNT_METRIC = 'chromeos/autotest/scheduler/special_task_count'
    _DUT_METRIC = 'chromeos/autotest/scheduler/special_task_by_dut'
    _DURATION_METRIC = 'chromeos/autotest/scheduler/special_task_durations'


    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        # This is of type SpecialTask (as defined in frontend/afe/models.py)
        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()
        self._milestone = ''


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like id,
        name and related job information. The value will be stored in the
        metadata database.
        @return: A dictionary containing the task id, name and related job id.
                 If an attribute cannot be accessed, an empty dictionary is
                 returned and the error is logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils.autoserv_run_job_command(
                autoserv_utils.autoserv_directory,
                self.host.hostname,
                results_directory=drone_manager.WORKING_DIRECTORY,
                extra_args=self._extra_command_args,
                queue_entry=self.queue_entry,
                in_lab=True,
        )


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    def epilog(self):
        super(SpecialAgentTask, self).epilog()
        self._emit_special_task_status_metric()


    def _emit_special_task_status_metric(self):
        """Increments an accumulator associated with this special task."""
        fields = {'type': self.TASK_TYPE,
                  'success': bool(self.success),
                  'board': str(self.host.board),
                  'milestone': self._milestone}
        metrics.Counter(self._COUNT_METRIC).increment(
            fields=fields)

        if (self.task.time_finished and self.task.time_started):
            duration = (self.task.time_finished -
                        self.task.time_started).total_seconds()
            metrics.SecondsDistribution(self._DURATION_METRIC).add(
                duration, fields=fields)

        dut_fields = {
            'type': self.TASK_TYPE,
            'success': bool(self.success),
            'board': str(self.host.board),
            'dut_host_name': self.host.hostname
        }
        metrics.Counter(self._DUT_METRIC).increment(fields=dut_fields)

    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        # TODO(ayatane): This should obey self.queue_entry.job.parse_failed_repair
        # But nothing sets self.queue_entry.job.parse_failed_repair?
        # Check Git blame
        self._parse_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove a type of special task in all tasks, keep last one if needed.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
            the same as of special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
