Importing DynamoDB Data Using Apache Hive on Amazon EMR

栏目: IT技术 · 发布时间: 4年前

内容简介:This article describes one of the many ways to import a csv data file into AWS DynamoDB database. The option explained here leverages Amazon EMR and Hive. Using Amazon EMR and Hive you can quickly and efficiently process large amounts of data, such as data
Importing DynamoDB Data Using Apache Hive on Amazon EMR

This article describes one of the many ways to import a csv data file into AWS DynamoDB database. The option explained here leverages Amazon EMR and Hive. Using Amazon EMR and Hive you can quickly and efficiently process large amounts of data, such as data stored in S3 and import data into DynamoDB.

What this example accomplishes?

  1. Every day an external datasource sends a csv file with about 1000 records to S3 bucket.
  2. A lambda function that will get triggered when an csv object is placed into an S3 bucket.
  3. Lambda function will start a EMR job with steps includes:
  • Create a Hive table that references data stored in DynamoDB.
  • Creating a hive table that references a location in Amazon S3.
  • Load data form S3 table to DynamoDB table.

The following diagram shows the architecture of the process.

Importing DynamoDB Data Using Apache Hive on Amazon EMR

Prerequisites

  • Basic understanding of CloudFormation.
  • Basic understanding of EMR.
  • Setup an AWS account.
  • Install  Serverless Framework .

Now, Let’s start

Before getting started, Install the Serverless Framework. Open up a terminal and type npm install -g serverless.

There is a yml file (serverless.yml) in the project directory. Let’s start to define a set of objects in template file as below:

S3 bucket

There are 2 S3 buckets,  LogBucket is for EMR logs,  S3BucketCsvimport is to store csv files.

Resources:
  LogBucket:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
  S3BucketCsvimport:
    Type: AWS::S3::Bucket
    Properties:
      AccessControl: Private
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: ${self:custom.csvImportBucketName}

DynamoDB table

A DynamoDB table to load csv data from S3.

Resources:
  ContactsTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: ${self:custom.contactsTable}
      SSESpecification:
        SSEEnabled: true
      AttributeDefinitions:
        - AttributeName: id
          AttributeType: S
      KeySchema:
        - AttributeName: id
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: ${self:custom.tableThroughputs.${self:provider.stage}}
        WriteCapacityUnits: ${self:custom.tableThroughputs.${self:provider.stage}}
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES

Lambda function configuration

Adding lambda function configuration to serverless.yml. it will be triggered by S3 new objected created event, the lambda function will then start a EMR job flow to process data importing.

startEMRJob:
  handler: src/handler.startEMRJob
  environment:
   CONTACTS_TABLE: ${self:custom.contactsTable}
   SUBNET_ID: ${self:custom.vpc.subsetId}
   EMR_LOGS_BUCKET:
     Ref: LogBucket
   CSV_IMPORT_BUCKET: ${self:custom.csvImportBucketName}
  events:
   - s3:
       bucket: ${self:custom.csvImportBucketName}
       event: s3:ObjectCreated:*
       rules:
         - prefix: uploads/
         - suffix: .csv
       existing: true

IAM role

We also need to create IAM role for the lambda function, so our lambda function has permission to start EMR job flow.

provider:
  iamRoleStatements:
    - Effect: "Allow"
      Action:
        - "iam:PassRole"
      Resource:
        - arn:aws:iam::#{AWS::AccountId}:role/EMR_DefaultRole
        - arn:aws:iam::#{AWS::AccountId}:role/EMR_EC2_DefaultRole
    - Effect: "Allow"
      Action:
        - "elasticmapreduce:RunJobFlow"
      Resource: "*"
    - Effect: "Allow"
      Action:
        - "s3:PutObject"
      Resource:
        - "Fn::Join":
            - ""
            - - "arn:aws:s3:::"
              - ${self:custom.csvImportBucketName}
              - "/*"
    - Effect: "Allow"
      Action:
        - "dynamodb:*"
      Resource:
        - "Fn::GetAtt": [ContactsTable, Arn]
        - "Fn::Join":
            - "/"
            - - { "Fn::GetAtt": [ContactsTable, Arn] }
              - "index/*"

Adding lambda function

