Migrating to Amazon Elastic Map Reduce (EMR)

I recently migrated some of our data pipelines from our local Ambari manged cluster to Amazon Elastic Map Reduce to take advantage of the great cluster startup times, allowing scalable bootstrapping of clusters as necessary (and their subsequent termination).

The process was actually more difficult than I anticipated. The overview page describes lots of magical tools including hive and sqoop, but when it comes to implementing them, you're basically on your own... well not really, on your own with Amazon support (which is awesome). At times, the docs are non-existant.

Below, I've tried to document some of the magic I learned as I went through the migration process. Hope it helps you make your data pipelines faster and more resource efficent.

Starting a cluster

I wrapped my Amazon EMR app in Ruby and Rake, so I can easily configure the cluster options, but at the core is this command. This is what I use to launch the cluster, specifying the bootstrap actions and steps.

config/emr.rb:

module Config
  class EMR
    def initialize opts={}
      @root_path = File.expand_path(File.join(File.dirname(__FILE__), '..'))
      @shared_path = File.join(@root_path, 'config/aws')
      @bucket = opts[:bucket]||'example-etl-app'
      raise 'Specify an :app name in opts' if (@app = opts[:app]).nil?
    end

    def cfg file, opts={}
      if opts[:shared].nil?
        File.join(@root_path, "#{@app}-workflow", 'aws', file)
      else
        File.join(@shared_path, file)
      end
    end

    def bucket
      @bucket
    end

    def create_cluster
      bootstrap_actions =
        if File.exists?(bootstrap_file = cfg('bootstrap.json'))
          "--bootstrap-actions file://#{bootstrap_file}"
        else
          ""
        end
      # http://docs.aws.amazon.com/cli/latest/reference/emr/create-cluster.html
      cmd = <<-EOS
        aws emr --debug \
        create-cluster --ami-version=3.3.0 \
        --log-uri s3://#{bucket}/log \
        --enable-debugging \
        --ec2-attributes file://#{cfg('ec2_attributes_emr.json', shared: true)} \
        --applications Name=Hue Name=Hive Name=Pig \
        --use-default-roles \
        --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=m3.xlarge InstanceGroupType=CORE,InstanceCount=2,InstanceType=m1.large \
        --steps file://#{cfg('steps.json')} #{bootstrap_actions}
      EOS
      puts cmd
      system cmd
    end

    def self.run_cluster app
      emr = Config::EMR.new app: app
      emr.create_cluster
    end

  end
end

Documentation for the AWS CLI EMR is here: http://docs.aws.amazon.com/cli/latest/reference/emr/index.html

A workflow is a series of steps and can now be run with rake:

emr.rake:

require './config/emr.rb'
desc "run the cluster"
task :emr do |t, args|
  %w/ an_etl another_etl /.each do |app|
    Config::EMR.run_cluster app
  end
end

In my project folder, each pipeline is organized into folders named:

my-cool-app/
  first-pipeline-workflow/
    aws/
      steps.json
      bootstrap.json
    hive/
      transform-some-data.hive.sql
  another-pipeline-workflow/
    aws/
      steps.json
      bootstrap.json
    pig/
      load-some-data.pig
  ...

Shared script or organized by the tool name:

my-cool-app/
  hive/
    create-schema.hive.sql
  python/
    mysql-to-hive-schema-translator.py
  ...

Configuration

I like to put all the configuration tasks before any ETL tasks so we can fail the cluster as early as possible if there is a setup problem.

Currently, I place each setup step in steps.json for pipeline it is needed in. Since there is duplicated code, these should be moved into a common location and compiled by rake into the steps.json file. I'll probably move these configs to a yaml file that can be translated into json.

You'll notice I've set: "ActionOnFailure": "CANCEL_AND_WAIT", this is for development. In production, these should be switched to TERMINATE_CLUSTER to avoid any inactive long running clusters.

HCatalog

I use HCatalog in pig to load csvs into Hive

