"""Object-oriented interface to SWF wrapping boto.swf.layer1.Layer1"""

import time
from functools import wraps
from boto.swf.layer1 import Layer1
from boto.swf.layer1_decisions import Layer1Decisions

# NOTE: most methods below are decorated with ``wraps(Layer1.<call>)``.
# ``functools.wraps`` copies the name and docstring of the Layer1 method onto
# the wrapper, so the short docstrings written here are for readers of this
# source; at runtime ``help()`` shows the underlying Layer1 documentation.

# Module-wide fallback credentials. SWFBase.__init__ copies every truthy
# value found here onto the new instance before applying keyword overrides.
DEFAULT_CREDENTIALS = {
    'aws_access_key_id': None,
    'aws_secret_access_key': None
}


def set_default_credentials(aws_access_key_id, aws_secret_access_key):
    """Set default credentials.

    The values are stored in the module-level ``DEFAULT_CREDENTIALS`` dict
    and are picked up by every object created afterwards.
    """
    DEFAULT_CREDENTIALS.update({
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
    })


class SWFBase(object):

    """Common base: attribute handling plus a Layer1 connection.

    Every keyword argument given to the constructor becomes an instance
    attribute, which is how both user settings (``name``, ``domain``,
    ``region``, ...) and flattened service responses end up on the object.
    """

    name = None
    domain = None
    aws_access_key_id = None
    aws_secret_access_key = None
    region = None

    def __init__(self, **kwargs):
        # Set default credentials.
        for credkey in ('aws_access_key_id', 'aws_secret_access_key'):
            if DEFAULT_CREDENTIALS.get(credkey):
                setattr(self, credkey, DEFAULT_CREDENTIALS[credkey])
        # Override attributes with keyword args.
        for kwarg in kwargs:
            setattr(self, kwarg, kwargs[kwarg])

        self._swf = Layer1(self.aws_access_key_id,
                           self.aws_secret_access_key,
                           region=self.region)

    def __repr__(self):
        rep_str = str(self.name)
        if hasattr(self, 'version'):
            rep_str += '-' + str(getattr(self, 'version'))
        return '<%s %r at 0x%x>' % (self.__class__.__name__, rep_str, id(self))

    def _connection_kwargs(self):
        """Return the constructor kwargs that reproduce this connection.

        Objects created on behalf of this one (listed types, started
        executions) receive these so that they use the same credentials
        and region.
        """
        return {
            'aws_access_key_id': self.aws_access_key_id,
            'aws_secret_access_key': self.aws_secret_access_key,
            'region': self.region,
        }


class Domain(SWFBase):

    """Simple Workflow Domain."""

    description = None
    # Sent to RegisterDomain as a string (see register()).
    retention = 30

    def _make_object(self, cls, info, nested_keys):
        """Build a ``cls`` instance from one entry of a list response.

        :param cls: class to instantiate (an SWFBase subclass).
        :param info: response dict for a single item; modified in place.
        :param nested_keys: keys of ``info`` whose values are dicts that
            must be merged into the top level before construction.
        """
        for nested_key in nested_keys:
            info.update(info.pop(nested_key))
        info.update(self._connection_kwargs(), domain=self.name)
        return cls(**info)

    @wraps(Layer1.describe_domain)
    def describe(self):
        """DescribeDomain."""
        return self._swf.describe_domain(self.name)

    @wraps(Layer1.deprecate_domain)
    def deprecate(self):
        """DeprecateDomain"""
        self._swf.deprecate_domain(self.name)

    @wraps(Layer1.register_domain)
    def register(self):
        """RegisterDomain."""
        self._swf.register_domain(self.name, str(self.retention),
                                  self.description)

    @wraps(Layer1.list_activity_types)
    def activities(self, status='REGISTERED', **kwargs):
        """ListActivityTypes.

        Returns a list of :class:`ActivityType` objects bound to this
        domain, credentials and region.
        """
        act_types = self._swf.list_activity_types(self.name, status, **kwargs)
        return [self._make_object(ActivityType, act_args, ('activityType',))
                for act_args in act_types['typeInfos']]

    @wraps(Layer1.list_workflow_types)
    def workflows(self, status='REGISTERED', **kwargs):
        """ListWorkflowTypes.

        Returns a list of :class:`WorkflowType` objects bound to this
        domain, credentials and region.
        """
        wf_types = self._swf.list_workflow_types(self.name, status, **kwargs)
        return [self._make_object(WorkflowType, wf_args, ('workflowType',))
                for wf_args in wf_types['typeInfos']]

    def executions(self, closed=False, **kwargs):
        """List open/closed executions.

        When listing open executions without an ``oldest_date`` keyword,
        the start of the window defaults to 24 hours before now.

        For a full list of available parameters refer to
        :py:func:`boto.swf.layer1.Layer1.list_closed_workflow_executions` and
        :py:func:`boto.swf.layer1.Layer1.list_open_workflow_executions`

        Returns a list of :class:`WorkflowExecution` objects.
        """
        if closed:
            executions = self._swf.list_closed_workflow_executions(self.name,
                                                                   **kwargs)
        else:
            if 'oldest_date' not in kwargs:
                # Last 24 hours.
                kwargs['oldest_date'] = time.time() - (3600 * 24)
            executions = self._swf.list_open_workflow_executions(self.name,
                                                                 **kwargs)
        return [self._make_object(WorkflowExecution, exe_args,
                                  ('execution', 'workflowType'))
                for exe_args in executions['executionInfos']]

    @wraps(Layer1.count_pending_activity_tasks)
    def count_pending_activity_tasks(self, task_list):
        """CountPendingActivityTasks."""
        return self._swf.count_pending_activity_tasks(self.name, task_list)

    @wraps(Layer1.count_pending_decision_tasks)
    def count_pending_decision_tasks(self, task_list):
        """CountPendingDecisionTasks."""
        return self._swf.count_pending_decision_tasks(self.name, task_list)