Let’s add a lambda function to create an AWS EMR cluster and adding the step details such as the location of the hive scripts, arguments etc. We can use the boto3 lib for EMR, in order to create a cluster and submit the job dynamically from lambda function.

import boto3
import logging
import os
from datetime import datetime
from pathlib import Path

emr = boto3.client('emr')
s3 = boto3.resource('s3')
dynamodb = boto3.resource('dynamodb')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def startEMRJob(event, context):
    try:
        put_dump_record_to_db()
        put_step_scripts_to_s3()
        cluster_id = emr.run_job_flow(
            Name='test_emr_job',
            LogUri="s3://{}".format(os.environ['EMR_LOGS_BUCKET']),
            ReleaseLabel='emr-5.18.0',
            Applications=[
                {
                    'Name': 'Hadoop'
                },
                {
                    'Name': 'Livy'
                },
                {
                    'Name': 'Pig'
                },
                {
                    'Name': 'Hue'
                },
                {
                    'Name': 'Hue'
                },
                {
                    'Name': 'Hive'
                },
            ],
            Instances={
                'InstanceGroups': [
                    {
                        'Name': "Master nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'MASTER',
                        'InstanceType': 'm1.medium',
                        'InstanceCount': 1,
                    },
                    {
                        'Name': "Slave nodes",
                        'Market': 'ON_DEMAND',
                        'InstanceRole': 'CORE',
                        'InstanceType': 'm1.medium',
                        'InstanceCount': 2,
                    }
                ],
                'KeepJobFlowAliveWhenNoSteps': False,
                'TerminationProtected': False,
                'Ec2SubnetId': os.environ['SUBNET_ID'],
            },
            Configurations=[
                {
                    'Classification': 'hive-site',
                    'Properties': {
                        'hive.execution.engine': 'mr'
                    }
                },
            ],
            Steps=[
                {
                    'Name': 'creating dynamodb table',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                            'Jar': 'command-runner.jar',
                            'Args': ['hive-script',
                                     '--run-hive-script',
                                     '--args',
                                     '-f',
                                     's3://{}/scripts/step1.q'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'DYNAMODBTABLE={}'.format(
                                         os.environ["CONTACTS_TABLE"])]
                    }
                },
                {
                    'Name': 'creating csv table',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                            'Jar': 'command-runner.jar',
                            'Args': ['hive-script',
                                     '--run-hive-script',
                                     '--args',
                                     '-f',
                                     's3://{}/scripts/step2.q'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'INPUT=s3://{}'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'TODAY={}'.format(
                                         datetime.today().strftime('%Y-%m-%d'))]
                    }
                },
                {
                    'Name': 'adding partition',
                    'ActionOnFailure': 'CONTINUE',
                    'HadoopJarStep': {
                            'Jar': 'command-runner.jar',
                            'Args': ['hive-script',
                                     '--run-hive-script',
                                     '--args',
                                     '-f',
                                     's3://{}/scripts/step3.q'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'INPUT=s3://{}'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'TODAY={}'.format(
                                         datetime.today().strftime('%Y-%m-%d'))]
                    }
                },
                {
                    'Name': 'import date to dynamodb',
                    'ActionOnFailure': 'TERMINATE_CLUSTER',
                    'HadoopJarStep': {
                            'Jar': 'command-runner.jar',
                            'Args': ['hive-script',
                                     '--run-hive-script',
                                     '--args',
                                     '-f',
                                     's3://{}/scripts/step4.q'.format(
                                         os.environ['CSV_IMPORT_BUCKET']),
                                     '-d',
                                     'TODAY={}'.format(
                                         datetime.today().strftime('%Y-%m-%d'))]
                    }
                }
            ],
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
        )
        logger.info('cluster {} created with the step...'.format(
            cluster_id['JobFlowId']))

    except Exception as e:
        logger.error(e)
        raise


def put_dump_record_to_db():
    table = dynamodb.Table(os.environ["CONTACTS_TABLE"])
    if table.item_count == 0:
        table.put_item(
            Item={'id': 'NA',
                  'full_name': 'demo user',
                  'gender': 'M',
                  'address': 'NA',
                  'language': ["English"]})


