• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1"""Object-oriented interface to SWF wrapping boto.swf.layer1.Layer1"""
2
3import time
4from functools import wraps
5from boto.swf.layer1 import Layer1
6from boto.swf.layer1_decisions import Layer1Decisions
7
8DEFAULT_CREDENTIALS = {
9    'aws_access_key_id': None,
10    'aws_secret_access_key': None
11}
12
13def set_default_credentials(aws_access_key_id, aws_secret_access_key):
14    """Set default credentials."""
15    DEFAULT_CREDENTIALS.update({
16        'aws_access_key_id': aws_access_key_id,
17        'aws_secret_access_key': aws_secret_access_key,
18    })
19
20class SWFBase(object):
21
22    name = None
23    domain = None
24    aws_access_key_id = None
25    aws_secret_access_key = None
26    region = None
27
28    def __init__(self, **kwargs):
29        # Set default credentials.
30        for credkey in ('aws_access_key_id', 'aws_secret_access_key'):
31            if DEFAULT_CREDENTIALS.get(credkey):
32                setattr(self, credkey, DEFAULT_CREDENTIALS[credkey])
33        # Override attributes with keyword args.
34        for kwarg in kwargs:
35            setattr(self, kwarg, kwargs[kwarg])
36
37        self._swf = Layer1(self.aws_access_key_id,
38                           self.aws_secret_access_key,
39                           region=self.region)
40
41    def __repr__(self):
42        rep_str = str(self.name)
43        if hasattr(self, 'version'):
44            rep_str += '-' + str(getattr(self, 'version'))
45        return '<%s %r at 0x%x>' % (self.__class__.__name__, rep_str, id(self))
46
47class Domain(SWFBase):
48
49    """Simple Workflow Domain."""
50
51    description = None
52    retention = 30
53    @wraps(Layer1.describe_domain)
54    def describe(self):
55        """DescribeDomain."""
56        return self._swf.describe_domain(self.name)
57
58    @wraps(Layer1.deprecate_domain)
59    def deprecate(self):
60        """DeprecateDomain"""
61        self._swf.deprecate_domain(self.name)
62
63    @wraps(Layer1.register_domain)
64    def register(self):
65        """RegisterDomain."""
66        self._swf.register_domain(self.name, str(self.retention),
67                                  self.description)
68
69    @wraps(Layer1.list_activity_types)
70    def activities(self, status='REGISTERED', **kwargs):
71        """ListActivityTypes."""
72        act_types = self._swf.list_activity_types(self.name, status, **kwargs)
73        act_objects = []
74        for act_args in act_types['typeInfos']:
75            act_ident = act_args['activityType']
76            del act_args['activityType']
77            act_args.update(act_ident)
78            act_args.update({
79                'aws_access_key_id': self.aws_access_key_id,
80                'aws_secret_access_key': self.aws_secret_access_key,
81                'domain': self.name,
82                'region': self.region,
83            })
84            act_objects.append(ActivityType(**act_args))
85        return act_objects
86
87    @wraps(Layer1.list_workflow_types)
88    def workflows(self, status='REGISTERED', **kwargs):
89        """ListWorkflowTypes."""
90        wf_types = self._swf.list_workflow_types(self.name, status, **kwargs)
91        wf_objects = []
92        for wf_args in wf_types['typeInfos']:
93            wf_ident = wf_args['workflowType']
94            del wf_args['workflowType']
95            wf_args.update(wf_ident)
96            wf_args.update({
97                'aws_access_key_id': self.aws_access_key_id,
98                'aws_secret_access_key': self.aws_secret_access_key,
99                'domain': self.name,
100                'region': self.region,
101            })
102
103            wf_objects.append(WorkflowType(**wf_args))
104        return wf_objects
105
106    def executions(self, closed=False, **kwargs):
107        """List list open/closed executions.
108
109        For a full list of available parameters refer to
110        :py:func:`boto.swf.layer1.Layer1.list_closed_workflow_executions` and
111        :py:func:`boto.swf.layer1.Layer1.list_open_workflow_executions`
112        """
113        if closed:
114            executions = self._swf.list_closed_workflow_executions(self.name,
115                                                                   **kwargs)
116        else:
117            if 'oldest_date' not in kwargs:
118                # Last 24 hours.
119                kwargs['oldest_date'] = time.time() - (3600 * 24)
120            executions = self._swf.list_open_workflow_executions(self.name,
121                                                                 **kwargs)
122        exe_objects = []
123        for exe_args in executions['executionInfos']:
124            for nested_key in ('execution', 'workflowType'):
125                nested_dict = exe_args[nested_key]
126                del exe_args[nested_key]
127                exe_args.update(nested_dict)
128
129            exe_args.update({
130                'aws_access_key_id': self.aws_access_key_id,
131                'aws_secret_access_key': self.aws_secret_access_key,
132                'domain': self.name,
133                'region': self.region,
134            })
135
136            exe_objects.append(WorkflowExecution(**exe_args))
137        return exe_objects
138
139    @wraps(Layer1.count_pending_activity_tasks)
140    def count_pending_activity_tasks(self, task_list):
141        """CountPendingActivityTasks."""
142        return self._swf.count_pending_activity_tasks(self.name, task_list)
143
144    @wraps(Layer1.count_pending_decision_tasks)
145    def count_pending_decision_tasks(self, task_list):
146        """CountPendingDecisionTasks."""
147        return self._swf.count_pending_decision_tasks(self.name, task_list)
148
149
150class Actor(SWFBase):
151
152    task_list = None
153    last_tasktoken = None
154    domain = None
155
156    def run(self):
157        """To be overloaded by subclasses."""
158        raise NotImplementedError()
159
160class ActivityWorker(Actor):
161
162    """Base class for SimpleWorkflow activity workers."""
163
164    @wraps(Layer1.respond_activity_task_canceled)
165    def cancel(self, task_token=None, details=None):
166        """RespondActivityTaskCanceled."""
167        if task_token is None:
168            task_token = self.last_tasktoken
169        return self._swf.respond_activity_task_canceled(task_token, details)
170
171    @wraps(Layer1.respond_activity_task_completed)
172    def complete(self, task_token=None, result=None):
173        """RespondActivityTaskCompleted."""
174        if task_token is None:
175            task_token = self.last_tasktoken
176        return self._swf.respond_activity_task_completed(task_token, result)
177
178    @wraps(Layer1.respond_activity_task_failed)
179    def fail(self, task_token=None, details=None, reason=None):
180        """RespondActivityTaskFailed."""
181        if task_token is None:
182            task_token = self.last_tasktoken
183        return self._swf.respond_activity_task_failed(task_token, details,
184                                                      reason)
185
186    @wraps(Layer1.record_activity_task_heartbeat)
187    def heartbeat(self, task_token=None, details=None):
188        """RecordActivityTaskHeartbeat."""
189        if task_token is None:
190            task_token = self.last_tasktoken
191        return self._swf.record_activity_task_heartbeat(task_token, details)
192
193    @wraps(Layer1.poll_for_activity_task)
194    def poll(self, **kwargs):
195        """PollForActivityTask."""
196        task_list = self.task_list
197        if 'task_list' in kwargs:
198            task_list = kwargs.get('task_list')
199            del kwargs['task_list']
200        task = self._swf.poll_for_activity_task(self.domain, task_list,
201                                                **kwargs)
202        self.last_tasktoken = task.get('taskToken')
203        return task
204
205class Decider(Actor):
206
207    """Base class for SimpleWorkflow deciders."""
208
209    @wraps(Layer1.respond_decision_task_completed)
210    def complete(self, task_token=None, decisions=None, **kwargs):
211        """RespondDecisionTaskCompleted."""
212        if isinstance(decisions, Layer1Decisions):
213            # Extract decision list from a Layer1Decisions instance.
214            decisions = decisions._data
215        if task_token is None:
216            task_token = self.last_tasktoken
217        return self._swf.respond_decision_task_completed(task_token, decisions,
218                                                         **kwargs)
219
220    @wraps(Layer1.poll_for_decision_task)
221    def poll(self, **kwargs):
222        """PollForDecisionTask."""
223        task_list = self.task_list
224        if 'task_list' in kwargs:
225            task_list = kwargs.get('task_list')
226            del kwargs['task_list']
227        decision_task = self._swf.poll_for_decision_task(self.domain, task_list,
228                                                  **kwargs)
229        self.last_tasktoken = decision_task.get('taskToken')
230        return decision_task
231
232class WorkflowType(SWFBase):
233
234    """A versioned workflow type."""
235
236    version = None
237    task_list = None
238    child_policy = 'TERMINATE'
239
240    @wraps(Layer1.describe_workflow_type)
241    def describe(self):
242        """DescribeWorkflowType."""
243        return self._swf.describe_workflow_type(self.domain, self.name,
244                                                self.version)
245    @wraps(Layer1.register_workflow_type)
246    def register(self, **kwargs):
247        """RegisterWorkflowType."""
248        args = {
249            'default_execution_start_to_close_timeout': '3600',
250            'default_task_start_to_close_timeout': '300',
251            'default_child_policy': 'TERMINATE',
252        }
253        args.update(kwargs)
254        self._swf.register_workflow_type(self.domain, self.name, self.version,
255                                         **args)
256
257    @wraps(Layer1.deprecate_workflow_type)
258    def deprecate(self):
259        """DeprecateWorkflowType."""
260        self._swf.deprecate_workflow_type(self.domain, self.name, self.version)
261
262    @wraps(Layer1.start_workflow_execution)
263    def start(self, **kwargs):
264        """StartWorkflowExecution."""
265        if 'workflow_id' in kwargs:
266            workflow_id = kwargs['workflow_id']
267            del kwargs['workflow_id']
268        else:
269            workflow_id = '%s-%s-%i' % (self.name, self.version, time.time())
270
271        for def_attr in ('task_list', 'child_policy'):
272            kwargs[def_attr] = kwargs.get(def_attr, getattr(self, def_attr))
273        run_id = self._swf.start_workflow_execution(self.domain, workflow_id,
274                                    self.name, self.version, **kwargs)['runId']
275        return WorkflowExecution(name=self.name, version=self.version,
276               runId=run_id, domain=self.domain, workflowId=workflow_id,
277               aws_access_key_id=self.aws_access_key_id,
278               aws_secret_access_key=self.aws_secret_access_key)
279
280class WorkflowExecution(SWFBase):
281
282    """An instance of a workflow."""
283
284    workflowId = None
285    runId = None
286
287    @wraps(Layer1.signal_workflow_execution)
288    def signal(self, signame, **kwargs):
289        """SignalWorkflowExecution."""
290        self._swf.signal_workflow_execution(self.domain, signame,
291                                            self.workflowId, **kwargs)
292
293    @wraps(Layer1.terminate_workflow_execution)
294    def terminate(self, **kwargs):
295        """TerminateWorkflowExecution (p. 103)."""
296        return self._swf.terminate_workflow_execution(self.domain,
297                                        self.workflowId, **kwargs)
298
299    @wraps(Layer1.get_workflow_execution_history)
300    def history(self, **kwargs):
301        """GetWorkflowExecutionHistory."""
302        return self._swf.get_workflow_execution_history(self.domain, self.runId,
303                                            self.workflowId, **kwargs)['events']
304
305    @wraps(Layer1.describe_workflow_execution)
306    def describe(self):
307        """DescribeWorkflowExecution."""
308        return self._swf.describe_workflow_execution(self.domain, self.runId,
309                                                             self.workflowId)
310
311    @wraps(Layer1.request_cancel_workflow_execution)
312    def request_cancel(self):
313        """RequestCancelWorkflowExecution."""
314        return self._swf.request_cancel_workflow_execution(self.domain,
315                                                   self.workflowId, self.runId)
316
317
318class ActivityType(SWFBase):
319
320    """A versioned activity type."""
321
322    version = None
323
324    @wraps(Layer1.deprecate_activity_type)
325    def deprecate(self):
326        """DeprecateActivityType."""
327        return self._swf.deprecate_activity_type(self.domain, self.name,
328                                                 self.version)
329
330    @wraps(Layer1.describe_activity_type)
331    def describe(self):
332        """DescribeActivityType."""
333        return self._swf.describe_activity_type(self.domain, self.name,
334                                                self.version)
335
336    @wraps(Layer1.register_activity_type)
337    def register(self, **kwargs):
338        """RegisterActivityType."""
339        args = {
340            'default_task_heartbeat_timeout': '600',
341            'default_task_schedule_to_close_timeout': '3900',
342            'default_task_schedule_to_start_timeout': '300',
343            'default_task_start_to_close_timeout': '3600',
344        }
345        args.update(kwargs)
346        self._swf.register_activity_type(self.domain, self.name, self.version,
347                                         **args)
348