1# Copyright (c) 2013 Amazon.com, Inc. or its affiliates. All Rights Reserved 2# 3# Permission is hereby granted, free of charge, to any person obtaining a 4# copy of this software and associated documentation files (the 5# "Software"), to deal in the Software without restriction, including 6# without limitation the rights to use, copy, modify, merge, publish, dis- 7# tribute, sublicense, and/or sell copies of the Software, and to permit 8# persons to whom the Software is furnished to do so, subject to the fol- 9# lowing conditions: 10# 11# The above copyright notice and this permission notice shall be included 12# in all copies or substantial portions of the Software. 13# 14# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- 16# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT 17# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, 18# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 20# IN THE SOFTWARE. 21# 22 23import boto 24from boto.compat import json 25from boto.connection import AWSQueryConnection 26from boto.regioninfo import RegionInfo 27from boto.exception import JSONResponseError 28from boto.datapipeline import exceptions 29 30 31class DataPipelineConnection(AWSQueryConnection): 32 """ 33 This is the AWS Data Pipeline API Reference . This guide provides 34 descriptions and samples of the AWS Data Pipeline API. 35 36 AWS Data Pipeline is a web service that configures and manages a 37 data-driven workflow called a pipeline. AWS Data Pipeline handles 38 the details of scheduling and ensuring that data dependencies are 39 met so your application can focus on processing the data. 40 41 The AWS Data Pipeline API implements two main sets of 42 functionality. The first set of actions configure the pipeline in 43 the web service. You call these actions to create a pipeline and 44 define data sources, schedules, dependencies, and the transforms 45 to be performed on the data. 46 47 The second set of actions are used by a task runner application 48 that calls the AWS Data Pipeline API to receive the next task 49 ready for processing. The logic for performing the task, such as 50 querying the data, running data analysis, or converting the data 51 from one format to another, is contained within the task runner. 52 The task runner performs the task assigned to it by the web 53 service, reporting progress to the web service as it does so. When 54 the task is done, the task runner reports the final success or 55 failure of the task to the web service. 56 57 AWS Data Pipeline provides an open-source implementation of a task 58 runner called AWS Data Pipeline Task Runner. AWS Data Pipeline 59 Task Runner provides logic for common data management scenarios, 60 such as performing database queries and running data analysis 61 using Amazon Elastic MapReduce (Amazon EMR). You can use AWS Data 62 Pipeline Task Runner as your task runner, or you can write your 63 own task runner to provide custom data management. 64 65 The AWS Data Pipeline API uses the Signature Version 4 protocol 66 for signing requests. For more information about how to sign a 67 request with this protocol, see `Signature Version 4 Signing 68 Process`_. In the code examples in this reference, the Signature 69 Version 4 Request parameters are represented as AuthParams. 70 """ 71 APIVersion = "2012-10-29" 72 DefaultRegionName = "us-east-1" 73 DefaultRegionEndpoint = "datapipeline.us-east-1.amazonaws.com" 74 ServiceName = "DataPipeline" 75 TargetPrefix = "DataPipeline" 76 ResponseError = JSONResponseError 77 78 _faults = { 79 "PipelineDeletedException": exceptions.PipelineDeletedException, 80 "InvalidRequestException": exceptions.InvalidRequestException, 81 "TaskNotFoundException": exceptions.TaskNotFoundException, 82 "PipelineNotFoundException": exceptions.PipelineNotFoundException, 83 "InternalServiceError": exceptions.InternalServiceError, 84 } 85 86 def __init__(self, **kwargs): 87 region = kwargs.pop('region', None) 88 if not region: 89 region = RegionInfo(self, self.DefaultRegionName, 90 self.DefaultRegionEndpoint) 91 kwargs['host'] = region.endpoint 92 super(DataPipelineConnection, self).__init__(**kwargs) 93 self.region = region 94 95 def _required_auth_capability(self): 96 return ['hmac-v4'] 97 98 def activate_pipeline(self, pipeline_id): 99 """ 100 Validates a pipeline and initiates processing. If the pipeline 101 does not pass validation, activation fails. 102 103 Call this action to start processing pipeline tasks of a 104 pipeline you've created using the CreatePipeline and 105 PutPipelineDefinition actions. A pipeline cannot be modified 106 after it has been successfully activated. 107 108 :type pipeline_id: string 109 :param pipeline_id: The identifier of the pipeline to activate. 110 111 """ 112 params = {'pipelineId': pipeline_id, } 113 return self.make_request(action='ActivatePipeline', 114 body=json.dumps(params)) 115 116 def create_pipeline(self, name, unique_id, description=None): 117 """ 118 Creates a new empty pipeline. When this action succeeds, you 119 can then use the PutPipelineDefinition action to populate the 120 pipeline. 121 122 :type name: string 123 :param name: The name of the new pipeline. You can use the same name 124 for multiple pipelines associated with your AWS account, because 125 AWS Data Pipeline assigns each new pipeline a unique pipeline 126 identifier. 127 128 :type unique_id: string 129 :param unique_id: A unique identifier that you specify. This identifier 130 is not the same as the pipeline identifier assigned by AWS Data 131 Pipeline. You are responsible for defining the format and ensuring 132 the uniqueness of this identifier. You use this parameter to ensure 133 idempotency during repeated calls to CreatePipeline. For example, 134 if the first call to CreatePipeline does not return a clear 135 success, you can pass in the same unique identifier and pipeline 136 name combination on a subsequent call to CreatePipeline. 137 CreatePipeline ensures that if a pipeline already exists with the 138 same name and unique identifier, a new pipeline will not be 139 created. Instead, you'll receive the pipeline identifier from the 140 previous attempt. The uniqueness of the name and unique identifier 141 combination is scoped to the AWS account or IAM user credentials. 142 143 :type description: string 144 :param description: The description of the new pipeline. 145 146 """ 147 params = {'name': name, 'uniqueId': unique_id, } 148 if description is not None: 149 params['description'] = description 150 return self.make_request(action='CreatePipeline', 151 body=json.dumps(params)) 152 153 def delete_pipeline(self, pipeline_id): 154 """ 155 Permanently deletes a pipeline, its pipeline definition and 156 its run history. You cannot query or restore a deleted 157 pipeline. AWS Data Pipeline will attempt to cancel instances 158 associated with the pipeline that are currently being 159 processed by task runners. Deleting a pipeline cannot be 160 undone. 161 162 To temporarily pause a pipeline instead of deleting it, call 163 SetStatus with the status set to Pause on individual 164 components. Components that are paused by SetStatus can be 165 resumed. 166 167 :type pipeline_id: string 168 :param pipeline_id: The identifier of the pipeline to be deleted. 169 170 """ 171 params = {'pipelineId': pipeline_id, } 172 return self.make_request(action='DeletePipeline', 173 body=json.dumps(params)) 174 175 def describe_objects(self, object_ids, pipeline_id, marker=None, 176 evaluate_expressions=None): 177 """ 178 Returns the object definitions for a set of objects associated 179 with the pipeline. Object definitions are composed of a set of 180 fields that define the properties of the object. 181 182 :type pipeline_id: string 183 :param pipeline_id: Identifier of the pipeline that contains the object 184 definitions. 185 186 :type object_ids: list 187 :param object_ids: Identifiers of the pipeline objects that contain the 188 definitions to be described. You can pass as many as 25 identifiers 189 in a single call to DescribeObjects. 190 191 :type evaluate_expressions: boolean 192 :param evaluate_expressions: Indicates whether any expressions in the 193 object should be evaluated when the object descriptions are 194 returned. 195 196 :type marker: string 197 :param marker: The starting point for the results to be returned. The 198 first time you call DescribeObjects, this value should be empty. As 199 long as the action returns `HasMoreResults` as `True`, you can call 200 DescribeObjects again and pass the marker value from the response 201 to retrieve the next set of results. 202 203 """ 204 params = { 205 'pipelineId': pipeline_id, 206 'objectIds': object_ids, 207 } 208 if evaluate_expressions is not None: 209 params['evaluateExpressions'] = evaluate_expressions 210 if marker is not None: 211 params['marker'] = marker 212 return self.make_request(action='DescribeObjects', 213 body=json.dumps(params)) 214 215 def describe_pipelines(self, pipeline_ids): 216 """ 217 Retrieve metadata about one or more pipelines. The information 218 retrieved includes the name of the pipeline, the pipeline 219 identifier, its current state, and the user account that owns 220 the pipeline. Using account credentials, you can retrieve 221 metadata about pipelines that you or your IAM users have 222 created. If you are using an IAM user account, you can 223 retrieve metadata about only those pipelines you have read 224 permission for. 225 226 To retrieve the full pipeline definition instead of metadata 227 about the pipeline, call the GetPipelineDefinition action. 228 229 :type pipeline_ids: list 230 :param pipeline_ids: Identifiers of the pipelines to describe. You can 231 pass as many as 25 identifiers in a single call to 232 DescribePipelines. You can obtain pipeline identifiers by calling 233 ListPipelines. 234 235 """ 236 params = {'pipelineIds': pipeline_ids, } 237 return self.make_request(action='DescribePipelines', 238 body=json.dumps(params)) 239 240 def evaluate_expression(self, pipeline_id, expression, object_id): 241 """ 242 Evaluates a string in the context of a specified object. A 243 task runner can use this action to evaluate SQL queries stored 244 in Amazon S3. 245 246 :type pipeline_id: string 247 :param pipeline_id: The identifier of the pipeline. 248 249 :type object_id: string 250 :param object_id: The identifier of the object. 251 252 :type expression: string 253 :param expression: The expression to evaluate. 254 255 """ 256 params = { 257 'pipelineId': pipeline_id, 258 'objectId': object_id, 259 'expression': expression, 260 } 261 return self.make_request(action='EvaluateExpression', 262 body=json.dumps(params)) 263 264 def get_pipeline_definition(self, pipeline_id, version=None): 265 """ 266 Returns the definition of the specified pipeline. You can call 267 GetPipelineDefinition to retrieve the pipeline definition you 268 provided using PutPipelineDefinition. 269 270 :type pipeline_id: string 271 :param pipeline_id: The identifier of the pipeline. 272 273 :type version: string 274 :param version: The version of the pipeline definition to retrieve. 275 This parameter accepts the values `latest` (default) and `active`. 276 Where `latest` indicates the last definition saved to the pipeline 277 and `active` indicates the last definition of the pipeline that was 278 activated. 279 280 """ 281 params = {'pipelineId': pipeline_id, } 282 if version is not None: 283 params['version'] = version 284 return self.make_request(action='GetPipelineDefinition', 285 body=json.dumps(params)) 286 287 def list_pipelines(self, marker=None): 288 """ 289 Returns a list of pipeline identifiers for all active 290 pipelines. Identifiers are returned only for pipelines you 291 have permission to access. 292 293 :type marker: string 294 :param marker: The starting point for the results to be returned. The 295 first time you call ListPipelines, this value should be empty. As 296 long as the action returns `HasMoreResults` as `True`, you can call 297 ListPipelines again and pass the marker value from the response to 298 retrieve the next set of results. 299 300 """ 301 params = {} 302 if marker is not None: 303 params['marker'] = marker 304 return self.make_request(action='ListPipelines', 305 body=json.dumps(params)) 306 307 def poll_for_task(self, worker_group, hostname=None, 308 instance_identity=None): 309 """ 310 Task runners call this action to receive a task to perform 311 from AWS Data Pipeline. The task runner specifies which tasks 312 it can perform by setting a value for the workerGroup 313 parameter of the PollForTask call. The task returned by 314 PollForTask may come from any of the pipelines that match the 315 workerGroup value passed in by the task runner and that was 316 launched using the IAM user credentials specified by the task 317 runner. 318 319 If tasks are ready in the work queue, PollForTask returns a 320 response immediately. If no tasks are available in the queue, 321 PollForTask uses long-polling and holds on to a poll 322 connection for up to a 90 seconds during which time the first 323 newly scheduled task is handed to the task runner. To 324 accomodate this, set the socket timeout in your task runner to 325 90 seconds. The task runner should not call PollForTask again 326 on the same `workerGroup` until it receives a response, and 327 this may take up to 90 seconds. 328 329 :type worker_group: string 330 :param worker_group: Indicates the type of task the task runner is 331 configured to accept and process. The worker group is set as a 332 field on objects in the pipeline when they are created. You can 333 only specify a single value for `workerGroup` in the call to 334 PollForTask. There are no wildcard values permitted in 335 `workerGroup`, the string must be an exact, case-sensitive, match. 336 337 :type hostname: string 338 :param hostname: The public DNS name of the calling task runner. 339 340 :type instance_identity: dict 341 :param instance_identity: Identity information for the Amazon EC2 342 instance that is hosting the task runner. You can get this value by 343 calling the URI, `http://169.254.169.254/latest/meta-data/instance- 344 id`, from the EC2 instance. For more information, go to `Instance 345 Metadata`_ in the Amazon Elastic Compute Cloud User Guide. Passing 346 in this value proves that your task runner is running on an EC2 347 instance, and ensures the proper AWS Data Pipeline service charges 348 are applied to your pipeline. 349 350 """ 351 params = {'workerGroup': worker_group, } 352 if hostname is not None: 353 params['hostname'] = hostname 354 if instance_identity is not None: 355 params['instanceIdentity'] = instance_identity 356 return self.make_request(action='PollForTask', 357 body=json.dumps(params)) 358 359 def put_pipeline_definition(self, pipeline_objects, pipeline_id): 360 """ 361 Adds tasks, schedules, and preconditions that control the 362 behavior of the pipeline. You can use PutPipelineDefinition to 363 populate a new pipeline or to update an existing pipeline that 364 has not yet been activated. 365 366 PutPipelineDefinition also validates the configuration as it 367 adds it to the pipeline. Changes to the pipeline are saved 368 unless one of the following three validation errors exists in 369 the pipeline. 370 371 #. An object is missing a name or identifier field. 372 #. A string or reference field is empty. 373 #. The number of objects in the pipeline exceeds the maximum 374 allowed objects. 375 376 377 378 Pipeline object definitions are passed to the 379 PutPipelineDefinition action and returned by the 380 GetPipelineDefinition action. 381 382 :type pipeline_id: string 383 :param pipeline_id: The identifier of the pipeline to be configured. 384 385 :type pipeline_objects: list 386 :param pipeline_objects: The objects that define the pipeline. These 387 will overwrite the existing pipeline definition. 388 389 """ 390 params = { 391 'pipelineId': pipeline_id, 392 'pipelineObjects': pipeline_objects, 393 } 394 return self.make_request(action='PutPipelineDefinition', 395 body=json.dumps(params)) 396 397 def query_objects(self, pipeline_id, sphere, marker=None, query=None, 398 limit=None): 399 """ 400 Queries a pipeline for the names of objects that match a 401 specified set of conditions. 402 403 The objects returned by QueryObjects are paginated and then 404 filtered by the value you set for query. This means the action 405 may return an empty result set with a value set for marker. If 406 `HasMoreResults` is set to `True`, you should continue to call 407 QueryObjects, passing in the returned value for marker, until 408 `HasMoreResults` returns `False`. 409 410 :type pipeline_id: string 411 :param pipeline_id: Identifier of the pipeline to be queried for object 412 names. 413 414 :type query: dict 415 :param query: Query that defines the objects to be returned. The Query 416 object can contain a maximum of ten selectors. The conditions in 417 the query are limited to top-level String fields in the object. 418 These filters can be applied to components, instances, and 419 attempts. 420 421 :type sphere: string 422 :param sphere: Specifies whether the query applies to components or 423 instances. Allowable values: `COMPONENT`, `INSTANCE`, `ATTEMPT`. 424 425 :type marker: string 426 :param marker: The starting point for the results to be returned. The 427 first time you call QueryObjects, this value should be empty. As 428 long as the action returns `HasMoreResults` as `True`, you can call 429 QueryObjects again and pass the marker value from the response to 430 retrieve the next set of results. 431 432 :type limit: integer 433 :param limit: Specifies the maximum number of object names that 434 QueryObjects will return in a single call. The default value is 435 100. 436 437 """ 438 params = {'pipelineId': pipeline_id, 'sphere': sphere, } 439 if query is not None: 440 params['query'] = query 441 if marker is not None: 442 params['marker'] = marker 443 if limit is not None: 444 params['limit'] = limit 445 return self.make_request(action='QueryObjects', 446 body=json.dumps(params)) 447 448 def report_task_progress(self, task_id): 449 """ 450 Updates the AWS Data Pipeline service on the progress of the 451 calling task runner. When the task runner is assigned a task, 452 it should call ReportTaskProgress to acknowledge that it has 453 the task within 2 minutes. If the web service does not recieve 454 this acknowledgement within the 2 minute window, it will 455 assign the task in a subsequent PollForTask call. After this 456 initial acknowledgement, the task runner only needs to report 457 progress every 15 minutes to maintain its ownership of the 458 task. You can change this reporting time from 15 minutes by 459 specifying a `reportProgressTimeout` field in your pipeline. 460 If a task runner does not report its status after 5 minutes, 461 AWS Data Pipeline will assume that the task runner is unable 462 to process the task and will reassign the task in a subsequent 463 response to PollForTask. task runners should call 464 ReportTaskProgress every 60 seconds. 465 466 :type task_id: string 467 :param task_id: Identifier of the task assigned to the task runner. 468 This value is provided in the TaskObject that the service returns 469 with the response for the PollForTask action. 470 471 """ 472 params = {'taskId': task_id, } 473 return self.make_request(action='ReportTaskProgress', 474 body=json.dumps(params)) 475 476 def report_task_runner_heartbeat(self, taskrunner_id, worker_group=None, 477 hostname=None): 478 """ 479 Task runners call ReportTaskRunnerHeartbeat every 15 minutes 480 to indicate that they are operational. In the case of AWS Data 481 Pipeline Task Runner launched on a resource managed by AWS 482 Data Pipeline, the web service can use this call to detect 483 when the task runner application has failed and restart a new 484 instance. 485 486 :type taskrunner_id: string 487 :param taskrunner_id: The identifier of the task runner. This value 488 should be unique across your AWS account. In the case of AWS Data 489 Pipeline Task Runner launched on a resource managed by AWS Data 490 Pipeline, the web service provides a unique identifier when it 491 launches the application. If you have written a custom task runner, 492 you should assign a unique identifier for the task runner. 493 494 :type worker_group: string 495 :param worker_group: Indicates the type of task the task runner is 496 configured to accept and process. The worker group is set as a 497 field on objects in the pipeline when they are created. You can 498 only specify a single value for `workerGroup` in the call to 499 ReportTaskRunnerHeartbeat. There are no wildcard values permitted 500 in `workerGroup`, the string must be an exact, case-sensitive, 501 match. 502 503 :type hostname: string 504 :param hostname: The public DNS name of the calling task runner. 505 506 """ 507 params = {'taskrunnerId': taskrunner_id, } 508 if worker_group is not None: 509 params['workerGroup'] = worker_group 510 if hostname is not None: 511 params['hostname'] = hostname 512 return self.make_request(action='ReportTaskRunnerHeartbeat', 513 body=json.dumps(params)) 514 515 def set_status(self, object_ids, status, pipeline_id): 516 """ 517 Requests that the status of an array of physical or logical 518 pipeline objects be updated in the pipeline. This update may 519 not occur immediately, but is eventually consistent. The 520 status that can be set depends on the type of object. 521 522 :type pipeline_id: string 523 :param pipeline_id: Identifies the pipeline that contains the objects. 524 525 :type object_ids: list 526 :param object_ids: Identifies an array of objects. The corresponding 527 objects can be either physical or components, but not a mix of both 528 types. 529 530 :type status: string 531 :param status: Specifies the status to be set on all the objects in 532 `objectIds`. For components, this can be either `PAUSE` or 533 `RESUME`. For instances, this can be either `CANCEL`, `RERUN`, or 534 `MARK_FINISHED`. 535 536 """ 537 params = { 538 'pipelineId': pipeline_id, 539 'objectIds': object_ids, 540 'status': status, 541 } 542 return self.make_request(action='SetStatus', 543 body=json.dumps(params)) 544 545 def set_task_status(self, task_id, task_status, error_id=None, 546 error_message=None, error_stack_trace=None): 547 """ 548 Notifies AWS Data Pipeline that a task is completed and 549 provides information about the final status. The task runner 550 calls this action regardless of whether the task was 551 sucessful. The task runner does not need to call SetTaskStatus 552 for tasks that are canceled by the web service during a call 553 to ReportTaskProgress. 554 555 :type task_id: string 556 :param task_id: Identifies the task assigned to the task runner. This 557 value is set in the TaskObject that is returned by the PollForTask 558 action. 559 560 :type task_status: string 561 :param task_status: If `FINISHED`, the task successfully completed. If 562 `FAILED` the task ended unsuccessfully. The `FALSE` value is used 563 by preconditions. 564 565 :type error_id: string 566 :param error_id: If an error occurred during the task, this value 567 specifies an id value that represents the error. This value is set 568 on the physical attempt object. It is used to display error 569 information to the user. It should not start with string "Service_" 570 which is reserved by the system. 571 572 :type error_message: string 573 :param error_message: If an error occurred during the task, this value 574 specifies a text description of the error. This value is set on the 575 physical attempt object. It is used to display error information to 576 the user. The web service does not parse this value. 577 578 :type error_stack_trace: string 579 :param error_stack_trace: If an error occurred during the task, this 580 value specifies the stack trace associated with the error. This 581 value is set on the physical attempt object. It is used to display 582 error information to the user. The web service does not parse this 583 value. 584 585 """ 586 params = {'taskId': task_id, 'taskStatus': task_status, } 587 if error_id is not None: 588 params['errorId'] = error_id 589 if error_message is not None: 590 params['errorMessage'] = error_message 591 if error_stack_trace is not None: 592 params['errorStackTrace'] = error_stack_trace 593 return self.make_request(action='SetTaskStatus', 594 body=json.dumps(params)) 595 596 def validate_pipeline_definition(self, pipeline_objects, pipeline_id): 597 """ 598 Tests the pipeline definition with a set of validation checks 599 to ensure that it is well formed and can run without error. 600 601 :type pipeline_id: string 602 :param pipeline_id: Identifies the pipeline whose definition is to be 603 validated. 604 605 :type pipeline_objects: list 606 :param pipeline_objects: A list of objects that define the pipeline 607 changes to validate against the pipeline. 608 609 """ 610 params = { 611 'pipelineId': pipeline_id, 612 'pipelineObjects': pipeline_objects, 613 } 614 return self.make_request(action='ValidatePipelineDefinition', 615 body=json.dumps(params)) 616 617 def make_request(self, action, body): 618 headers = { 619 'X-Amz-Target': '%s.%s' % (self.TargetPrefix, action), 620 'Host': self.region.endpoint, 621 'Content-Type': 'application/x-amz-json-1.1', 622 'Content-Length': str(len(body)), 623 } 624 http_request = self.build_base_http_request( 625 method='POST', path='/', auth_path='/', params={}, 626 headers=headers, data=body) 627 response = self._mexe(http_request, sender=None, 628 override_num_retries=10) 629 response_body = response.read().decode('utf-8') 630 boto.log.debug(response_body) 631 if response.status == 200: 632 if response_body: 633 return json.loads(response_body) 634 else: 635 json_body = json.loads(response_body) 636 fault_name = json_body.get('__type', None) 637 exception_class = self._faults.get(fault_name, self.ResponseError) 638 raise exception_class(response.status, response.reason, 639 body=json_body) 640