#pylint: disable-msg=C0111

""" This is the module for everything related to the AgentTask.

The BaseAgentTask imposes an interface through which the scheduler can monitor
a process. Examples of such processes include Verify, Cleanup and the Queue
Tasks that run the tests. The scheduler itself only understands Agents.
Agents:
    The Agent is the bridge between the scheduler and the AgentTask. The
    scheduler's tick has a method called handle_agents, which calls the
    tick of each agent in the Dispatcher's queue. This leads to the Agent
    polling its AgentTask. The scheduler will keep polling a task through
    the associated Agent until the Agent is removed from the dispatcher.

    At a high level:
        agents finished = tasks done
        agent polls till finished
            task polls till done
                task sets done
        agent is removed from dispatcher
AgentTasks:
    Basic AgentTasks are created when an hqe changes state. Examples of these
    are the QueueTask, which is created when an hqe goes into the Starting
    state, and the FinalReparseTask, which is created when the hqe goes into
    parsing.
SpecialAgentTasks:
    Unlike AgentTasks, SpecialAgentTasks are only created when a row is inserted
    in the afe_special_tasks table. All PrejobTasks are SpecialAgentTasks.

Monitor_db.get_agent_task_for_special_task/get_agent_task_for_queue_entry maps
an AgentTask to an Agent, which the scheduler understands. From this point
onward, the scheduler manages the task through the Agent's interface, as
follows:
At a high level:
    task poll
        start
            prolog
        tick till we get an exit code
        finished(exit==0)
            done=True
            epilog
                cleanup
                    set is_active, is_complete, success (checked in scheduler)

The first special task for an HQE is usually Reset.
- poll: The first poll will start the task; polls thereafter will call the
        task's tick method. A started task will have the started bit set.
- start: Call prolog, run the process and set the start bit.
    - prolog: Usually where one puts any model state changes that happen before
              the actual task. Different per Task. Examples of things that might
              happen in a prolog:
                  - state of Host, HQE (to something like Resetting)
                  - delete any unwanted queued special tasks
                  - register a pidfile
                  - set the is_active bit on the special task
    - run:
        - create a PidfileRunMonitor
        - pass the autoserv command, working directory etc to drone manager.
          This will start the actual autoserv process.
    - set the start bit: so subsequent polls do not 'start' again

- tick: For as long as a started task's done bit is not set, a poll will lead
        to a tick. The tick monitors the pid file of the autoserv process
        running on the drone through the PidfileRunMonitor created in run.
        If the autoserv process has finished we call finished with true/false
        depending on the autoserv exit code.

        - finished: sets the done and success values, then calls epilog. The
                    done bit is important because the Agent polls this bit to
                    find out whether its task has finished.

            - epilog: Is generally where we set status of the Host/HQE again,
                      requeue any other task that needs to run after this one
                      and perform cleanup. Just like the prolog, this step is
                      different per task.

                      - cleanup: Sets the is_active, is_complete and success
                                 states on the task's model. Also uses the
                                 drone_manager to:
                                    unregister the pidfile
                                    copy results of the task
                                 (Note this is not to be confused with the
                                  special task called cleanup).

                      The actions we take in the epilog are based on the
                      success/failure of the autoserv process set in cleanup,
                      eg: if reset failed we will enqueue a repair, but if all
                      is well the epilog will just return. Prejob task epilogs
                      also have an on_pending method that changes the status of
                      the HQE to pending/starting, which gets picked up in the
                      scheduler.
By this point the is_done flag is set, which results in the Agent noticing that
the task has finished and unregistering it from the dispatcher.

Class hierarchy:
BaseAgentTask
 |--->SpecialAgentTask (prejob_task.py)
      |--->RepairTask
      |--->PreJobTask
           |--->Verify, Cleanup, Reset, Provision

 |--->AbstractQueueTask (monitor_db.py)
      |--->QueueTask
      |--->HostlessQueueTask

 |--->PostJobTask (postjob_task.py)
      |--->GatherLogsTask
      |--->SelfThrottledPostJobTask
            |--->FinalReparseTask
            |--->ArchiveResultsTask

"""