class Actor(SWFBase):

    """Base for objects that poll a task list (workers and deciders)."""

    task_list = None
    # Task token of the most recent poll(); used as the default token by
    # the response methods of the subclasses.
    last_tasktoken = None
    domain = None

    def run(self):
        """To be overloaded by subclasses."""
        raise NotImplementedError()


class ActivityWorker(Actor):

    """Base class for SimpleWorkflow activity workers."""

    @wraps(Layer1.respond_activity_task_canceled)
    def cancel(self, task_token=None, details=None):
        """RespondActivityTaskCanceled."""
        if task_token is None:
            task_token = self.last_tasktoken
        return self._swf.respond_activity_task_canceled(task_token, details)

    @wraps(Layer1.respond_activity_task_completed)
    def complete(self, task_token=None, result=None):
        """RespondActivityTaskCompleted."""
        if task_token is None:
            task_token = self.last_tasktoken
        return self._swf.respond_activity_task_completed(task_token, result)

    @wraps(Layer1.respond_activity_task_failed)
    def fail(self, task_token=None, details=None, reason=None):
        """RespondActivityTaskFailed."""
        if task_token is None:
            task_token = self.last_tasktoken
        return self._swf.respond_activity_task_failed(task_token, details,
                                                      reason)

    @wraps(Layer1.record_activity_task_heartbeat)
    def heartbeat(self, task_token=None, details=None):
        """RecordActivityTaskHeartbeat."""
        if task_token is None:
            task_token = self.last_tasktoken
        return self._swf.record_activity_task_heartbeat(task_token, details)

    @wraps(Layer1.poll_for_activity_task)
    def poll(self, **kwargs):
        """PollForActivityTask.

        A ``task_list`` keyword overrides ``self.task_list`` for this call.
        The returned task's ``taskToken`` (``None`` when absent) is
        remembered in ``self.last_tasktoken``.
        """
        task_list = kwargs.pop('task_list', self.task_list)
        task = self._swf.poll_for_activity_task(self.domain, task_list,
                                                **kwargs)
        self.last_tasktoken = task.get('taskToken')
        return task


class Decider(Actor):

    """Base class for SimpleWorkflow deciders."""

    @wraps(Layer1.respond_decision_task_completed)
    def complete(self, task_token=None, decisions=None, **kwargs):
        """RespondDecisionTaskCompleted.

        ``decisions`` may be a plain decision list or a
        :class:`Layer1Decisions` instance.
        """
        if isinstance(decisions, Layer1Decisions):
            # Extract decision list from a Layer1Decisions instance.
            decisions = decisions._data
        if task_token is None:
            task_token = self.last_tasktoken
        return self._swf.respond_decision_task_completed(task_token, decisions,
                                                         **kwargs)

    @wraps(Layer1.poll_for_decision_task)
    def poll(self, **kwargs):
        """PollForDecisionTask.

        A ``task_list`` keyword overrides ``self.task_list`` for this call.
        The returned task's ``taskToken`` (``None`` when absent) is
        remembered in ``self.last_tasktoken``.
        """
        task_list = kwargs.pop('task_list', self.task_list)
        decision_task = self._swf.poll_for_decision_task(self.domain,
                                                         task_list, **kwargs)
        self.last_tasktoken = decision_task.get('taskToken')
        return decision_task