def put_step_scripts_to_s3():
    root_path = Path(__file__).parent.parent
    scripts = ["scripts/step1.q",
               "scripts/step2.q",
               "scripts/step3.q",
               "scripts/step4.q"]
    for script in scripts:
        s3.Bucket(os.environ['CSV_IMPORT_BUCKET']).upload_file(
            '{}/{}'.format(root_path, script), script)

Hive scripts

And finally…let’s add Hive script.

Step 1. Creating mapping between Hive and S3

We will create an external Hive table that maps to the csv data file.

-- Create external table for dynamodb table.
CREATE EXTERNAL TABLE IF NOT EXISTS
  dynamodb_contacts (
    id string,full_name string, email string,
    gender string,address string,language array<string>)
  STORED BY
    'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler'
  TBLPROPERTIES
    ("dynamodb.table.name" = "${DYNAMODBTABLE}",
     "dynamodb.column.mapping" = "id:id,full_name:full_name,email:email,gender:gender,address:address,language:language");

Step 2. Creating mapping between Hive and DynamoDB

Establish a mapping between Hive and the Features table in DynamoDB.

-- Create table using csv in s3
CREATE EXTERNAL TABLE IF NOT EXISTS
  csv_contacts (id string,first_name string,
                last_name string, email string, gender string,
                address string, language array<string> )
  PARTITIONED BY (created_date string)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  COLLECTION ITEMS TERMINATED BY ','
  LOCATION '${INPUT}/uploads/'
  TBLPROPERTIES (
    'serialization.null.format' = '',
    'skip.header.line.count' = '1');

Step3. Load the table with data in S3

Add current date as partition and load table with csv data in S3.

-- add current date as partition
ALTER TABLE csv_contacts
  add IF NOT EXISTS PARTITION (created_date='${TODAY}')
  LOCATION '${INPUT}/uploads/created_date=${TODAY}/';

Step4.Import a table from Amazon S3 to DynamoDB

Using following Hive script to write data from Amazon S3 to DynamoDB.

-- Import csv data to DynamoDB table
INSERT OVERWRITE TABLE dynamodb_contacts
  SELECT DISTINCT id, CONCAT_WS(' ',first_name,last_name),email,
                  gender, address, language FROM csv_contacts
  WHERE
    created_date >= '${TODAY}'

Notethat Amazon EMR operations on a DynamoDB table count as read/write operations, and are subject to the table’s provisioned throughput settings. for more details please visit  EMR document .

Now let’s deploy the service and test it out!

$sls deploy --stage dev
After script deployed, copy a csv file to S3 bucket with
 created_date={CURRENT_DATE}
prefix, eg.
$aws s3 cp csv/contacts.csv s3://myemr.csv.import.dev/uploads/created_date=2020-02-03/contacts.csv

Then we can go to AWS EMR console and check the progress of the EMR steps.

Importing DynamoDB Data Using Apache Hive on Amazon EMR

It will take several minutes to complete all the steps and cluster will terminate automatically after running above steps.

Next We can go to AWS DynamoDB console to verify that the data has been loaded into DynamoDB:

Importing DynamoDB Data Using Apache Hive on Amazon EMR

Eventually Importing process took about 6 mins to load 1000 records total 76kb into DynamoDB table with write capacity units 10 and no auto scaling enabled.

That’s about it, Thanks for reading!

I hope you have found this article useful, You can find the complete project in my  GitHub repo .


以上就是本文的全部内容,希望本文的内容对大家的学习或者工作能带来一定的帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

数字乌托邦

数字乌托邦

[美]弗雷德·特纳 / 张行舟、王芳、叶富华、余倩 / 译言·东西文库/电子工业出版社 / 2013-5-1 / 49.80元

20世纪60年代早期,在美国大众眼中,计算机只是冷战中冰冷的机器,然而到了90年代互联网到来之时,计算机却呈现出一个截然不同的世界——它们模拟出了一个数字乌托邦般的协同体,而这正是曾经最反对冷战的嬉皮士们的共同愿景。 本书正是探索这次非同寻常,且颇具讽刺意味的变革的第一本书。作者挖掘出那些在旧金山湾区的先驱者——斯图尔特·布兰德和他的“全球网络”鲜为人知的故事。1968年到1998年期间,通......一起来看看 《数字乌托邦》 这本书的介绍吧!

RGB转16进制工具
RGB转16进制工具

RGB HEX 互转工具

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具