import logging
import os
import urllib
import time

from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import drone_manager, pidfile_monitor
from autotest_lib.scheduler import scheduler_lib
from autotest_lib.scheduler import rdb_lib
from autotest_lib.scheduler import scheduler_models
from autotest_lib.server import autoserv_utils
from autotest_lib.server import system_utils

CONFIG = global_config.global_config
AUTOSERV_NICE_LEVEL = 10

ENABLE_DRONE_IN_RESTRICTED_SUBNET = CONFIG.get_config_value(
        'CROS', 'enable_drone_in_restricted_subnet', type=bool,
        default=False)


class BaseAgentTask(object):
    class _NullMonitor(object):
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self._drone_manager = drone_manager.instance()
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        # A map between host id and hostname.
        self.hostnames = {}
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
            self.hostnames = dict((entry.host.id, entry.host.hostname)
                                  for entry in queue_entries)
        else:
            assert host
            self.host_ids = [host.id]
            self.hostnames = {host.id: host.hostname}


    def poll(self):
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this BaseAgentTask's process.
        It may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this BaseAgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this BaseAgentTask's process executes.
        Must be overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this BaseAgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        if not self._paired_with_monitor().has_process():
            metadata = {
                    '_type': 'scheduler_error',
                    'error': 'No paired results in task',
                    'task': str(self),
                    'pidfile_id': str(self._paired_with_monitor().pidfile_id)}
            autotest_stats.Counter('no_paired_results_in_task',
                                   metadata=metadata).increment()
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = pidfile_monitor.PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(
            self, restricted_subnets=utils.RESTRICTED_SUBNETS,
            enable_drone_in_subnet=ENABLE_DRONE_IN_RESTRICTED_SUBNET):
        filtered_drones = None
        has_unrestricted_host = False
        if (self.hostnames and restricted_subnets and enable_drone_in_subnet):
            for hostname in self.hostnames.values():
                subnet = utils.get_restricted_subnet(hostname,
                                                     restricted_subnets)

                # Return an empty set if the hosts are split between
                # restricted and unrestricted subnets. No drone can work in
                # such a case.
                if ((not subnet and filtered_drones is not None) or
                    (subnet and has_unrestricted_host)):
                    logging.error('The test has some DUT in restricted subnet, '
                                  'but some in unrestricted subnet. Therefore, '
                                  'no drone is available to run the test.')
                    return set()

                if not subnet:
                    has_unrestricted_host = True
                    continue

                server_ip_map = system_utils.DroneCache.get_drone_ip_map()
                filtered_drones_for_host = set(
                        utils.get_servers_in_same_subnet(
                                subnet[0], subnet[1],
                                server_ip_map=server_ip_map))
                logging.info('DUT %s is in restricted subnet, drone can only '
                             'be chosen from %s', hostname,
                             filtered_drones_for_host)
                if filtered_drones is None:
                    filtered_drones = filtered_drones_for_host
                else:
                    filtered_drones = set.intersection(
                            filtered_drones, filtered_drones_for_host)

                # If filtered_drones is an empty set, that means no drone is
                # allowed to run the task. This is different from None, which
                # means all drones are allowed.
                if filtered_drones == set():
                    logging.error('DUT(s) is in restricted subnet, but no '
                                  'drone is available to run the test.')
                    return filtered_drones

        # If no host is in a restricted subnet, use the unrestricted drones
        # only.
        if (filtered_drones is None and restricted_subnets and
            enable_drone_in_subnet):
            filtered_drones = set(
                    system_utils.DroneCache.get_unrestricted_drones(
                            restricted_subnets=restricted_subnets))

        if not models.DroneSet.drone_sets_enabled():
            return filtered_drones

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        if filtered_drones:
            return set.intersection(filtered_drones,
                                    drone_set.get_drone_hostnames())
        else:
            return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warning('%s had no owner; using default drone set',
                            obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warning('User %s has no default drone set, using global '
                            'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        self._drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            self._drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s',
                     self.monitor.get_process(), type(self).__name__,
                     self._working_directory())


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start entry with invalid status %s: '
                        '%s' % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise scheduler_lib.SchedulerError(
                        '%s attempting to start on queue entry with invalid '
                        'host status %s: %s'
                        % (class_name, entry.host.status, entry))


SiteAgentTask = utils.import_site_class(
    __file__, 'autotest_lib.scheduler.site_monitor_db',
    'SiteAgentTask', BaseAgentTask)

class AgentTask(SiteAgentTask):
    pass


class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'


    def _format_keyval(self, key, value):
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        assert self.monitor
        if not self.monitor.has_process():
            return
        self._drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        self._drone_manager.attach_file_to_execution(self._working_directory(),
                                                     keyval_contents,
                                                     file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        all_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = rdb_lib.get_hosts([task.host.id])[0]
        self.host.dbg_str = 'Task: %s' % str(task)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)
            self.host.dbg_str += self.queue_entry.get_dbg_str()

        self.task = task
        self._extra_command_args = extra_command_args
        self.host.metadata = self.get_metadata()


    def get_metadata(self):
        """Get a dictionary that contains task information.

        The return value is a dictionary that includes task information like id,
        name and related job information. The value will be stored in the
        metadata database.
        @return: A dictionary containing the task id, name and related job id.
                 If some attributes cannot be accessed, an empty dictionary
                 is returned and the error is logged.
        """
        try:
            metadata = {'task_id': self.task.id, 'task_name': self.task.task,
                        'hostname': self.task.host.hostname}
            if self.task.queue_entry:
                job = self.task.queue_entry.job
                metadata.update(
                        scheduler_models.get_job_metadata(job))
            return metadata
        except AttributeError as e:
            logging.error('Task has missing attribute: %s', e)
            return {}


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return autoserv_utils._autoserv_command_line(self.host.hostname,
                                                     self._extra_command_args,
                                                     queue_entry=self.queue_entry,
                                                     in_lab=True)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self._actually_fail_queue_entry()


    # TODO(milleral): http://crbug.com/268607
    # All this used to be a part of _fail_queue_entry.  The
    # exact semantics of when one should and should not be failing a queue
    # entry need to be worked out, because provisioning has placed us in a
    # case where we want to fail a queue entry that could be requeued,
    # which makes us fail the two above if statements, and thus
    # _fail_queue_entry() would exit early and have no effect.
    # What's left here with _actually_fail_queue_entry is a hack to be able to
    # bypass the checks and unconditionally execute the code.
    def _actually_fail_queue_entry(self):
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = self._drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        self._drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])

        # Also fail all other special tasks that have not yet run for this HQE
        pending_tasks = models.SpecialTask.objects.filter(
                queue_entry__id=self.queue_entry.id,
                is_complete=0)
        for task in pending_tasks:
            task.finish(False)


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                self._drone_manager.unregister_pidfile(self.monitor.pidfile_id)


    def remove_special_tasks(self, special_task_to_remove, keep_last_one=False):
        """Remove a type of special task in all tasks, keep last one if needed.

        @param special_task_to_remove: type of special task to be removed, e.g.,
            models.SpecialTask.Task.VERIFY.
        @param keep_last_one: True to keep the last special task if its type is
            the same as special_task_to_remove.

        """
        queued_special_tasks = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=special_task_to_remove,
            is_active=False, is_complete=False, queue_entry=None)
        if keep_last_one:
            queued_special_tasks = queued_special_tasks.exclude(id=self.task.id)
        queued_special_tasks.delete()


    def _generate_autoserv_label_args(self, task):
        """
        @param task: An instance of afe model's SpecialTask.
        @returns: The list of arguments to pass to autoserv to tell it what the
                  labels of a job are.

        """
        labels = {x.name for x in task.queue_entry.job.labels}
        return ['--job-labels', ','.join(labels)]
738                   labels of a job are.
739 
740         """
741         labels = {x.name for x in task.queue_entry.job.labels}
742         return ['--job-labels', ','.join(labels)]
743