• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2013 Amazon.com, Inc. or its affiliates.  All Rights Reserved
2#
3# Permission is hereby granted, free of charge, to any person obtaining a
4# copy of this software and associated documentation files (the
5# "Software"), to deal in the Software without restriction, including
6# without limitation the rights to use, copy, modify, merge, publish, dis-
7# tribute, sublicense, and/or sell copies of the Software, and to permit
8# persons to whom the Software is furnished to do so, subject to the fol-
9# lowing conditions:
10#
11# The above copyright notice and this permission notice shall be included
12# in all copies or substantial portions of the Software.
13#
14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
16# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
17# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
18# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
20# IN THE SOFTWARE.
21#
22
23import boto
24from boto.compat import json
25from boto.connection import AWSQueryConnection
26from boto.regioninfo import RegionInfo
27from boto.exception import JSONResponseError
28from boto.datapipeline import exceptions
29
30
class DataPipelineConnection(AWSQueryConnection):
    """
    Client for the AWS Data Pipeline API.

    AWS Data Pipeline is a web service that configures and manages a
    data-driven workflow called a pipeline.  It handles the details of
    scheduling and ensuring that data dependencies are met so your
    application can focus on processing the data.

    The API implements two main sets of functionality.  The first set
    of actions configures the pipeline in the web service: you call
    these to create a pipeline and define data sources, schedules,
    dependencies, and the transforms to be performed on the data.  The
    second set is used by a task runner application that polls AWS
    Data Pipeline for the next task ready for processing, performs it,
    and reports progress and final status back to the web service.

    Requests are signed with the Signature Version 4 protocol (see
    `Signature Version 4 Signing Process`_).
    """
    APIVersion = "2012-10-29"
    DefaultRegionName = "us-east-1"
    DefaultRegionEndpoint = "datapipeline.us-east-1.amazonaws.com"
    ServiceName = "DataPipeline"
    TargetPrefix = "DataPipeline"
    ResponseError = JSONResponseError

    # Maps the '__type' fault name found in a JSON error response to
    # the exception class raised to the caller; anything unrecognized
    # falls back to ResponseError (see make_request).
    _faults = {
        "PipelineDeletedException": exceptions.PipelineDeletedException,
        "InvalidRequestException": exceptions.InvalidRequestException,
        "TaskNotFoundException": exceptions.TaskNotFoundException,
        "PipelineNotFoundException": exceptions.PipelineNotFoundException,
        "InternalServiceError": exceptions.InternalServiceError,
    }

    def __init__(self, **kwargs):
        """
        Create a connection, defaulting to us-east-1 when no ``region``
        keyword argument is supplied.
        """
        region = kwargs.pop('region', None)
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        # The parent connection needs the host before __init__ runs.
        kwargs['host'] = region.endpoint
        super(DataPipelineConnection, self).__init__(**kwargs)
        self.region = region

    def _required_auth_capability(self):
        # Data Pipeline requires Signature Version 4 signing.
        return ['hmac-v4']

    def _make_json_request(self, action, params):
        """Serialize `params` as JSON and dispatch `action` to the service."""
        return self.make_request(action=action, body=json.dumps(params))

    def activate_pipeline(self, pipeline_id):
        """
        Validates a pipeline and initiates processing.  If the
        pipeline does not pass validation, activation fails.  A
        pipeline cannot be modified after it has been successfully
        activated.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to activate.
        """
        params = {'pipelineId': pipeline_id, }
        return self._make_json_request('ActivatePipeline', params)

    def create_pipeline(self, name, unique_id, description=None):
        """
        Creates a new empty pipeline.  When this action succeeds, use
        PutPipelineDefinition to populate the pipeline.

        :type name: string
        :param name: The name of the new pipeline.  Names need not be
            unique; AWS Data Pipeline assigns each pipeline a unique
            identifier.

        :type unique_id: string
        :param unique_id: A caller-supplied identifier used to ensure
            idempotency across repeated calls: if a pipeline already
            exists with the same name and unique identifier, the
            identifier from the previous attempt is returned instead of
            creating a new pipeline.  Uniqueness is scoped to the AWS
            account or IAM user credentials.

        :type description: string
        :param description: The description of the new pipeline.
        """
        params = {'name': name, 'uniqueId': unique_id, }
        if description is not None:
            params['description'] = description
        return self._make_json_request('CreatePipeline', params)

    def delete_pipeline(self, pipeline_id):
        """
        Permanently deletes a pipeline, its definition, and its run
        history.  A deleted pipeline cannot be queried or restored;
        instances currently being processed by task runners are
        cancelled on a best-effort basis.  To pause a pipeline instead
        of deleting it, call SetStatus with the status set to Pause on
        individual components.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to be deleted.
        """
        params = {'pipelineId': pipeline_id, }
        return self._make_json_request('DeletePipeline', params)

    def describe_objects(self, object_ids, pipeline_id, marker=None,
                         evaluate_expressions=None):
        """
        Returns the object definitions (sets of fields) for a set of
        objects associated with the pipeline.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline that contains the
            object definitions.

        :type object_ids: list
        :param object_ids: Identifiers of the pipeline objects to
            describe; at most 25 per call.

        :type evaluate_expressions: boolean
        :param evaluate_expressions: Whether expressions in the objects
            should be evaluated when the descriptions are returned.

        :type marker: string
        :param marker: Pagination cursor.  Empty on the first call;
            while the response reports `HasMoreResults` as `True`, pass
            the returned marker to retrieve the next set of results.
        """
        params = {
            'pipelineId': pipeline_id,
            'objectIds': object_ids,
        }
        if evaluate_expressions is not None:
            params['evaluateExpressions'] = evaluate_expressions
        if marker is not None:
            params['marker'] = marker
        return self._make_json_request('DescribeObjects', params)

    def describe_pipelines(self, pipeline_ids):
        """
        Retrieve metadata (name, identifier, current state, owning
        account) about one or more pipelines you have read permission
        for.  To retrieve the full pipeline definition instead, call
        GetPipelineDefinition.

        :type pipeline_ids: list
        :param pipeline_ids: Identifiers of the pipelines to describe;
            at most 25 per call.  Identifiers can be obtained by
            calling ListPipelines.
        """
        params = {'pipelineIds': pipeline_ids, }
        return self._make_json_request('DescribePipelines', params)

    def evaluate_expression(self, pipeline_id, expression, object_id):
        """
        Evaluates a string in the context of a specified object.  A
        task runner can use this to evaluate SQL queries stored in
        Amazon S3.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type object_id: string
        :param object_id: The identifier of the object.

        :type expression: string
        :param expression: The expression to evaluate.
        """
        params = {
            'pipelineId': pipeline_id,
            'objectId': object_id,
            'expression': expression,
        }
        return self._make_json_request('EvaluateExpression', params)

    def get_pipeline_definition(self, pipeline_id, version=None):
        """
        Returns the definition of the specified pipeline as provided
        via PutPipelineDefinition.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline.

        :type version: string
        :param version: The version to retrieve: `latest` (default) for
            the last definition saved to the pipeline, or `active` for
            the last definition that was activated.
        """
        params = {'pipelineId': pipeline_id, }
        if version is not None:
            params['version'] = version
        return self._make_json_request('GetPipelineDefinition', params)

    def list_pipelines(self, marker=None):
        """
        Returns pipeline identifiers for all active pipelines you have
        permission to access.

        :type marker: string
        :param marker: Pagination cursor.  Empty on the first call;
            while the response reports `HasMoreResults` as `True`, pass
            the returned marker to retrieve the next set of results.
        """
        params = {}
        if marker is not None:
            params['marker'] = marker
        return self._make_json_request('ListPipelines', params)

    def poll_for_task(self, worker_group, hostname=None,
                      instance_identity=None):
        """
        Task runners call this action to receive a task to perform
        from AWS Data Pipeline.  The returned task may come from any
        pipeline matching the `workerGroup` value that was launched
        using the caller's IAM user credentials.

        If no tasks are queued, the service long-polls for up to 90
        seconds until a newly scheduled task can be handed out; to
        accommodate this, set the socket timeout in your task runner
        to 90 seconds, and do not call PollForTask again on the same
        `workerGroup` until a response is received.

        :type worker_group: string
        :param worker_group: The type of task the task runner is
            configured to accept and process.  A single exact,
            case-sensitive value; no wildcards are permitted.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.

        :type instance_identity: dict
        :param instance_identity: Identity information for the Amazon
            EC2 instance hosting the task runner, obtainable from the
            instance metadata URI
            `http://169.254.169.254/latest/meta-data/instance-id`.
            Passing this proves the task runner is on an EC2 instance
            and ensures the proper service charges are applied.
        """
        params = {'workerGroup': worker_group, }
        if hostname is not None:
            params['hostname'] = hostname
        if instance_identity is not None:
            params['instanceIdentity'] = instance_identity
        return self._make_json_request('PollForTask', params)

    def put_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Adds tasks, schedules, and preconditions that control the
        behavior of the pipeline.  Use this to populate a new pipeline
        or to update an existing one that has not yet been activated.

        The definition is validated as it is added; changes are saved
        unless one of these validation errors exists: an object is
        missing a name or identifier field, a string or reference
        field is empty, or the number of objects exceeds the maximum
        allowed.

        :type pipeline_id: string
        :param pipeline_id: The identifier of the pipeline to configure.

        :type pipeline_objects: list
        :param pipeline_objects: The objects that define the pipeline.
            These will overwrite the existing pipeline definition.
        """
        params = {
            'pipelineId': pipeline_id,
            'pipelineObjects': pipeline_objects,
        }
        return self._make_json_request('PutPipelineDefinition', params)

    def query_objects(self, pipeline_id, sphere, marker=None, query=None,
                      limit=None):
        """
        Queries a pipeline for the names of objects that match a
        specified set of conditions.  Results are paginated and then
        filtered by `query`, so an empty result set may still carry a
        marker; keep calling while `HasMoreResults` is `True`.

        :type pipeline_id: string
        :param pipeline_id: Identifier of the pipeline to be queried.

        :type query: dict
        :param query: Defines the objects to return.  May contain at
            most ten selectors; conditions are limited to top-level
            String fields of the object.

        :type sphere: string
        :param sphere: Whether the query applies to components or
            instances.  Allowable values: `COMPONENT`, `INSTANCE`,
            `ATTEMPT`.

        :type marker: string
        :param marker: Pagination cursor.  Empty on the first call;
            while the response reports `HasMoreResults` as `True`, pass
            the returned marker to retrieve the next set of results.

        :type limit: integer
        :param limit: Maximum number of object names returned in a
            single call.  The default is 100.
        """
        params = {'pipelineId': pipeline_id, 'sphere': sphere, }
        if query is not None:
            params['query'] = query
        if marker is not None:
            params['marker'] = marker
        if limit is not None:
            params['limit'] = limit
        return self._make_json_request('QueryObjects', params)

    def report_task_progress(self, task_id):
        """
        Updates the AWS Data Pipeline service on the progress of the
        calling task runner.  A task runner should acknowledge an
        assigned task within 2 minutes; if the web service does not
        receive the acknowledgement in that window it will reassign
        the task in a subsequent PollForTask call.  After the initial
        acknowledgement, progress need only be reported every 15
        minutes (configurable via the pipeline's
        `reportProgressTimeout` field) to maintain ownership; task
        runners should call ReportTaskProgress every 60 seconds.

        :type task_id: string
        :param task_id: Identifier of the task assigned to the task
            runner, as provided in the TaskObject returned by the
            PollForTask action.
        """
        params = {'taskId': task_id, }
        return self._make_json_request('ReportTaskProgress', params)

    def report_task_runner_heartbeat(self, taskrunner_id, worker_group=None,
                                     hostname=None):
        """
        Task runners call this every 15 minutes to indicate they are
        operational.  For AWS Data Pipeline Task Runner launched on a
        resource managed by AWS Data Pipeline, the web service uses
        this call to detect a failed task runner application and
        restart a new instance.

        :type taskrunner_id: string
        :param taskrunner_id: The identifier of the task runner, unique
            across your AWS account.  Managed runners receive one when
            launched; custom task runners must assign their own.

        :type worker_group: string
        :param worker_group: The type of task the task runner is
            configured to accept and process.  A single exact,
            case-sensitive value; no wildcards are permitted.

        :type hostname: string
        :param hostname: The public DNS name of the calling task runner.
        """
        params = {'taskrunnerId': taskrunner_id, }
        if worker_group is not None:
            params['workerGroup'] = worker_group
        if hostname is not None:
            params['hostname'] = hostname
        return self._make_json_request('ReportTaskRunnerHeartbeat', params)

    def set_status(self, object_ids, status, pipeline_id):
        """
        Requests that the status of an array of physical or logical
        pipeline objects be updated.  The update is eventually
        consistent; the statuses that can be set depend on the type of
        object.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline that contains the
            objects.

        :type object_ids: list
        :param object_ids: Identifies an array of objects — either all
            physical or all components, never a mix of both types.

        :type status: string
        :param status: The status to set on all objects in `objectIds`.
            For components: `PAUSE` or `RESUME`.  For instances:
            `CANCEL`, `RERUN`, or `MARK_FINISHED`.
        """
        params = {
            'pipelineId': pipeline_id,
            'objectIds': object_ids,
            'status': status,
        }
        return self._make_json_request('SetStatus', params)

    def set_task_status(self, task_id, task_status, error_id=None,
                        error_message=None, error_stack_trace=None):
        """
        Notifies AWS Data Pipeline that a task is completed and
        provides its final status.  The task runner calls this
        regardless of whether the task was successful, but need not
        call it for tasks cancelled by the web service during a call
        to ReportTaskProgress.

        :type task_id: string
        :param task_id: Identifies the task assigned to the task
            runner, as set in the TaskObject returned by PollForTask.

        :type task_status: string
        :param task_status: `FINISHED` if the task completed
            successfully, `FAILED` if it ended unsuccessfully.  The
            `FALSE` value is used by preconditions.

        :type error_id: string
        :param error_id: An id representing the error, if one occurred.
            Set on the physical attempt object and shown to the user.
            Must not start with the system-reserved string "Service_".

        :type error_message: string
        :param error_message: A text description of the error, if one
            occurred.  Set on the physical attempt object and shown to
            the user; not parsed by the web service.

        :type error_stack_trace: string
        :param error_stack_trace: The stack trace associated with the
            error, if one occurred.  Set on the physical attempt object
            and shown to the user; not parsed by the web service.
        """
        params = {'taskId': task_id, 'taskStatus': task_status, }
        if error_id is not None:
            params['errorId'] = error_id
        if error_message is not None:
            params['errorMessage'] = error_message
        if error_stack_trace is not None:
            params['errorStackTrace'] = error_stack_trace
        return self._make_json_request('SetTaskStatus', params)

    def validate_pipeline_definition(self, pipeline_objects, pipeline_id):
        """
        Tests the pipeline definition with a set of validation checks
        to ensure it is well formed and can run without error.

        :type pipeline_id: string
        :param pipeline_id: Identifies the pipeline whose definition is
            to be validated.

        :type pipeline_objects: list
        :param pipeline_objects: A list of objects defining the
            pipeline changes to validate against the pipeline.
        """
        params = {
            'pipelineId': pipeline_id,
            'pipelineObjects': pipeline_objects,
        }
        return self._make_json_request('ValidatePipelineDefinition', params)

    def make_request(self, action, body):
        """
        POST a JSON request body to the service and decode the
        response.

        :type action: string
        :param action: The API action name, combined with TargetPrefix
            to form the `X-Amz-Target` header.

        :type body: string
        :param body: The JSON-serialized request parameters.

        :returns: The decoded JSON response, or None for an empty 200
            response.
        :raises: The exception mapped in ``_faults`` for the response's
            '__type', or ``ResponseError`` for unrecognized faults.
        """
        headers = {
            'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action),
            'Host': self.region.endpoint,
            'Content-Type': 'application/x-amz-json-1.1',
            'Content-Length': str(len(body)),
        }
        http_request = self.build_base_http_request(
            method='POST', path='/', auth_path='/', params={},
            headers=headers, data=body)
        response = self._mexe(http_request, sender=None,
                              override_num_retries=10)
        response_body = response.read().decode('utf-8')
        boto.log.debug(response_body)
        if response.status == 200:
            # Some actions return an empty body on success.
            if response_body:
                return json.loads(response_body)
        else:
            json_body = json.loads(response_body)
            fault_name = json_body.get('__type', None)
            exception_class = self._faults.get(fault_name, self.ResponseError)
            raise exception_class(response.status, response.reason,
                                  body=json_body)
640