What is AWS Data Pipeline? AWS Data Pipeline Tutorial Guide

What is AWS Data Pipeline?

In any real-world application, data needs to flow across several stages and services. In the AWS cloud environment, the AWS Data Pipeline service makes this data flow possible between different services and enables the automation of data-driven workflows.

Getting started with AWS Data Pipeline

An AWS Data Pipeline consists of the following basic components:

  • DataNodes
  • Activities

AWS Data Pipeline DataNodes – represent the data stores for input and output data. DataNodes can be of various types depending on the backend AWS service used for data storage (a minimal S3DataNode definition is sketched after the list below). Examples include:

  • DynamoDBDataNode
  • SqlDataNode
  • RedShiftDataNode
  • S3DataNode
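
For illustration, a minimal S3DataNode definition might look like the following sketch; the bucket path and object ids are hypothetical, and the referenced 'MyCsvFormat' data format would need to be defined elsewhere in the same pipeline:

{
  "myComment": "Hypothetical S3 input node",
  "id": "MyS3InputNode",
  "name": "MyS3InputNode",
  "type": "S3DataNode",
  "filePath": "s3://my-example-bucket/input/data.csv",
  "dataFormat": { "ref": "MyCsvFormat" }
}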

Activities – specify the tasks to be performed on the data stored in DataNodes. Different types of activities are provided depending on the application (a minimal CopyActivity definition is sketched after the list below). These include:

  • CopyActivity
  • RedShiftCopyActivity
  • SqlActivity
  • ShellCommandActivity
  • EmrActivity
  • HiveActivity
  • HiveCopyActivity
  • PigActivity
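
As a sketch, a CopyActivity that moves data between two data nodes could be defined roughly as follows; the object ids MyS3InputNode, MySqlOutputNode, and MyEc2Resource are hypothetical and would have to exist in the same pipeline definition:

{
  "myComment": "Hypothetical copy activity",
  "id": "MyCopyActivity",
  "name": "MyCopyActivity",
  "type": "CopyActivity",
  "input": { "ref": "MyS3InputNode" },
  "output": { "ref": "MySqlOutputNode" },
  "runsOn": { "ref": "MyEc2Resource" }
}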

The Activities are executed on their respective Resources, namely an Ec2Resource or an EmrCluster, depending on the nature of the activity; a minimal Ec2Resource definition is sketched below.
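
A minimal Ec2Resource definition, assuming the default Data Pipeline IAM roles, might look like this sketch (the instance type and termination timeout are illustrative choices):

{
  "myComment": "Hypothetical EC2 resource for running activities",
  "id": "MyEc2Resource",
  "name": "MyEc2Resource",
  "type": "Ec2Resource",
  "instanceType": "t1.micro",
  "terminateAfter": "1 Hour",
  "resourceRole": "DataPipelineDefaultResourceRole",
  "role": "DataPipelineDefaultRole"
}

An activity then points to this resource through its runsOn field, as in the CopyActivity sketch above.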

AWS Data Pipelines can be created using the Data Pipeline console (https://console.aws.amazon.com/datapipeline/).

Creating a basic AWS Data Pipeline in the console involves the following steps:

  • Select ‘Region’ from the dropdown
  • Enter the Name for the pipeline
  • Enter a Description
  • If you are building the pipeline from a template, select the template name in the ‘Source’ section.
  • Specify the Parameters for the selected template.
  • Configure the schedule for pipeline execution including repetition frequency and termination criteria.
  • Specify ‘Pipeline Configuration’ parameters and Security details.
  • Activate the pipeline. The pipeline definition gets saved in a JSON-based format called a ‘pipeline definition file’ (a skeleton of this format is sketched after this list).
  • Monitor the runtime behavior of pipeline components on the console.
  • Once the pipeline run completes, the generated output can be analyzed from the target DataNode.
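
The saved pipeline definition file has the general shape sketched below. The object ids shown here are placeholders; the full samples in the next section give concrete variants, and each object carries additional fields specific to its type:

{
  "objects": [
    { "id": "Default", "name": "Default", "scheduleType": "cron" },
    { "id": "MyS3InputNode", "name": "MyS3InputNode", "type": "S3DataNode" },
    { "id": "MyCopyActivity", "name": "MyCopyActivity", "type": "CopyActivity" }
  ],
  "parameters": [
    { "id": "logUri", "type": "AWS::S3::ObjectKey", "description": "DataPipeline Log Uri" }
  ]
}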

Illustrative Use Cases

  • Process input data stored in an S3 bucket and store the results in an AWS RDS database
  • Source data from CSV files in S3 and from DynamoDB tables in the cloud, and build a data warehouse on AWS RedShift
  • Analyze multiple text files on S3 buckets using Hadoop cluster on AWS EMR.

Pipeline Definition File Samples

Sample 1:


{
  "objects": [
    {
      "myComment" : "Activity to run the hive script to export data to CSV",
      "output": {
        "ref": "DataNodeId_pnzAW"
      },
      "input": {
        "ref": "DataNodeId_cERqb"
      },
      "name": "TableBackupActivity",
      "hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{myS3ColMapping})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'\nLOCATION '#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}';\n                    \nINSERT OVERWRITE TABLE s3Table SELECT * FROM tempTable;",
      "runsOn": { "ref" : "EmrCluster1
" },
      "id": "TableBackupActivity",
      "type": "HiveActivity"
    },
    {
      "period": "1 days",
      "name": "Every 1 day",
      "id": "DefaultSchedule",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    },
    {
      "myComment" : "The DynamoDB table from which data is exported",
      "dataFormat": {
        "ref": "dynamoExportFormat"
      },
      "name": "DynamoDB",
      "id": "DataNodeId_cERqb",
      "type": "DynamoDBDataNode",
      "tableName": "#{dynamoTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "cron",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrCluster1",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "amiVersion": "3.3.2",
      "id": "EmrCluster1",
      "type": "EmrCluster",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment" : "The S3 path to which we export data to",
      "directoryPath": "#{targetS3Bucket}/#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}/",
      "dataFormat": {
        "ref": "DataFormatId_xqWRk"
      },
      "name": "S3DataNode",
      "id": "DataNodeId_pnzAW",
      "type": "S3DataNode"
    },
    {
      "myComment" : "Format for the S3 Path",
      "name": "DefaultDataFormat1",
      "column": "not_used STRING",
      "id": "DataFormatId_xqWRk",
      "type": "CSV"
    },
    {
      "myComment" : "Format for the DynamoDB table",
      "name": "dynamoExportFormat",
      "id": "dynamoExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "Output S3 folder",
      "id": "targetS3Bucket",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "dynamoTableName",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "dynamoTableColMapping",
      "type": "String"
    },
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColMapping",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "logUri",
      "type": "String"
    }
  ]
}

Sample 2:


{
  "objects": [
    {
      "myComment" : "Activity used to run hive script to import CSV data",
      "output": {
        "ref": "DataNodeId_pnzAW"
      },
      "input": {
        "ref": "DataNodeId_cERqb"
      },
      "name": "TableRestoreActivity",
      "hiveScript": "DROP TABLE IF EXISTS tempTable;\n\nDROP TABLE IF EXISTS s3Table;\n\nCREATE EXTERNAL TABLE tempTable (#{dynamoColDefn})\nSTORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' \nTBLPROPERTIES (\"dynamodb.table.name\" = \"#{dynamoTableName}\", \"dynamodb.column.mapping\" = \"#{dynamoTableColMapping}\");\n                    \nCREATE EXTERNAL TABLE s3Table (#{myS3ColMapping})\nROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\\n' LOCATION '#{sourceS3Bucket}';\n                    \nINSERT OVERWRITE TABLE tempTable SELECT * FROM s3Table;",
      "id": "TableRestoreActivity",
      "runsOn": { "ref" : "EmrClusterForRestore" },
      "stage": "false",
      "type": "HiveActivity"
    },
    {
      "myComment" : "The DynamoDB table from which we need to import data from",
      "dataFormat": {
        "ref": "DDBExportFormat"
      },
      "name": "DynamoDB",
      "id": "DataNodeId_cERqb",
      "type": "DynamoDBDataNode",
      "tableName": "#{dynamoTableName}"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "name": "EmrClusterForRestore",
      "coreInstanceType": "m1.medium",
      "coreInstanceCount": "1",
      "masterInstanceType": "m1.medium",
      "releaseLabel": "emr-4.4.0",
      "id": "EmrClusterForRestore",
      "type": "EmrCluster",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment" : "The S3 path from which data is imported",
      "directoryPath": "#{sourceS3Bucket}",
      "dataFormat": {
        "ref": "DataFormatId_xqWRk"
      },
      "name": "S3DataNode",
      "id": "DataNodeId_pnzAW",
      "type": "S3DataNode"
    },
    {
      "myComment" : "Format for the S3 Path",
      "name": "DefaultDataFormat1",
      "column": "not_used STRING",
      "id": "DataFormatId_xqWRk",
      "type": "CSV"
    },
    {
      "myComment" : "Format for the DynamoDB table",
      "name": "DDBExportFormat",
      "id": "DDBExportFormat",
      "column": "not_used STRING",
      "type": "DynamoDBExportDataFormat"
    }
  ],
  "parameters": [
    {
      "description": "Input S3 folder",
      "id": "sourceS3Bucket",
      "default": "s3://aws-datapipeline-csv/",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "description": "DynamoDB table name",
      "id": "dynamoTableName",
      "type": "String"
    },
    {
      "description": "S3 to DynamoDB Column Mapping",
      "id": "dynamoTableColMapping",
      "default" : "id:id,age:age,job:job,education:education ",
      "type": "String"
    },
    {
      "description": "S3 Column Mappings",
      "id": "myS3ColMapping",
      "default" : "id string,age int,job string,education string",
      "type": "String"
    },
    {
      "description": "DynamoDB Column Mappings",
      "id": "dynamoColDefn",
      "default" : "id string,age bigint,job string, education string ",
      "type": "String"
    },
    {
      "description": "DataPipeline Log Uri",
      "id": "logUri",
      "type": "AWS::S3::ObjectKey"
    }
  ]
}

Sample 3:


{
  "objects": [
    {
      "myComment": "Default configuration for objects in the pipeline.",

      "name": "Default",
      "id": "Default",
      "failureAndRerunMode": "CASCADE",
      "schedule": {
        "ref": "DefaultSchedule"
      },
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "#{logUri}",
      "scheduleType": "cron"
    },
    {
      "myComment": "Connection details for the Redshift cluster.",

      "name": "DefaultDatabase1",
      "id": "DatabaseId_Kw6D8",
      "connectionString": "#{myConnectionString}",
      "databaseName": "#{myRedshiftDatabase}",
      "*password": "#{myRedshiftPassword}",
      "type": "RedshiftDatabase",
      "username": "#{myRedshiftUsername}"
    },
    {
      "myComment": "This object is used to provide the resource where the copy job is invoked.",
      
      "name": "DefaultResource1",
      "id": "ResourceId_idL0Y",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hour"
    },
    {
      "myComment": "This object is used to specify the copy activity for moving data from DynamoDB to Redshift.",

      "name": "CopyFromDDBToRedshift",
      "id": "ActivityId_asVf6",
      "database": {
        "ref": "DatabaseId_Kw6D8"
      },      
      "runsOn": {
        "ref": "ResourceId_idL0Y"
      },
      "type": "SqlActivity",
      "script": "#{myScript}"
    },
    {
      "myComment": "This object is used to control the task schedule.", 

      "name": "RunOnce",
      "id": "DefaultSchedule",
      "occurrences": "1",
      "period": "2 Days",
      "type": "Schedule",
      "startAt": "FIRST_ACTIVATION_DATE_TIME"
    }   
  ],
  "parameters": []
}

Advanced Concepts of AWS Data Pipeline

Precondition – A precondition specifies a condition that must evaluate to true for an activity to be executed, for example, the presence of a source data table or an S3 bucket before performing operations on it.

AWS Data Pipeline provides certain prebuilt Precondition elements (a usage sketch follows the list below), such as:

  • DynamoDBDataExists
  • DynamoDBTableExists
  • S3KeyExists
  • S3PrefixNotEmpty
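
As a sketch, an S3KeyExists precondition could be attached to an activity roughly as follows; the S3 key and object ids are hypothetical:

{
  "myComment": "Hypothetical precondition checking that the input file exists",
  "id": "InputFilePresent",
  "name": "InputFilePresent",
  "type": "S3KeyExists",
  "s3Key": "s3://my-example-bucket/input/data.csv"
},
{
  "id": "MyCopyActivity",
  "name": "MyCopyActivity",
  "type": "CopyActivity",
  "precondition": { "ref": "InputFilePresent" },
  "input": { "ref": "MyS3InputNode" },
  "output": { "ref": "MySqlOutputNode" },
  "runsOn": { "ref": "MyEc2Resource" }
}

With this precondition attached, the activity is scheduled to run only once the specified S3 key exists.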

Action – Actions are event handlers that are executed in response to pipeline events.

Examples include:

  • SnsAlarm
  • Terminate

For example, if a specific pipeline component executes for longer than its maximum configured value, an SnsAlarm can be sent to the administrator. AWS Data Pipeline provides event-handler fields on pipeline components, such as onSuccess, onFail, and onLateAction, through which these actions can be used; a sketch follows below.
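
As a hedged sketch, an SnsAlarm action could be wired to an activity's onFail handler roughly as follows; the SNS topic ARN and object ids are hypothetical, and the sketch assumes the default Data Pipeline role is allowed to publish to the topic:

{
  "myComment": "Hypothetical alarm published when an activity fails",
  "id": "FailureAlarm",
  "name": "FailureAlarm",
  "type": "SnsAlarm",
  "topicArn": "arn:aws:sns:us-east-1:111122223333:pipeline-alerts",
  "subject": "Data Pipeline activity failed",
  "message": "There was a problem executing #{node.name} at #{node.@scheduledStartTime}.",
  "role": "DataPipelineDefaultRole"
},
{
  "id": "MyCopyActivity",
  "name": "MyCopyActivity",
  "type": "CopyActivity",
  "onFail": { "ref": "FailureAlarm" },
  "input": { "ref": "MyS3InputNode" },
  "output": { "ref": "MySqlOutputNode" },
  "runsOn": { "ref": "MyEc2Resource" }
}

The same pattern applies to the onSuccess and onLateAction handlers.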

Instances and Attempts – When a pipeline is executed, the pipeline components defined in the pipeline definition file are compiled into a set of executable instances. For fault tolerance, any instance that fails is retried multiple times, up to its configured limit. These retries are tracked using Attempt objects corresponding to each instance.

AWS Data Pipeline Command Line Interface


Create pipeline:
$ aws datapipeline create-pipeline --name mypipeline --unique-id mypipeline 

 {
     "pipelineId": "df-032468332JABJ6QUWCR6"
 }

Associate pipeline with pipeline definition file:
$ aws datapipeline put-pipeline-definition --pipeline-id df-032468332JABJ6QUWCR6 --pipeline-definition file://mypipeline.json --parameter-values myS3LogsPath="s3://<s3 bucket path>"

 {
     "validationErrors": [],
     "validationWarnings": [],
     "errored": false
 }

Activate pipeline:
$ aws datapipeline activate-pipeline --pipeline-id df-032468332JABJ6QUWCR6

Runtime monitoring of pipeline:
$ aws datapipeline list-runs --pipeline-id df-032468332JABJ6QUWCR6
#          Name                                                Scheduled Start      Status                 
#          ID                                                  Started              Ended              
#   ---------------------------------------------------------------------------------------------------
#      1.  EC2Resource_mypipeline                              2019-04-14T12:51:36  RUNNING                
#          @EC2Resource_mypipeline_2019-04-14T16:51:56         2019-04-14T12:51:39                     
#   
#      2.  ShellCommandActivity_mypipeline                     2019-04-14T12:51:36  WAITING_FOR_RUNNER     
#          @ShellCommandActivity_mypipeline_2019-04-14T16:51:  2019-04-14T12:51:39   

Diagnostics/Troubleshooting:

AWS CloudTrail captures all API calls for AWS Data Pipeline as events. These events can be streamed to a target S3 bucket by creating a trail from the AWS console. A CloudTrail event represents a single request from any source and includes information about the requested action, the date and time of the action, request parameters, and so on. This information is quite useful when diagnosing a problem on a configured data pipeline at runtime. The log entry format is as follows:


{
  "Records": [
    {
      "eventVersion": "1.0",
      "userIdentity": {
        "type": "Root",
        "principalId": "12120",
        "arn": "arn:aws:iam::user-account-id:root",
        "accountId": "user-account-id",
        "accessKeyId": "user-access-key"
      },
      "eventTime": "2019-04-13T19:15:15Z",
      "eventSource": "datapipeline.amazonaws.com",
      "eventName": "CreatePipeline",
      "awsRegion": "us-west-1",
      "sourceIPAddress": "72.21.196.64",
      "userAgent": "aws-cli/1.5.2 Python/2.7.5 Darwin/13.4.0",
      "requestParameters": {
        "name": "demopipeline",
        "uniqueId": "myunique"
      },
      "responseElements": {
        "pipelineId": "df-03265491AP32SAMPLE"
      },
      "requestID": "352ba1e2-6c21-11dd-8816-c3c09bc09c8a",
      "eventID": "9f99dce0-0732-49b6-baaa-e77943195632",
      "eventType": "AwsApiCall",
      "recipientAccountId": "user-account-id"
    }, 
      ...
  ]
}

Conclusion:

AWS Data Pipeline is a powerful service in the AWS cloud environment for creating data workflows that leverage the full range of storage and compute resources available in the ecosystem. Data Pipeline makes it feasible to design big data applications in which several terabytes of data from varied sources are analyzed systematically on the cloud. Any technologist working on data analytics in the cloud space should consider acquiring skills related to this service.

