In any real-world application, data needs to flow across several stages and services. In the Amazon cloud environment, the AWS Data Pipeline service makes this dataflow possible between different services and enables the automation of data-driven workflows.
An AWS Data Pipeline consists of the following basic components:
DataNodes – represent data stores for input and output data. DataNodes can be of various types depending on the backend AWS service used for data storage. Examples include S3DataNode, SqlDataNode, DynamoDBDataNode, and RedshiftDataNode.
Activities – specify the task to be performed on the data stored in DataNodes. Different types of activities are provided depending on the application, including CopyActivity, EmrActivity, HiveActivity, ShellCommandActivity, and SqlActivity.
Activities are executed on their respective resources, namely an Ec2Resource or an EmrCluster, depending on the nature of the activity.
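As a minimal sketch (all ids and values here are hypothetical, for illustration only), a pipeline definition wires these components together through id references:

```json
{
  "objects": [
    {
      "id": "MyS3Input",
      "type": "S3DataNode",
      "directoryPath": "s3://my-bucket/input/"
    },
    {
      "id": "MyActivity",
      "type": "ShellCommandActivity",
      "input": { "ref": "MyS3Input" },
      "command": "echo processing input",
      "runsOn": { "ref": "MyEc2" }
    },
    {
      "id": "MyEc2",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hour"
    }
  ]
}
```

The activity names its input DataNode via `input` and its compute resource via `runsOn`; the full samples below follow the same pattern.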
An AWS Data Pipeline can be created using the Data Pipeline console (https://console.aws.amazon.com/datapipeline/). Creating a basic pipeline in the console involves choosing a template (or building a definition in the architect view), filling in the template parameters, configuring the schedule, logging location, and IAM roles, and then activating the pipeline. The pipeline definition file samples below illustrate typical use cases.
Pipeline Definition File Samples
Sample 1:
{
"objects": [
{
"myComment" : "Activity to run the hive script to export data to CSV",
"output": {
"ref": "DataNodeId_pnzAW"
},
"input": {
"ref": "DataNodeId_cERqb"
},
"name": "TableBackupActivity",
"hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{myS3ColMapping})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'\nLOCATION '#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}';\n \nINSERT OVERWRITE TABLE s3Table SELECT * FROM tempTable;",
"runsOn": { "ref": "EmrCluster1" },
"id": "TableBackupActivity",
"type": "HiveActivity"
},
{
"period": "1 days",
"name": "Every 1 day",
"id": "DefaultSchedule",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
},
{
"myComment" : "The DynamoDB table from which data is exported",
"dataFormat": {
"ref": "dynamoExportFormat"
},
"name": "DynamoDB",
"id": "DataNodeId_cERqb",
"type": "DynamoDBDataNode",
"tableName": "#{dynamoTableName}"
},
{
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "cron",
"name": "Default",
"id": "Default"
},
{
"name": "EmrCluster1",
"coreInstanceType": "m1.medium",
"coreInstanceCount": "1",
"masterInstanceType": "m1.medium",
"amiVersion": "3.3.2",
"id": "EmrCluster1",
"type": "EmrCluster",
"terminateAfter": "1 Hour"
},
{
"myComment" : "The S3 path to which data is exported",
"directoryPath": "#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}/",
"dataFormat": {
"ref": "DataFormatId_xqWRk"
},
"name": "S3DataNode",
"id": "DataNodeId_pnzAW",
"type": "S3DataNode"
},
{
"myComment" : "Format for the S3 Path",
"name": "DefaultDataFormat1",
"column": "not_used STRING",
"id": "DataFormatId_xqWRk",
"type": "CSV"
},
{
"myComment" : "Format for the DynamoDB table",
"name": "dynamoExportFormat",
"id": "dynamoExportFormat",
"column": "not_used STRING",
"type": "DynamoDBExportDataFormat"
}
],
"parameters": [
{
"description": "Output S3 folder",
"id": "targetS3Bucket",
"type": "AWS::S3::ObjectKey"
},
{
"description": "DynamoDB table name",
"id": "dynamoTableName",
"type": "String"
},
{
"description": "S3 to DynamoDB Column Mapping",
"id": "dynamoTableColMapping",
"type": "String"
},
{
"description": "S3 Column Mappings",
"id": "myS3ColMapping",
"type": "String"
},
{
"description": "DataPipeline Log Uri",
"id": "logUri",
"type": "String"
}
]
}
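The export path in Sample 1 is built with the pipeline expression #{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}, which stamps each run's output folder with the scheduled start time. As a rough illustration (not part of the pipeline itself; the helper name is made up), the equivalent formatting in Python would be:

```python
from datetime import datetime

def export_path(bucket: str, scheduled_start: datetime) -> str:
    # Mirrors #{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}
    return f"{bucket}/{scheduled_start.strftime('%Y-%m-%d-%H-%M-%S')}"

print(export_path("s3://my-backups", datetime(2019, 4, 14, 12, 51, 36)))
# s3://my-backups/2019-04-14-12-51-36
```

Because the timestamp is part of the directory path, each scheduled run writes to its own S3 prefix instead of overwriting the previous export.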
Sample 2:
{
"objects": [
{
"myComment" : "Activity used to run hive script to import CSV data",
"output": {
"ref": "DataNodeId_pnzAW"
},
"input": {
"ref": "DataNodeId_cERqb"
},
"name": "TableRestoreActivity",
"hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{dynamoColDefn})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{sourceS3Bucket}';\n \nINSERT OVERWRITE TABLE tempTable SELECT * FROM s3Table;",
"id": "TableRestoreActivity",
"runsOn": { "ref" : "EmrClusterForRestore" },
"stage": "false",
"type": "HiveActivity"
},
{
"myComment" : "The DynamoDB table into which data is imported",
"dataFormat": {
"ref": "DDBExportFormat"
},
"name": "DynamoDB",
"id": "DataNodeId_cERqb",
"type": "DynamoDBDataNode",
"tableName": "#{dynamoTableName}"
},
{
"failureAndRerunMode": "CASCADE",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "ondemand",
"name": "Default",
"id": "Default"
},
{
"name": "EmrClusterForRestore",
"coreInstanceType": "m1.medium",
"coreInstanceCount": "1",
"masterInstanceType": "m1.medium",
"releaseLabel": "emr-4.4.0",
"id": "EmrClusterForRestore",
"type": "EmrCluster",
"terminateAfter": "1 Hour"
},
{
"myComment" : "The S3 path from which data is imported",
"directoryPath": "#{sourceS3Bucket}",
"dataFormat": {
"ref": "DataFormatId_xqWRk"
},
"name": "S3DataNode",
"id": "DataNodeId_pnzAW",
"type": "S3DataNode"
},
{
"myComment" : "Format for the S3 Path",
"name": "DefaultDataFormat1",
"column": "not_used STRING",
"id": "DataFormatId_xqWRk",
"type": "CSV"
},
{
"myComment" : "Format for the DynamoDB table",
"name": "DDBExportFormat",
"id": "DDBExportFormat",
"column": "not_used STRING",
"type": "DynamoDBExportDataFormat"
}
],
"parameters": [
{
"description": "Input S3 folder",
"id": "sourceS3Bucket",
"default": "s3://aws-datapipeline-csv/",
"type": "AWS::S3::ObjectKey"
},
{
"description": "DynamoDB table name",
"id": "dynamoTableName",
"type": "String"
},
{
"description": "S3 to DynamoDB Column Mapping",
"id": "dynamoTableColMapping",
"default" : "id:id,age:age,job:job,education:education ",
"type": "String"
},
{
"description": "S3 Column Mappings",
"id": "myS3ColMapping",
"default" : "id string,age int,job string,education string",
"type": "String"
},
{
"description": "DynamoDB Column Mappings",
"id": "dynamoColDefn",
"default" : "id string,age bigint,job string, education string ",
"type": "String"
},
{
"description": "DataPipeline Log Uri",
"id": "logUri",
"type": "AWS::S3::ObjectKey"
}
]
}
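The dynamoTableColMapping and myS3ColMapping parameter defaults above follow a simple pattern: one string maps DynamoDB attributes to Hive columns ("id:id,age:age,..."), the other declares the Hive column types ("id string,age int,..."). A small helper like the following (purely illustrative, not part of the service) could derive both strings from one schema description:

```python
def build_mappings(schema):
    """schema: list of (column_name, hive_type) pairs, e.g. [("id", "string"), ("age", "int")]."""
    # DynamoDB-to-Hive attribute mapping, e.g. "id:id,age:age"
    dynamo_mapping = ",".join(f"{name}:{name}" for name, _ in schema)
    # Hive column definitions for the S3-backed table, e.g. "id string,age int"
    s3_columns = ",".join(f"{name} {htype}" for name, htype in schema)
    return dynamo_mapping, s3_columns

dynamo, s3 = build_mappings([("id", "string"), ("age", "int"), ("job", "string")])
print(dynamo)  # id:id,age:age,job:job
print(s3)      # id string,age int,job string
```

Generating the two strings from one source keeps the column lists consistent, which matters because a mismatch between the mapping and the Hive table definition would make the import fail silently or drop columns.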
Sample 3:
{
"objects": [
{
"myComment": "Default configuration for objects in the pipeline.",
"name": "Default",
"id": "Default",
"failureAndRerunMode": "CASCADE",
"schedule": {
"ref": "DefaultSchedule"
},
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"pipelineLogUri": "#{logUri}",
"scheduleType": "cron"
},
{
"myComment": "Connection details for the Redshift cluster.",
"name": "DefaultDatabase1",
"id": "DatabaseId_Kw6D8",
"connectionString": "#{myConnectionString}",
"databaseName": "#{myRedshiftDatabase}",
"*password": "#{myRedshiftPassword}",
"type": "RedshiftDatabase",
"username": "#{myRedshiftUsername}"
},
{
"myComment": "This object is used to provide the resource where the copy job is invoked.",
"name": "DefaultResource1",
"id": "ResourceId_idL0Y",
"resourceRole": "DataPipelineDefaultResourceRole",
"role": "DataPipelineDefaultRole",
"type": "Ec2Resource",
"terminateAfter": "1 Hour"
},
{
"myComment": "This object is used to specify the copy activity for moving data from DynamoDB to Redshift.",
"name": "CopyFromDDBToRedshift",
"id": "ActivityId_asVf6",
"database": {
"ref": "DatabaseId_Kw6D8"
},
"runsOn": {
"ref": "ResourceId_idL0Y"
},
"type": "SqlActivity",
"script": "#{myScript}"
},
{
"myComment": "This object is used to control the task schedule.",
"name": "RunOnce",
"id": "DefaultSchedule",
"occurrences": "1",
"period": "2 Days",
"type": "Schedule",
"startAt": "FIRST_ACTIVATION_DATE_TIME"
}
],
"parameters": []
}
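In Sample 3 the actual copy logic lives entirely in the myScript parameter passed to the SqlActivity. One plausible value (a sketch only; the table name and IAM role ARN are hypothetical) is a Redshift COPY statement that reads directly from the DynamoDB table:

```sql
-- Hypothetical script for the #{myScript} parameter: load a DynamoDB
-- table into an existing Redshift table using Redshift's COPY command.
COPY mytable
FROM 'dynamodb://mydynamodbtable'
IAM_ROLE 'arn:aws:iam::123456789012:role/MyRedshiftRole'
READRATIO 50;
```

READRATIO caps the share of the DynamoDB table's provisioned read throughput the COPY may consume, so the load does not starve live traffic on the table.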
Advanced Concepts of AWS Data Pipeline
Precondition – A precondition specifies a condition that must evaluate to true for an activity to be executed, for example, the presence of a source data table or an S3 bucket prior to performing operations on it.
AWS Data Pipeline provides prebuilt precondition elements such as DynamoDBDataExists, DynamoDBTableExists, S3KeyExists, S3PrefixNotEmpty, and ShellCommandPrecondition.
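As a sketch (ids and paths hypothetical), an S3KeyExists precondition can be attached to an activity so that it runs only once the expected input file is present:

```json
{
  "id": "InputReady",
  "type": "S3KeyExists",
  "s3Key": "s3://my-bucket/input/data.csv"
},
{
  "id": "MyActivity",
  "type": "ShellCommandActivity",
  "command": "echo input is ready",
  "precondition": { "ref": "InputReady" },
  "runsOn": { "ref": "MyEc2" }
}
```

If the key is absent, the activity stays pending until the precondition is satisfied or its retry limit is exhausted.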
Action – Actions are event handlers that are executed in response to pipeline events. Examples include SnsAlarm and Terminate.
For example, if a specific pipeline component runs for longer than the configured maximum, an SnsAlarm can notify the administrator. AWS Data Pipeline exposes event handlers on pipeline components, such as onSuccess, onFail, and onLateAction, from which these actions can be triggered.
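For instance (the topic ARN and ids are hypothetical), an SnsAlarm can be wired to an activity's onFail field:

```json
{
  "id": "FailureAlarm",
  "type": "SnsAlarm",
  "topicArn": "arn:aws:sns:us-east-1:123456789012:pipeline-alerts",
  "subject": "Pipeline activity failed",
  "message": "Activity #{node.name} failed at #{node.@actualStartTime}.",
  "role": "DataPipelineDefaultRole"
},
{
  "id": "MyActivity",
  "type": "ShellCommandActivity",
  "command": "exit 1",
  "onFail": { "ref": "FailureAlarm" },
  "runsOn": { "ref": "MyEc2" }
}
```

The #{node.*} expressions are evaluated against the component that triggered the action, so one alarm object can be reused across several activities.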
Instances and Attempts: When a pipeline is executed, the pipeline components defined in the pipeline definition file are compiled into a set of executable instances. For fault tolerance, any instance that fails is retried up to its configured limit, and these retries are tracked by Attempt objects associated with the instance.
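The retry limit is configured per component. For example (values illustrative), an activity can be given an explicit maximumRetries and a per-attempt timeout:

```json
{
  "id": "MyActivity",
  "type": "ShellCommandActivity",
  "command": "echo retry demo",
  "maximumRetries": "3",
  "attemptTimeout": "15 Minutes",
  "runsOn": { "ref": "MyEc2" }
}
```

With this configuration a failing instance produces up to four Attempt objects (the original run plus three retries) before the instance is marked failed.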
Create pipeline:
$ aws datapipeline create-pipeline --name mypipeline --unique-id mypipeline
{
"pipelineId": "df-032468332JABJ6QUWCR6"
}
Associate pipeline with pipeline definition file:
$ aws datapipeline put-pipeline-definition --pipeline-id df-032468332JABJ6QUWCR6 --pipeline-definition file://mypipeline.json --parameter-values myS3LogsPath="s3://<s3 bucket path>"
{
"validationErrors": [],
"validationWarnings": [],
"errored": false
}
Activate pipeline:
$ aws datapipeline activate-pipeline --pipeline-id df-032468332JABJ6QUWCR6
Runtime monitoring of pipeline:
$ aws datapipeline list-runs --pipeline-id df-032468332JABJ6QUWCR6
# Name Scheduled Start Status
# ID Started Ended
# ---------------------------------------------------------------------------------------------------
# 1. EC2Resource_mypipeline 2019-04-14T12:51:36 RUNNING
# @EC2Resource_mypipeline_2019-04-14T16:51:56 2019-04-14T12:51:39
#
# 2. ShellCommandActivity_mypipeline 2019-04-14T12:51:36 WAITING_FOR_RUNNER
# @ShellCommandActivity_mypipeline_2019-04-14T16:51: 2019-04-14T12:51:39
Diagnostics/Troubleshooting:
AWS CloudTrail captures all API calls for AWS Data Pipeline as events. These events can be streamed to a target S3 bucket by creating a trail from the AWS console. A CloudTrail event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. This information is quite useful when diagnosing a problem in a configured data pipeline at runtime. The log entry format is as follows:
{
"Records": [
{
"eventVersion": "1.0",
"userIdentity": {
"type": "Root",
"principalId": "12120",
"arn": "arn:aws:iam::user-account-id:root",
"accountId": "user-account-id",
"accessKeyId": "user-access-key"
},
"eventTime": "2019-04-13T19:15:15Z",
"eventSource": "datapipeline.amazonaws.com",
"eventName": "CreatePipeline",
"awsRegion": "us-west-1",
"sourceIPAddress": "72.21.196.64",
"userAgent": "aws-cli/1.5.2 Python/2.7.5 Darwin/13.4.0",
"requestParameters": {
"name": "demopipeline",
"uniqueId": "myunique"
},
"responseElements": {
"pipelineId": "df-03265491AP32SAMPLE"
},
"requestID": "352ba1e2-6c21-11dd-8816-c3c09bc09c8a",
"eventID": "9f99dce0-0732-49b6-baaa-e77943195632",
"eventType": "AwsApiCall",
"recipientAccountId": "user-account-id"
},
...
]
}
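Once a trail delivers log files of this shape to S3, the records can be filtered programmatically. A minimal sketch (the helper name and the embedded sample are made up for illustration; a real log file would be downloaded from the trail's S3 bucket first) that pulls out only the Data Pipeline events:

```python
import json

def datapipeline_events(log_text: str):
    """Return (eventName, eventTime) pairs for AWS Data Pipeline API calls."""
    records = json.loads(log_text)["Records"]
    return [
        (r["eventName"], r["eventTime"])
        for r in records
        if r.get("eventSource") == "datapipeline.amazonaws.com"
    ]

# A tiny two-record sample in the CloudTrail log format shown above.
sample = """{"Records": [
  {"eventSource": "datapipeline.amazonaws.com",
   "eventName": "CreatePipeline", "eventTime": "2019-04-13T19:15:15Z"},
  {"eventSource": "s3.amazonaws.com",
   "eventName": "GetObject", "eventTime": "2019-04-13T19:20:00Z"}
]}"""

print(datapipeline_events(sample))
# [('CreatePipeline', '2019-04-13T19:15:15Z')]
```

Filtering on eventSource is what separates Data Pipeline activity from the other AWS API calls a shared trail records.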
Conclusion:
AWS Data Pipeline is a powerful service in the Amazon cloud environment for creating data workflows that leverage the full range of storage and compute resources available in the ecosystem. It makes it feasible to design big data applications in which several terabytes of data from varied sources are analysed systematically on the cloud. Any technologist working on data analytics in the cloud should consider acquiring skills related to this service.
JanBask Training is a leading global online training provider delivering live sessions. The live classes provide a blended approach of hands-on experience and theoretical knowledge, driven by certified professionals.