#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import glib
import logging
import random

DEFAULT_MAX_RANDOM_DELAY_MS = 10000

_instance = None

def get_instance():
    """
    Return the singleton instance of the TaskLoop class.

    The instance is created lazily on the first call and cached in the module
    level |_instance| variable; later calls return the same object.

    """
    global _instance
    if _instance is None:
        _instance = TaskLoop()
    return _instance


class TaskLoop(object):
    """
    The context to place asynchronous calls.

    This is a wrapper around the GLIB mainloop interface, exposing methods to
    place (delayed) asynchronous calls. In addition to wrapping around the GLIB
    API, this provides switches to control how delays are incorporated in method
    calls globally.

    This class is meant to be a singleton.
    Do not create an instance directly, use the module level function
    get_instance() instead.

    Running the TaskLoop is blocking for the caller. So use this class like so:

    tl = task_loop.get_instance()
    # Setup other things.
    # Add initial tasks to tl to do stuff, post more tasks, and make the world a
    # better place.
    tl.start()
    # This thread is now blocked. Some task should eventually call tl.stop() to
    # continue here.

    @var ignore_delays: Flag to control if delayed tasks are posted immediately.
    @var random_delays: Flag to control if arbitrary delays are inserted between
            posted tasks.
    @var max_random_delay_ms: When random_delays is True, the maximum delay
            inserted between posted tasks.

    """

    def __init__(self):
        self._logger = logging.getLogger(__name__)

        # Initialize properties
        self._ignore_delays = False
        self._random_delays = False
        self._max_random_delay_ms = DEFAULT_MAX_RANDOM_DELAY_MS

        # Get the mainloop so that tasks can be posted even before running the
        # task loop.
        self._mainloop = glib.MainLoop()

        # Initialize dictionary to track posted tasks. Maps the post_id handed
        # out to callers to the glib source id of the pending timeout.
        self._next_post_id = 0
        self._posted_tasks = {}


    @property
    def ignore_delays(self):
        """
        Boolean flag to control if delayed tasks are posted immediately.

        If True, all tasks posted henceforth are immediately marked active
        ignoring any delay requested. With this switch, all other delay related
        switches are ignored.

        """
        return self._ignore_delays


    @ignore_delays.setter
    def ignore_delays(self, value):
        """
        Set |ignore_delays|.

        @param value: Boolean value for the |ignore_delays| flag

        """
        self._logger.debug('Turning %s delays ignored mode.',
                           ('on' if value else 'off'))
        self._ignore_delays = value


    @property
    def random_delays(self):
        """
        Boolean flag to control if random delays are inserted in posted tasks.

        If True, arbitrary delays in range [0, |max_random_delay_ms|] are
        inserted in all posted tasks henceforth, ignoring the actual delay
        requested.

        """
        return self._random_delays


    @random_delays.setter
    def random_delays(self, value):
        """
        Set |random_delays|.

        @param value: Boolean value for the random_delays flag.

        """
        self._logger.debug('Turning %s random delays.',
                           ('on' if value else 'off'))
        self._random_delays = value


    @property
    def max_random_delay_ms(self):
        """
        The maximum arbitrary delay inserted in posted tasks in milliseconds.
        Type: int

        """
        return self._max_random_delay_ms


    @max_random_delay_ms.setter
    def max_random_delay_ms(self, value):
        """
        Set |max_random_delay_ms|.

        @param value: Non-negative int value for |max_random_delay_ms|. Negative
                values are clamped to 0.

        """
        if value < 0:
            self._logger.warning(
                    'Can not set max_random_delay_ms to negative value %s. '
                    'Setting to 0 instead.',
                    value)
            value = 0
        self._logger.debug('Set max random delay to %d. Random delay is %s',
                           value, ('on' if self.random_delays else 'off'))
        self._max_random_delay_ms = value


    def start(self):
        """
        Run the task loop.

        This call is blocking. The thread that calls TaskLoop.start(...) becomes
        the task loop itself and is blocked as such till TaskLoop.stop(...) is
        called.

        """
        self._logger.info('Task Loop is now processing tasks...')
        self._mainloop.run()


    def stop(self):
        """
        Stop the task loop.

        """
        self._logger.info('Task Loop quitting.')
        self._mainloop.quit()


    def post_repeated_task(self, callback, delay_ms=0):
        """
        Post the given callback repeatedly forever until cancelled.

        The posted callback is invoked with no arguments. It likely does not
        make sense to provide fixed data parameters to a repeated task. Use a
        bound method to provide context through the object reference.

        In the |ignore_delays| mode, the task is reposted immediately after
        dispatch.
        In the |random_delays| mode, a new arbitrary delay is inserted before
        each call to |callback|.

        @param callback: The function to call repeatedly. |callback| is called
                without arguments. The return value from |callback| is ignored.
        @param delay_ms: The delay between repeated calls to |callback|. The
                first call is also delayed by this amount. Default: 0
        @return: An integer ID that can be used to cancel the posted task.

        """
        assert callback is not None

        post_id = self._next_post_id
        self._next_post_id += 1
        next_delay_ms = self._next_delay_ms(delay_ms)
        self._posted_tasks[post_id] = glib.timeout_add(
                next_delay_ms,
                self._execute_repeated_task,
                post_id,
                callback,
                delay_ms)
        return post_id


    def post_task_after_delay(self, callback, delay_ms, *args, **kwargs):
        """
        Post the given callback once to be dispatched after |delay_ms|.

        The delay actually used is subject to the |ignore_delays| and
        |random_delays| modes.

        @param callback: The function to call. The function may expect arbitrary
                number of arguments, passed in as |*args| and |**kwargs|. The
                return value from |callback| is ignored.
        @param delay_ms: The delay before the call to |callback|.
        @return: An integer ID that can be used to cancel the posted task.

        """
        assert callback is not None

        post_id = self._next_post_id
        self._next_post_id += 1
        next_delay_ms = self._next_delay_ms(delay_ms)
        # Dispatch through |_execute_task| rather than handing |callback| to
        # glib directly: glib keeps re-running a timeout callback that returns
        # a true value, and glib.timeout_add() would otherwise interpret the
        # keyword arguments itself instead of forwarding them to |callback|.
        self._posted_tasks[post_id] = glib.timeout_add(
                next_delay_ms,
                self._execute_task,
                post_id,
                callback,
                args,
                kwargs)
        return post_id


    def post_task(self, callback, *args, **kwargs):
        """
        Post the given callback once.

        In |random_delays| mode, this function is equivalent to
        |post_task_after_delay|.

        @param callback: The function to call. The function may expect arbitrary
                number of arguments, passed in as |*args| and |**kwargs|. The
                return value from |callback| is ignored.
        @return: An integer ID that can be used to cancel the posted task.

        """
        self._logger.debug('Task posted: %s', repr(callback))
        self._logger.debug('Arguments: %s, Keyword arguments: %s',
                           repr(args), repr(kwargs))
        return self.post_task_after_delay(callback, 0, *args, **kwargs)


    def cancel_posted_task(self, post_id):
        """
        Cancels a previously posted task that is yet to be dispatched.

        @param post_id: The |post_id| of the task to cancel, as returned by one
                of the functions that post a task.
        @return: True if the posted task was removed. False if |post_id| is not
                tracked (never posted, already cancelled, or a one-shot task
                that has already been dispatched).

        """
        if post_id not in self._posted_tasks:
            return False

        retval = glib.source_remove(self._posted_tasks[post_id])
        if retval:
            del self._posted_tasks[post_id]
        return retval


    def _next_delay_ms(self, user_delay_ms):
        """
        Determine the actual delay to post the next task.

        The actual delay posted may be different from the user requested delay
        based on what mode we're in. |ignore_delays| takes precedence over
        |random_delays|.

        @param user_delay_ms: The delay requested by the user.
        @return The actual delay to be posted.

        """
        if self.ignore_delays:
            return 0
        if self.random_delays:
            return random.randint(0, self.max_random_delay_ms)
        return user_delay_ms


    def _execute_task(self, post_id, callback, args, kwargs):
        """
        A wrapper to dispatch a one-shot task exactly once.

        @param post_id: The private post_id tracking this task.
        @param callback: The user callback that must be called.
        @param args: Tuple of positional arguments for |callback|.
        @param kwargs: Dictionary of keyword arguments for |callback|.
        @return: Always False, so that glib does not call the task again.

        """
        # The task is being dispatched, so it can no longer be cancelled. Drop
        # the bookkeeping entry first so that |_posted_tasks| does not grow
        # forever and a late cancel_posted_task() never reaches
        # glib.source_remove() with a source id that is no longer ours.
        self._posted_tasks.pop(post_id, None)
        retval = callback(*args, **kwargs)
        self._logger.debug('Ignored return value from task: %s', repr(retval))
        return False


    def _execute_repeated_task(self, post_id, callback, delay_ms):
        """
        A wrapper to repost an executed task, and return False.

        We need this to be able to repost the task at arbitrary intervals.
        If |callback| raises, the exception propagates and the task is not
        reposted.

        @param post_id: The private post_id tracking this repeated task.
        @param callback: The user callback that must be called.
        @param delay_ms: The user requested delay between calls.
        @return: Always False, so that glib drops the timeout that just fired;
                the next call is scheduled through a fresh timeout.

        """
        retval = callback()
        self._logger.debug('Ignored return value from repeated task: %s',
                           repr(retval))
        if post_id not in self._posted_tasks:
            # The task was cancelled while |callback| was running (e.g. it
            # cancelled itself). Reposting would resurrect it.
            return False

        next_delay_ms = self._next_delay_ms(delay_ms)
        self._posted_tasks[post_id] = glib.timeout_add(
                next_delay_ms,
                self._execute_repeated_task,
                post_id,
                callback,
                delay_ms)
        return False