class WorkflowType(SWFBase):

    """A versioned workflow type."""

    version = None
    task_list = None
    child_policy = 'TERMINATE'

    @wraps(Layer1.describe_workflow_type)
    def describe(self):
        """DescribeWorkflowType."""
        return self._swf.describe_workflow_type(self.domain, self.name,
                                                self.version)

    @wraps(Layer1.register_workflow_type)
    def register(self, **kwargs):
        """RegisterWorkflowType.

        Keyword arguments override the defaults listed below.
        """
        args = {
            'default_execution_start_to_close_timeout': '3600',
            'default_task_start_to_close_timeout': '300',
            'default_child_policy': 'TERMINATE',
        }
        args.update(kwargs)
        self._swf.register_workflow_type(self.domain, self.name, self.version,
                                         **args)

    @wraps(Layer1.deprecate_workflow_type)
    def deprecate(self):
        """DeprecateWorkflowType."""
        self._swf.deprecate_workflow_type(self.domain, self.name, self.version)

    @wraps(Layer1.start_workflow_execution)
    def start(self, **kwargs):
        """StartWorkflowExecution.

        Without a ``workflow_id`` keyword, an id of the form
        ``<name>-<version>-<unix time>`` is generated. ``task_list`` and
        ``child_policy`` default to the attributes of this object.

        Returns a :class:`WorkflowExecution` for the new run.
        """
        if 'workflow_id' in kwargs:
            workflow_id = kwargs.pop('workflow_id')
        else:
            workflow_id = '%s-%s-%i' % (self.name, self.version, time.time())

        for def_attr in ('task_list', 'child_policy'):
            kwargs.setdefault(def_attr, getattr(self, def_attr))
        run_id = self._swf.start_workflow_execution(
            self.domain, workflow_id, self.name, self.version,
            **kwargs)['runId']
        # Hand over the region together with the credentials; without it
        # the execution object would connect to the default region instead
        # of the one the workflow was started in.
        return WorkflowExecution(name=self.name, version=self.version,
                                 runId=run_id, domain=self.domain,
                                 workflowId=workflow_id,
                                 **self._connection_kwargs())


class WorkflowExecution(SWFBase):

    """An instance of a workflow."""

    workflowId = None
    runId = None

    @wraps(Layer1.signal_workflow_execution)
    def signal(self, signame, **kwargs):
        """SignalWorkflowExecution."""
        self._swf.signal_workflow_execution(self.domain, signame,
                                            self.workflowId, **kwargs)

    @wraps(Layer1.terminate_workflow_execution)
    def terminate(self, **kwargs):
        """TerminateWorkflowExecution."""
        return self._swf.terminate_workflow_execution(self.domain,
                                                      self.workflowId,
                                                      **kwargs)

    @wraps(Layer1.get_workflow_execution_history)
    def history(self, **kwargs):
        """GetWorkflowExecutionHistory.

        Returns only the ``'events'`` entry of the response.
        """
        return self._swf.get_workflow_execution_history(
            self.domain, self.runId, self.workflowId, **kwargs)['events']

    @wraps(Layer1.describe_workflow_execution)
    def describe(self):
        """DescribeWorkflowExecution."""
        return self._swf.describe_workflow_execution(self.domain, self.runId,
                                                     self.workflowId)

    @wraps(Layer1.request_cancel_workflow_execution)
    def request_cancel(self):
        """RequestCancelWorkflowExecution."""
        return self._swf.request_cancel_workflow_execution(self.domain,
                                                           self.workflowId,
                                                           self.runId)


class ActivityType(SWFBase):

    """A versioned activity type."""

    version = None

    @wraps(Layer1.deprecate_activity_type)
    def deprecate(self):
        """DeprecateActivityType."""
        return self._swf.deprecate_activity_type(self.domain, self.name,
                                                 self.version)

    @wraps(Layer1.describe_activity_type)
    def describe(self):
        """DescribeActivityType."""
        return self._swf.describe_activity_type(self.domain, self.name,
                                                self.version)

    @wraps(Layer1.register_activity_type)
    def register(self, **kwargs):
        """RegisterActivityType.

        Keyword arguments override the defaults listed below.
        """
        args = {
            'default_task_heartbeat_timeout': '600',
            'default_task_schedule_to_close_timeout': '3900',
            'default_task_schedule_to_start_timeout': '300',
            'default_task_start_to_close_timeout': '3600',
        }
        args.update(kwargs)
        self._swf.register_activity_type(self.domain, self.name, self.version,
                                         **args)