AWS CloudFormation Template for Creating an EMR Cluster with Autoscaling, CloudWatch Events and Lambda

There are several ways to spin up an EMR cluster.

Manual approach:

  1. AWS console
  2. AWS CLI

For example, a simple cluster can be created from the CLI with:

aws emr create-cluster --name "Test Spark cluster" \
  --release-label emr-5.7.0 --applications Name=Spark \
  --ec2-attributes KeyName=test1_Ec2_keypair \
  --instance-type m4.xlarge --instance-count 3 --use-default-roles

With advanced options:

aws emr create-cluster --release-label emr-5.7.0 --name EMR_Cluster20 \
  --service-role EMR_DefaultRole \
  --ec2-attributes KeyName=test1_keypair,InstanceProfile=EMR_EC2_DefaultRole \
  --auto-scaling-role EMR_AutoScaling_DefaultRole \
  --instance-groups 'Name=MyMasterIG,InstanceGroupType=MASTER,InstanceType=m3.xlarge,InstanceCount=1' \
  'Name=MyCoreIG,InstanceGroupType=CORE,InstanceType=m3.xlarge,InstanceCount=2,AutoScalingPolicy={Constraints={MinCapacity=2,MaxCapacity=10},Rules=[{Name=Default-scale-out,Description=Replicates the default scale-out rule in the console.,Action={SimpleScalingPolicyConfiguration={AdjustmentType=CHANGE_IN_CAPACITY,ScalingAdjustment=1,CoolDown=300}},Trigger={CloudWatchAlarmDefinition={ComparisonOperator=LESS_THAN,EvaluationPeriods=1,MetricName=YARNMemoryAvailablePercentage,Namespace=AWS/ElasticMapReduce,Period=300,Statistic=AVERAGE,Threshold=15,Unit=PERCENT,Dimensions=[{Key=JobFlowId,Value="${emr.clusterId}"}]}}}]}' \
  --region us-east-1
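
The shorthand syntax in the advanced command is hard to read and easy to mistype. As a rough sketch (not part of the original walkthrough), the same instance-group definition can be generated as JSON from Python and handed to the CLI through `--instance-groups file://instance-groups.json`. The helper name `scaling_rule` and the file name are assumptions made for illustration.

```python
import json


def scaling_rule(name, description, adjustment, operator, metric, threshold):
    """Return one EMR autoscaling rule as a plain dict (hypothetical helper)."""
    return {
        "Name": name,
        "Description": description,
        "Action": {
            "SimpleScalingPolicyConfiguration": {
                "AdjustmentType": "CHANGE_IN_CAPACITY",
                "ScalingAdjustment": adjustment,
                "CoolDown": 300,
            }
        },
        "Trigger": {
            "CloudWatchAlarmDefinition": {
                "ComparisonOperator": operator,
                "EvaluationPeriods": 1,
                "MetricName": metric,
                "Namespace": "AWS/ElasticMapReduce",
                "Period": 300,
                "Statistic": "AVERAGE",
                "Threshold": threshold,
                "Unit": "PERCENT",
                # EMR substitutes the real cluster ID for this placeholder.
                "Dimensions": [{"Key": "JobFlowId", "Value": "${emr.clusterId}"}],
            }
        },
    }


# Same master and core groups as in the advanced CLI command.
instance_groups = [
    {
        "Name": "MyMasterIG",
        "InstanceGroupType": "MASTER",
        "InstanceType": "m3.xlarge",
        "InstanceCount": 1,
    },
    {
        "Name": "MyCoreIG",
        "InstanceGroupType": "CORE",
        "InstanceType": "m3.xlarge",
        "InstanceCount": 2,
        "AutoScalingPolicy": {
            "Constraints": {"MinCapacity": 2, "MaxCapacity": 10},
            "Rules": [
                scaling_rule(
                    "Default-scale-out",
                    "Replicates the default scale-out rule in the console.",
                    1,
                    "LESS_THAN",
                    "YARNMemoryAvailablePercentage",
                    15,
                )
            ],
        },
    },
]

instance_groups_json = json.dumps(instance_groups, indent=2)
print(instance_groups_json)
```

Saving the printed output as instance-groups.json keeps the command line short, and the scaling rules become easier to review and to keep under version control.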

Programmatic approach:

  1. AWS SDK (Java, Python, etc.). I will cover this in a different document.
  2. AWS CloudFormation

Once we know all the options and configurations to use for an EMR cluster, it is much easier to create and manage the cluster and its associated resources with an AWS CloudFormation template. The biggest advantage is that a single template can create the IAM roles, security groups, EMR cluster, CloudWatch events, and Lambda function. When you want to shut down the cluster, deleting the CloudFormation stack also deletes every resource it created for the cluster (IAM roles, security groups, CloudWatch events, Lambda, etc.). You don't have to create and delete these resources by hand every time, which reduces human error.
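
The create-then-delete lifecycle described above could be scripted roughly as follows. This is a minimal sketch, assuming `cfn` is a CloudFormation client obtained with `boto3.client("cloudformation")`. The function names are hypothetical. `CAPABILITY_NAMED_IAM` is passed because the template in this post gives its IAM role an explicit `RoleName`.

```python
def to_cfn_parameters(values):
    """Convert {"Key": value} into the list-of-dicts shape create_stack expects."""
    return [
        {"ParameterKey": key, "ParameterValue": str(value)}
        for key, value in sorted(values.items())
    ]


def create_emr_stack(cfn, stack_name, template_body, parameters):
    """Create the stack and block until CloudFormation reports completion."""
    cfn.create_stack(
        StackName=stack_name,
        TemplateBody=template_body,
        Parameters=to_cfn_parameters(parameters),
        # Needed because the template creates an IAM role with a custom name.
        Capabilities=["CAPABILITY_NAMED_IAM"],
    )
    cfn.get_waiter("stack_create_complete").wait(StackName=stack_name)


def delete_emr_stack(cfn, stack_name):
    """Delete the stack, and with it every resource the template created."""
    cfn.delete_stack(StackName=stack_name)
    cfn.get_waiter("stack_delete_complete").wait(StackName=stack_name)


# Example usage (assumes AWS credentials, a region, and a local template file):
#   import boto3
#   cfn = boto3.client("cloudformation")
#   with open("emr_template.json") as f:
#       create_emr_stack(cfn, "emr-test", f.read(), {"CoreInstanceCount": 2})
#   ...
#   delete_emr_stack(cfn, "emr-test")
```

Passing the client in as an argument keeps the functions easy to exercise with a stub before pointing them at a real account.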

This template creates the following resources in sequence:

  1. Security group for the EMR cluster
  2. EMR cluster with autoscaling (enabled for both the core and task groups)
  3. Lambda function that submits a step to the EMR cluster whenever a step fails
  4. CloudWatch Event rule that monitors EMR steps (whenever a step fails, it triggers the Lambda function created in the previous step)
  5. A step submitted to the EMR cluster
####################################################################

CloudFormation Template (in JSON)

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Description": "CloudFormation template to spin up an EMR cluster with autoscaling",
  "Conditions": {
    "WithSpotPrice": {
      "Fn::Not": [
        { "Fn::Equals": [ { "Ref": "SpotPrice" }, "0" ] }
      ]
    }
  },
  "Parameters": {
    "ApplicationName": {
      "Description": "Name of the application",
      "Type": "String",
      "Default": "TestEMR_Cluster_usingCF"
    },
    "CoreInstanceCount": {
      "Description": "Number of core instances (no default, so it must be supplied when the stack is created)",
      "Type": "Number"
    },
    "EmrVersion": {
      "Description": "EMR release label",
      "Type": "String",
      "Default": "emr-5.7.0"
    },
    "MasterInstanceType": {
      "Description": "Instance type of Master Node",
      "Type": "String",
      "Default": "m4.xlarge"
    },
    "CoreInstanceType": {
      "Description": "Instance type of Core Node",
      "Type": "String",
      "Default": "m4.xlarge"
    },
    "TaskInstanceType": {
      "Description": "Instance type of Task Node",
      "Type": "String",
      "Default": "m3.xlarge"
    },
    "S3LogKey": {
      "Description": "S3 path where logs will be stored",
      "Type": "String",
      "Default": "s3://myEmr-s3-testbucket2/"
    },
    "SSHKey": {
      "Description": "Key pair name prefix; \"_keypair\" is appended to form the EC2 key pair name",
      "Type": "String",
      "Default": "NorthVirginia"
    },
    "VPC": {
      "Description": "VPC ID",
      "Type": "String",
      "Default": "vpc-6bdxx60c"
    },
    "VPCname": {
      "Description": "Name of VPC",
      "Type": "String",
      "Default": "DefaultVPC"
    },
    "SubnetID": {
      "Description": "Subnet ID",
      "Type": "AWS::EC2::Subnet::Id",
      "Default": "subnet-e75xx5ca"
    },
    "SpotPrice": {
      "Default": "0.1",
      "Description": "Spot price (or use 0 for 'on demand' instance)",
      "Type": "Number"
    }
  },
  "Resources": {
    "EMRClusterAdditionalSG": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Allow ssh and http connections",
        "VpcId": { "Ref": "VPC" },
        "SecurityGroupIngress": [
          { "IpProtocol": "tcp", "FromPort": "22", "ToPort": "22", "CidrIp": "54.240.193.1/32" },
          { "IpProtocol": "tcp", "FromPort": "80", "ToPort": "80", "CidrIp": "54.240.193.1/32" },
          { "IpProtocol": "tcp", "FromPort": "443", "ToPort": "443", "CidrIp": "54.240.193.1/32" },
          {
            "IpProtocol": "tcp",
            "FromPort": "22",
            "ToPort": "22",
            "SourceSecurityGroupId": { "Ref": "BastionHostSecurityGroup" }
          }
        ],
        "Tags": [
          { "Key": "Name", "Value": "EMRClusterSG" },
          { "Key": "Application", "Value": { "Ref": "ApplicationName" } },
          { "Key": "AppID", "Value": { "Ref": "ApplicationName" } }
        ]
      }
    },
    "BastionHostSecurityGroup": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Allow ssh and http connections to Jump host",
        "VpcId": { "Ref": "VPC" },
        "SecurityGroupIngress": [
          { "IpProtocol": "tcp", "FromPort": "22", "ToPort": "22", "CidrIp": "54.240.193.1/32" }
        ],
        "Tags": [
          { "Key": "Name", "Value": "EMRClusterSG_BastionHost" },
          { "Key": "Application", "Value": { "Ref": "ApplicationName" } },
          { "Key": "AppID", "Value": { "Ref": "ApplicationName" } }
        ]
      }
    },
    "MyEMRCluster": {
      "DependsOn": [
        "EMRClusterAdditionalSG",
        "BastionHostSecurityGroup"
      ],
      "Type": "AWS::EMR::Cluster",
      "Properties": {
        "Applications": [
          { "Name": "Hadoop" },
          { "Name": "Hive" },
          { "Name": "Pig" },
          { "Name": "Spark" },
          { "Name": "Zeppelin" },
          { "Name": "Hue" }
        ],
        "Configurations": [
          {
            "Classification": "core-site",
            "ConfigurationProperties": {
              "hadoop.rpc.protection": "privacy"
            }
          }
        ],
        "Instances": {
          "MasterInstanceGroup": {
            "InstanceCount": 1,
            "InstanceType": { "Ref": "MasterInstanceType" },
            "Market": "ON_DEMAND",
            "Name": "Master"
          },
          "CoreInstanceGroup": {
            "InstanceCount": { "Ref": "CoreInstanceCount" },
            "InstanceType": { "Ref": "CoreInstanceType" },
            "AutoScalingPolicy": {
              "Constraints": {
                "MinCapacity": 1,
                "MaxCapacity": 10
              },
              "Rules": [
                {
                  "Name": "CoreNode-scale-out",
                  "Description": "CoreNode scale-out based on HDFSUtilization",
                  "Action": {
                    "SimpleScalingPolicyConfiguration": {
                      "AdjustmentType": "CHANGE_IN_CAPACITY",
                      "ScalingAdjustment": 2,
                      "CoolDown": 300
                    }
                  },
                  "Trigger": {
                    "CloudWatchAlarmDefinition": {
                      "ComparisonOperator": "GREATER_THAN",
                      "EvaluationPeriods": 1,
                      "MetricName": "HDFSUtilization",
                      "Namespace": "AWS/ElasticMapReduce",
                      "Period": 300,
                      "Threshold": 70,
                      "Statistic": "AVERAGE",
                      "Unit": "PERCENT",
                      "Dimensions": [
                        { "Key": "JobFlowId", "Value": "${emr.clusterId}" }
                      ]
                    }
                  }
                },
                {
                  "Name": "CoreNode-scale-in",
                  "Description": "CoreNode scale-in based on HDFSUtilization",
                  "Action": {
                    "SimpleScalingPolicyConfiguration": {
                      "AdjustmentType": "CHANGE_IN_CAPACITY",
                      "ScalingAdjustment": -1,
                      "CoolDown": 300
                    }
                  },
                  "Trigger": {
                    "CloudWatchAlarmDefinition": {
                      "ComparisonOperator": "LESS_THAN",
                      "EvaluationPeriods": 1,
                      "MetricName": "HDFSUtilization",
                      "Namespace": "AWS/ElasticMapReduce",
                      "Period": 300,
                      "Threshold": 50,
                      "Statistic": "AVERAGE",
                      "Unit": "PERCENT",
                      "Dimensions": [
                        { "Key": "JobFlowId", "Value": "${emr.clusterId}" }
                      ]
                    }
                  }
                }
              ]
            },
            "EbsConfiguration": {
              "EbsBlockDeviceConfigs": [
                {
                  "VolumeSpecification": {
                    "SizeInGB": "50",
                    "VolumeType": "gp2"
                  },
                  "VolumesPerInstance": "1"
                }
              ],
              "EbsOptimized": "true"
            },
            "Market": "ON_DEMAND",
            "Name": "Core"
          },
          "Ec2KeyName": {
            "Fn::Join": [ "_", [ { "Ref": "SSHKey" }, "keypair" ] ]
          },
          "AdditionalMasterSecurityGroups": [
            { "Ref": "BastionHostSecurityGroup" },
            { "Ref": "EMRClusterAdditionalSG" }
          ],
          "Ec2SubnetId": { "Ref": "SubnetID" },
          "TerminationProtected": false
        },
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ServiceRole": "EMR_DefaultRole",
        "AutoScalingRole": "EMR_AutoScaling_DefaultRole",
        "LogUri": { "Ref": "S3LogKey" },
        "Name": { "Ref": "ApplicationName" },
        "ReleaseLabel": { "Ref": "EmrVersion" },
        "ScaleDownBehavior": "TERMINATE_AT_INSTANCE_HOUR",
        "Tags": [
          { "Key": "AppID", "Value": { "Ref": "ApplicationName" } },
          { "Key": "Name", "Value": "Test-EmrCluster-using_CloudFormation" }
        ],
        "VisibleToAllUsers": true,
        "BootstrapActions": [
          {
            "Name": "Dummy bootstrap action",
            "ScriptBootstrapAction": {
              "Args": [],
              "Path": "s3://myS3bucket-testbucket2/emrjars/test_ba.sh"
            }
          }
        ]
      }
    },
    "TestTaskInstanceGroupConfig": {
      "Type": "AWS::EMR::InstanceGroupConfig",
      "Properties": {
        "InstanceCount": 2,
        "InstanceType": { "Ref": "TaskInstanceType" },
        "InstanceRole": "TASK",
        "Market": {
          "Fn::If": [ "WithSpotPrice", "SPOT", "ON_DEMAND" ]
        },
        "Name": "TaskGroup1",
        "AutoScalingPolicy": {
          "Constraints": {
            "MaxCapacity": 10,
            "MinCapacity": 1
          },
          "Rules": [
            {
              "Name": "TaskNode-scale-out",
              "Description": "TaskNode scale-out policy",
              "Action": {
                "SimpleScalingPolicyConfiguration": {
                  "AdjustmentType": "CHANGE_IN_CAPACITY",
                  "CoolDown": 300,
                  "ScalingAdjustment": 1
                }
              },
              "Trigger": {
                "CloudWatchAlarmDefinition": {
                  "Dimensions": [
                    { "Key": "JobFlowId", "Value": "${emr.clusterId}" }
                  ],
                  "EvaluationPeriods": 1,
                  "MetricName": "YARNMemoryAvailablePercentage",
                  "Namespace": "AWS/ElasticMapReduce",
                  "Period": 300,
                  "ComparisonOperator": "LESS_THAN",
                  "Statistic": "AVERAGE",
                  "Threshold": 20,
                  "Unit": "PERCENT"
                }
              }
            },
            {
              "Name": "TaskNode-scale-in",
              "Description": "TaskNode scale-in policy",
              "Action": {
                "SimpleScalingPolicyConfiguration": {
                  "AdjustmentType": "CHANGE_IN_CAPACITY",
                  "ScalingAdjustment": -1,
                  "CoolDown": 30
                }
              },
              "Trigger": {
                "CloudWatchAlarmDefinition": {
                  "Dimensions": [
                    { "Key": "JobFlowId", "Value": "${emr.clusterId}" }
                  ],
                  "EvaluationPeriods": 1,
                  "MetricName": "YARNMemoryAvailablePercentage",
                  "Namespace": "AWS/ElasticMapReduce",
                  "Period": 300,
                  "ComparisonOperator": "GREATER_THAN",
                  "Statistic": "AVERAGE",
                  "Threshold": 75,
                  "Unit": "PERCENT"
                }
              }
            }
          ]
        },
        "BidPrice": {
          "Fn::If": [
            "WithSpotPrice",
            { "Ref": "SpotPrice" },
            { "Ref": "AWS::NoValue" }
          ]
        },
        "JobFlowId": { "Ref": "MyEMRCluster" }
      }
    },
    "LambdaExecutionRoleforEMR": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "RoleName": "LambdaRoleforEMRtest1",
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Principal": {
                "Service": [ "lambda.amazonaws.com" ]
              },
              "Action": [ "sts:AssumeRole" ]
            }
          ]
        },
        "Path": "/"
      }
    },
    "LambdaExecutionRoleforEMRPolicy": {
      "DependsOn": [
        "LambdaExecutionRoleforEMR"
      ],
      "Type": "AWS::IAM::Policy",
      "Properties": {
        "PolicyName": "MyLambdaforEMRRolePolicytest1",
        "Roles": [
          { "Ref": "LambdaExecutionRoleforEMR" }
        ],
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:PutLogEvents"
              ],
              "Resource": "arn:aws:logs:*:*:*"
            },
            {
              "Effect": "Allow",
              "Action": [ "elasticmapreduce:*" ],
              "Resource": "*"
            }
          ]
        }
      }
    },
    "EmrCloudWatchEventLambda": {
      "Type": "AWS::Lambda::Function",
      "DependsOn": [
        "LambdaExecutionRoleforEMR",
        "LambdaExecutionRoleforEMRPolicy"
      ],
      "Properties": {
        "Code": {
          "S3Bucket": "myS3bucket-testbucket2",
          "S3Key": "lambda_testemrcode.zip"
        },
        "Role": {
          "Fn::GetAtt": [ "LambdaExecutionRoleforEMR", "Arn" ]
        },
        "Timeout": 60,
        "Handler": "lambda_testemrcode.lambda_handler",
        "Runtime": "python3.6",
        "MemorySize": 128
      }
    },
    "EventRuleEMRtest": {
      "DependsOn": [
        "MyEMRCluster"
      ],
      "Type": "AWS::Events::Rule",
      "Properties": {
        "Description": "CloudWatch Event Rule for EMR to trigger Lambda on a step failure",
        "EventPattern": {
          "source": [ "aws.emr" ],
          "detail-type": [ "EMR Step Status Change" ],
          "detail": {
            "state": [ "FAILED" ],
            "clusterId": [
              { "Ref": "MyEMRCluster" }
            ]
          }
        },
        "State": "ENABLED",
        "Targets": [
          {
            "Arn": {
              "Fn::GetAtt": [ "EmrCloudWatchEventLambda", "Arn" ]
            },
            "Id": "TargetFunctionV1"
          }
        ]
      }
    },
    "PermissionForEventsToInvokeLambda": {
      "Type": "AWS::Lambda::Permission",
      "Properties": {
        "FunctionName": { "Ref": "EmrCloudWatchEventLambda" },
        "Action": "lambda:InvokeFunction",
        "Principal": "events.amazonaws.com",
        "SourceArn": {
          "Fn::GetAtt": [ "EventRuleEMRtest", "Arn" ]
        }
      }
    },
    "FirstStep": {
      "DependsOn": [
        "MyEMRCluster",
        "EventRuleEMRtest"
      ],
      "Properties": {
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
          "Args": [
            "spark-submit",
            "--executor-memory",
            "1g",
            "--class",
            "org.apache.spark.examples.SparkPi",
            "/usr/lib/spark/examples/jars/spark-examples.jar",
            "100"
          ],
          "Jar": "command-runner.jar"
        },
        "JobFlowId": { "Ref": "MyEMRCluster" },
        "Name": "FirstTestStep"
      },
      "Type": "AWS::EMR::Step"
    }
  }
}
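
Before uploading a template of this size, it can help to check locally that every `Ref`, `Fn::GetAtt` and `DependsOn` points at a parameter or resource that actually exists. The following is a minimal sketch of such a check, not an official validator. The function names are hypothetical, and only the JSON list form of `Fn::GetAtt` is handled. Note that `json.loads` rejects typographic quotes, so a template pasted from a web page fails fast at the parsing stage.

```python
import json


def find_references(node):
    """Yield every logical name targeted by Ref, Fn::GetAtt or DependsOn."""
    if isinstance(node, dict):
        for key, value in node.items():
            if key == "Ref" and isinstance(value, str):
                yield value
            elif key == "Fn::GetAtt" and isinstance(value, list) and value:
                yield value[0]
            elif key == "DependsOn":
                yield from ([value] if isinstance(value, str) else value)
            else:
                yield from find_references(value)
    elif isinstance(node, list):
        for item in node:
            yield from find_references(item)


def undefined_references(template):
    """Return referenced names that are neither parameters nor resources."""
    defined = set(template.get("Parameters", {})) | set(template.get("Resources", {}))
    referenced = set(find_references(template))
    # Pseudo parameters such as AWS::NoValue are provided by CloudFormation.
    return sorted(
        name for name in referenced
        if name not in defined and not name.startswith("AWS::")
    )


# Tiny illustrative template with a deliberate typo ("BastionSG" is not defined).
sample = json.loads("""
{
  "Parameters": {"VPC": {"Type": "String"}},
  "Resources": {
    "ClusterSG": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "VpcId": {"Ref": "VPC"},
        "SecurityGroupIngress": [{"SourceSecurityGroupId": {"Ref": "BastionSG"}}]
      }
    },
    "Cluster": {
      "Type": "AWS::EMR::Cluster",
      "DependsOn": ["ClusterSG"],
      "Properties": {"Tags": [{"Key": "Stack", "Value": {"Ref": "AWS::StackName"}}]}
    }
  }
}
""")

print(undefined_references(sample))
```

To run the same check against the full template, load it with `json.load(open(path))` and pass the result to `undefined_references`; an empty list means every reference resolves.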

Lambda Code:

lambda_testemrcode.py

import json
import boto3

print('Loading function')


def lambda_handler(event, context):
    # The CloudWatch event carries the ID of the cluster whose step failed
    print("Received event: " + json.dumps(event, indent=2))
    cluster_id = event['detail']['clusterId']
    print("Cluster ID is " + cluster_id)
    # raise Exception('Something went wrong')

    # Command line of the step to add
    step_args = (
        "spark-submit --deploy-mode cluster --master yarn "
        "--executor-memory 768M --num-executors 4 "
        "--class org.apache.spark.examples.SparkPi "
        "/usr/lib/spark/examples/jars/spark-examples.jar 100000"
    )
    step_args_list = step_args.split()

    taskrunner_emr_step = [
        {
            'Name': 'Test SparkJob',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': step_args_list
            }
        }
    ]

    client = boto3.client('emr')
    print('Submitting step to cluster ' + cluster_id)
    response = client.add_job_flow_steps(JobFlowId=cluster_id, Steps=taskrunner_emr_step)
    print("Add Step Response: " + json.dumps(response, indent=2))
    # print('event Details')
    return event
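
One thing to watch with this handler: the rule fires on every failed step, including the step the Lambda itself submits, so a failing retry would trigger another retry indefinitely. A possible guard is sketched below. It assumes the event detail carries the step name under `name` (as in AWS's published sample event for "EMR Step Status Change"; verify against a real event), and `should_resubmit` is a hypothetical helper, not part of the original code.

```python
# Name the Lambda above gives to the step it submits.
RETRY_STEP_NAME = "Test SparkJob"


def should_resubmit(event):
    """Return True only for failed steps that were not submitted by this Lambda."""
    detail = event.get("detail", {})
    if detail.get("state") != "FAILED":
        return False
    # Assumption: detail["name"] holds the name of the step that changed state.
    return detail.get("name") != RETRY_STEP_NAME


# Illustrative events (IDs are made up).
first_failure = {
    "detail": {"clusterId": "j-EXAMPLE", "state": "FAILED", "name": "FirstTestStep"}
}
retry_failure = {
    "detail": {"clusterId": "j-EXAMPLE", "state": "FAILED", "name": "Test SparkJob"}
}

print(should_resubmit(first_failure))
print(should_resubmit(retry_failure))
```

Calling `should_resubmit(event)` at the top of `lambda_handler` and returning early when it is false would limit the function to a single retry per original failure.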