{
  "Name": "Configure HCatalog",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://support.elasticmapreduce/bootstrap-actions/hive/0.13.0/hcatalog_configurer.rb"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

Sqoop

Used to import/export data from my application layer db -- postgres.

{
  "Name": "Install Sqoop",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://support.elasticmapreduce/bootstrap-actions/sqoop/install-sqoop-v2"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

Then copy in any postgres connector you might need:

{
  "Name": "Install Sqoop Postgres",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://my-cool-app/current/aws/install_sqoop_postgres.sh"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

Where install_sqoop_postgres.sh:

#!/bin/bash
# the postgres jar is on the machine, just not in the right place, yet
cp /usr/lib/oozie/libtools/postgresql-9.0-801.jdbc4.jar /home/hadoop/sqoop/lib/

Running Hive

Start with the hive step runner:

  {
    "Name": "Hive Transform Companies",
    "Args": [
      "-f", "s3://my-cool-app/current/an-etl-workflow/hive/transform_companies.hive.sql"
    ],
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "Type": "HIVE"
  }

If you get an error about specifying base path ERROR missing required argument base-path, use this instead:

  {
    "Name": "Hive Setup Databases and UDFs",
    "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
    "Args": [
      "s3://elasticmapreduce/libs/hive/hive-script",
      "--base-path", "s3://elasticmapreduce/libs/hive/",
      "--run-hive-script", "--hive-versions", "0.13.1",
      "--args",
      "-f", "s3://my-cool-app/current/hive/setup.hive.sql"
    ],
    "ActionOnFailure": "CONTINUE",
    "Type": "CUSTOM_JAR"
  }

Where setup.hive.sql is the hive script you want to run.

Running Pig with -useHCatalog

Load some data

{
  "Name": "Pig Stage CSVs",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://support.elasticmapreduce/libs/pig/pig-script-run", "--run-pig-script",
    "--args", "-useHCatalog",
    "-f", "s3://my-cool-app/current/an-etl-workflow/pig/stage-csvs.pig"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

Whate stage-csvs.pig is something like:

-- runwith: pig -useHCatalog
--  use the -w flag to show detailed warnings

REGISTER 'file:/home/hadoop/lib/pig/piggybank.jar'
DEFINE CSVExcelStorage org.apache.pig.piggybank.storage.CSVExcelStorage(
  ',', 'YES_MULTILINE', 'NOCHANGE', 'SKIP_INPUT_HEADER'
);


-- get the csvs from this year and last year
daily_raw_csvs = LOAD 's3n://my-cool-app/data/csvs/*201[45]-[0-9][0-9]-[0-9][0-9].csv' using CSVExcelStorage() AS (rank:int, rank_variation:chararry, category:chararray, period:chararray);

-- remove any empty rows
filtered_daily_raw_csvs = FILTER daily_raw_csvs BY (rank IS NOT NULL) AND (period IS NOT NULL);

-- remove any duplicates
distinct_csvs = DISTINCT filtered_daily_raw_csvs;

-- extract the date fields and rank_variation, which is a percent in the input data
with_dates = FOREACH distinct_csvs GENERATE rank, ((double)REGEX_EXTRACT(rank_variation, '([-]{0,1}\\d+)%', 1)/100) AS rank_variation, category, ToDate(REGEX_EXTRACT(period, '.*(\\d{4}-\\d{2}-\\d{2})$', 1)) AS period;

-- store the transformed data into hive, in the `stage` schema
STORE with_dates INTO 'stage.csvs' USING org.apache.hive.hcatalog.pig.HCatStorer();

Transforming data with hive

Use the hive step runner listed above:

  {
    "Name": "Hive Transform Companies",
    "Args": [
      "-f", "s3://my-cool-app/current/an-etl-workflow/hive/transform_ranks.hive.sql"
    ],
    "ActionOnFailure": "CANCEL_AND_WAIT",
    "Type": "HIVE"
  }

Say we have rank data for multiple categories coming in on multiple rows (fully denormalized) and we want to normalize the category data.

transform_ranks.hive.sql would look something like this:

DROP TABLE IF EXISTS transformed.categories;
CREATE TABLE transformed.categories AS
  SELECT
         reflect("java.util.UUID", "randomUUID") AS id,
         C.category AS name,
         CAST(unix_timestamp() AS TIMESTAMP) AS created_at,
         CAST(unix_timestamp() AS TIMESTAMP) AS updated_at
    FROM stage.csvs C
;

Then when we transform ranks, we can reference the id we generated in the step above.

Running Sqoop

Use the script-runner.jar

{
  "Name": "Export to Sqoop",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://my-cool-app/current/an-etl-workflow/sqoop/export.sqoop.sh"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

Where export.sqoop.sh:

# amazon emr warehouse directory
WAREHOUSE_DIR=/user/hive/warehouse

# table we're exporting
TABLE=categories

/home/hadoop/sqoop/bin/sqoop export \
  --verbose \
  --hcatalog-home /home/hadoop/.versions/hive-0.13.1/hcatalog \
  --connect jdbc:postgresql://mydbserver.mydomain.com:5432/db_name \
  --username dbusername --password dbpassword \
  --export-dir ${WAREHOUSE_DIR}/transformed.db/${TABLE} \
  --table ${TABLE}_temp \
  --input-fields-terminated-by '\0001' --input-null-string '\\N' --input-null-non-string '\\N'

Handling data swapping in your production db

Observant readers will notice I import the transformed data into tables suffixed by _temp.

Since my webapp uses the postgres database in production, I'll need to cutover to the new data in a single transaction.

I run these postgres scripts from a script runner step:

pg_run.sh:

#!/bin/bash

# PARAMS:
#  the pgparams config file name for the target database (config/postgres/), without .conf
#  path from project home of sql file to run

set -o nounset  # exit if trying to use an uninitialized var
set -o errexit  # exit if any program fails
set -o pipefail # exit if any program in a pipeline fails, also
set -x          # debug mode

PGPARAMS=$1
PGPARAMS_STRING=$( hadoop fs -cat s3://my-cool-app/current/config/postgres/${PGPARAMS}.conf )
PG_SQL_PATH=$2
PGCMD=$( hadoop fs -cat ${PG_SQL_PATH} )
env $PGPARAMS_STRING psql -c "${PGCMD}"

steps:

{
  "Name": "Postgres Indexes",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://my-cool-app/current/postgres/pg_run.sh",
    "dbname",
    "s3://my-cool-app/current/an-etl-workflow/postgres/indexes.pg.sql"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
},
{
  "Name": "Postgres Transformations",
  "Jar": "s3://elasticmapreduce/libs/script-runner/script-runner.jar",
  "Args": [
    "s3://my-cool-app/current/postgres/pg_run.sh",
    "dbname",
    "s3://my-cool-app/current/an-etl-workflow/postgres/transform.pg.sql"
  ],
  "ActionOnFailure": "CANCEL_AND_WAIT",
  "Type": "CUSTOM_JAR"
}

indexes.pg.sql:

CREATE INDEX index_category_names ON categories (name)

transform.pg.sql:

BEGIN;
  -- ranks
  ALTER TABLE categories_temp ADD PRIMARY KEY (id);
   DROP TABLE categories;
  ALTER TABLE categories_temp RENAME TO categories;
COMMIT;

Hive Elasticsearch integration

run hive with elasticsearch, in debug mode:

hive -hiveconf hive.aux.jars.path=elasticsearch-hadoop-hive-2.1.1.jar --hiveconf hive.root.logger=DEBUG,console

then use this to create tables:

DROP TABLE elasticsearch_wigets;
CREATE EXTERNAL TABLE elasticsearch_wigets(
  `id` string,
  `name` string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES('es.nodes' = 'elasticsearch.awesome.com:9200',
              'es.resource' = 'wigets')
;

and set the data:

INSERT OVERWRITE TABLE elasticsearch_wigets
    SELECT id,
           name
      FROM wigets
;

Managing Underlying Tasks

use yarn application list (your results will show your ip, I've redacted ours):

[[email protected] var]$ yarn application -list
15/07/15 23:37:54 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:9022
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
                Application-Id      Application-Name      Application-Type        User       Queue               State         Final-State         Progress                        Tracking-URL
application_1436973673154_0030  calc_some_ranks_temp.jar             MAPREDUCE      hadoop     default             RUNNING           UNDEFINED           73.91% http://ip-0-0-0-0.ourdomain.com:36429

Troubleshooting

  • getting underlying jobs
yarn application -list
yarn application -kill <application-id>
  • logs are in /mnt/var

    • step logs are in /mnt/var/log/hadoop/steps

while a step is running you can strema the logs in realtime with these command on the master node:

tail -f /mnt/var/log/hadoop/steps/s-54K87AJP83RR/*
  • task logs are available via the hadoop job -logs command on the master node

for example, if you see something like this in your hadoop logs, where one task is starting another:

http://10.11.1.29:9026/taskdetails.jsp?jobid=job_1441139008487_0013&tipid=task_1441139008487_0013_r_000009

find the logs for this sub-task with:

hadoop job -logs job_1441139008487_0013

Security groups

8443 -- all external management connections from the AWS EMR service happen over this port

The EMR management server IP block is:

205.251.233.160/28
205.251.233.176/29
205.251.233.32/28
205.251.233.48/29
205.251.234.32/28
54.240.230.176/29
54.240.230.184/29
54.240.230.240/29

EMR 4.1 Changes

  • hadoop install is pushed via package installation infrastructure

  • bootstrap actions are no longer used, instead everything is a step

Let me know if I copy+pasted something wrong or if you have any questions. Leave a comment!