Load Hadoop data via Amazon EMR

This document describes the general steps to set up Hadoop batch indexing for Imply Hybrid (formerly Imply Cloud) with Amazon Elastic MapReduce (EMR).

The processes outlined in this topic use Amazon EMR 6.10.0 and Hadoop 3.3.3. For the most recent instructions for Amazon EMR, see the Amazon EMR Management Guide.

Set up a new EMR cluster

If you do not already have an EMR cluster:

  1. In the Amazon EMR dashboard, click Create cluster. To make configuration simpler, create the cluster in the Imply Hybrid VPC. Alternatively, you can create the cluster in a different VPC and then set up a VPC peering connection between the two VPCs. See Set up an existing EMR cluster.

  2. Click Go to advanced options.

  3. On Step 1: Software and Steps choose the following:

    • Release version: emr-6.10.0

    • Hadoop version: Hadoop 3.3.3

      Accept the defaults for the remaining settings.

  4. On Step 2: Hardware set the following:

    • Network configuration: Specify the Imply Hybrid VPC

    • Select appropriate instance types for the master, core, and task nodes.

      You can't change the master and core instance types after you create the cluster, but you can add task groups with different instance types.

  5. On Step 3: General Cluster Settings accept the defaults.

  6. On Step 4: Security, setting an EC2 key pair is recommended but not required. Note the security group IDs for the master and slave security groups, which you will need to modify later.

  7. Click Create cluster.

  8. When the cluster becomes available, click it in the cluster list.

  9. In the Hardware section, click the ID of the "Master Instance Group" and note the value for Private DNS Name.

  10. Edit the security groups for the EMR master and EMR slave nodes. The defaults are ElasticMapReduce-master and ElasticMapReduce-slave. Add a rule to each to allow All Traffic from the Imply Hybrid Unmanaged security group. To identify the Imply Hybrid unmanaged security group ID, go to the VPC Dashboard, click on Security Groups, and note the Group ID for the security group with a description of "Imply Hybrid Unmanaged".

Go to Configure your Imply Hybrid cluster.
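
The security group change in step 10 can also be scripted. The following is a minimal sketch, assuming you use the boto3 EC2 client and that both security groups are in the same account and region. The function names and the placeholder group IDs are hypothetical; only `authorize_security_group_ingress` is a boto3 call.

```python
def all_traffic_from_group(source_group_id):
    """Build an ingress permission that allows all traffic from another security group."""
    return {
        "IpProtocol": "-1",  # "-1" means all protocols and all ports
        "UserIdGroupPairs": [{"GroupId": source_group_id}],
    }


def allow_imply_traffic(ec2_client, emr_group_ids, imply_unmanaged_group_id):
    """Add the All Traffic rule to each EMR security group (master and slave)."""
    permission = all_traffic_from_group(imply_unmanaged_group_id)
    for group_id in emr_group_ids:
        ec2_client.authorize_security_group_ingress(
            GroupId=group_id,
            IpPermissions=[permission],
        )


# Example usage (requires boto3 and AWS credentials; IDs are placeholders):
#   import boto3
#   ec2 = boto3.client("ec2")
#   allow_imply_traffic(ec2, ["sg-EMR_MASTER", "sg-EMR_SLAVE"], "sg-IMPLY_UNMANAGED")
```

Passing the client as a parameter keeps the helper easy to exercise with a stub before you run it against a real account.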

Set up an existing EMR cluster

If you already have an EMR cluster:

  1. Click on the cluster in the cluster list.
  2. In Hardware, click the ID of the "Master Instance Group" and note the value for Private DNS Name.
  3. Set up a VPC peering connection between the VPC containing your EMR cluster and the Imply Hybrid VPC. Both VPCs must be in the same region.
  4. On the VPC Dashboard, go to Peering Connections and click Create Peering Connection.
  5. Select either the EMR or Imply Hybrid VPC as the Requester and the other VPC as the Accepter. Note the CIDR block for both VPCs.
  6. Click on Create Peering Connection.
  7. For the peering connection that you just created, choose Accept request under Actions.
  8. Navigate to Route Tables and select the route table for the Imply Hybrid VPC. The name uses the following convention: imply-xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-network-rtb.
  9. Click Routes / Edit, then click Add another route.
  10. Enter the CIDR block for the EMR VPC as the Destination and the ID of the peering connection you created as the Target.
  11. Repeat the previous step for the active route table for the EMR VPC. Enter the CIDR block for the Imply Hybrid VPC as the Destination and the ID of the peering connection you created as the Target.
  12. Edit the security groups for the EMR master and EMR slave nodes. The defaults are ElasticMapReduce-master and ElasticMapReduce-slave. Add a rule to each to allow All Traffic from the Imply Hybrid Unmanaged security group. To identify the Imply Hybrid unmanaged security group ID, go to the VPC Dashboard, click on Security Groups, and note the Group ID for the security group with a description of "Imply Hybrid Unmanaged".
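
The peering and routing steps above (steps 3 through 11) can be sketched with the boto3 EC2 client. This is an illustrative outline, not a tested deployment script: it assumes both VPCs are in the same account and region, and the function names and placeholder IDs are hypothetical. In practice, you may need to wait until the connection reaches the pending-acceptance state before accepting it.

```python
def peer_vpcs(ec2_client, requester_vpc_id, accepter_vpc_id):
    """Create a VPC peering connection, accept it, and return its ID."""
    response = ec2_client.create_vpc_peering_connection(
        VpcId=requester_vpc_id,
        PeerVpcId=accepter_vpc_id,
    )
    peering_id = response["VpcPeeringConnection"]["VpcPeeringConnectionId"]
    ec2_client.accept_vpc_peering_connection(VpcPeeringConnectionId=peering_id)
    return peering_id


def add_peering_route(ec2_client, route_table_id, destination_cidr, peering_id):
    """Route traffic for the other VPC's CIDR block through the peering connection."""
    ec2_client.create_route(
        RouteTableId=route_table_id,
        DestinationCidrBlock=destination_cidr,
        VpcPeeringConnectionId=peering_id,
    )


# Example usage (requires boto3 and AWS credentials; IDs and CIDRs are placeholders):
#   import boto3
#   ec2 = boto3.client("ec2")
#   pcx = peer_vpcs(ec2, "vpc-IMPLY", "vpc-EMR")
#   add_peering_route(ec2, "rtb-IMPLY", "EMR_VPC_CIDR", pcx)    # Imply Hybrid route table
#   add_peering_route(ec2, "rtb-EMR", "IMPLY_VPC_CIDR", pcx)    # EMR route table
```

Each route table gets the CIDR block of the opposite VPC as its destination, which mirrors steps 10 and 11.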

Configure your Imply Hybrid cluster

If you need to allow access to your Imply Hybrid cluster from the EMR VPC, edit the security groups for Druid nodes. Add a rule to each to allow All Traffic from your EMR security group.

Imply Hybrid Druid clusters don't include the configuration files normally used in Hadoop client configuration and job properties. To configure the Hadoop client for Imply Hybrid:

  • Specify Hadoop configurations as properties in the common.runtime.properties or runtime.properties for the Middle Manager with the hadoop prefix.
  • Specify Hadoop configurations as jobProperties in the batch ingestion spec.

You must specify hadoop.fs.defaultFS in the runtime.properties file. Otherwise, the indexing job cannot locate the final segments in HDFS after the MapReduce jobs complete. You can specify all other configurations in either place.

Configure the Middle Manager

Add properties to the runtime.properties file of the Middle Manager as follows:

  1. In the Imply Hybrid Manager, navigate to Advanced / Override Druid Properties.
  2. Add the following to the Middle Manager configuration:
    hadoop.fs.defaultFS=hdfs://ip-xxx-xx-xx-xxx.ec2.internal:8020
    hadoop.yarn.resourcemanager.hostname=ip-xxx-xx-xx-xxx.ec2.internal
    hadoop.yarn.application.classpath=$HADOOP_CONF_DIR,$HADOOP_COMMON_HOME/*,$HADOOP_COMMON_HOME/lib/*,$HADOOP_HDFS_HOME/*,$HADOOP_HDFS_HOME/lib/*,$HADOOP_MAPRED_HOME/*,$HADOOP_MAPRED_HOME/lib/*,$HADOOP_YARN_HOME/*,$HADOOP_YARN_HOME/lib/*,/usr/lib/hadoop-lzo/lib/*,/usr/share/aws/emr/emrfs/conf,/usr/share/aws/emr/emrfs/lib/*,/usr/share/aws/emr/emrfs/auxlib/*,/usr/share/aws/emr/lib/*,/usr/share/aws/emr/ddb/lib/emr-ddb-hadoop.jar,/usr/share/aws/emr/goodies/lib/emr-hadoop-goodies.jar,/usr/share/aws/emr/kinesis/lib/emr-kinesis-hadoop.jar,/usr/share/aws/emr/cloudwatch-sink/lib/*
    hadoop.mapreduce.framework.name=yarn
    hadoop.fs.s3a.awsAccessKeyId={accessKey}
    hadoop.fs.s3a.awsSecretAccessKey={secretAccessKey}
  3. In hadoop.fs.defaultFS and hadoop.yarn.resourcemanager.hostname, replace ip-xxx-xx-xx-xxx.ec2.internal with your EMR master's private DNS name. Set hadoop.fs.s3a.awsAccessKeyId and hadoop.fs.s3a.awsSecretAccessKey to your AWS credentials.
  4. Restart or update your Imply Hybrid cluster to apply your changes.
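
To avoid hand-editing the host name in several places, you could generate the host-dependent properties from the private DNS name. The sketch below is an assumption-laden helper, not part of Imply Hybrid: the function name and the `extra` parameter are hypothetical, and it deliberately leaves out the classpath and credential properties, which you still add as shown above.

```python
def middle_manager_hadoop_properties(master_private_dns, extra=None):
    """Return runtime.properties lines for the Middle Manager.

    master_private_dns: the EMR master's Private DNS Name.
    extra: optional dict of plain Hadoop keys (without the "hadoop." prefix);
           each key is written with the "hadoop." prefix added.
    """
    props = {
        "hadoop.fs.defaultFS": f"hdfs://{master_private_dns}:8020",
        "hadoop.yarn.resourcemanager.hostname": master_private_dns,
        "hadoop.mapreduce.framework.name": "yarn",
    }
    for key, value in (extra or {}).items():
        props[f"hadoop.{key}"] = value
    return "\n".join(f"{name}={value}" for name, value in props.items())


# Example usage with a placeholder host name:
#   print(middle_manager_hadoop_properties("ip-xxx-xx-xx-xxx.ec2.internal"))
```

The output is plain `key=value` text that you can paste into Advanced / Override Druid Properties.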

Sample Hadoop indexing job

The following example demonstrates how to specify additional Hadoop configurations in the jobProperties section of the indexing spec.

  1. In the Imply Hybrid Manager cluster view, click Load data.

  2. Select Batch > Other.

  3. Use the following template to define your indexing spec. Change the dataSource and intervals and update the ioConfig.inputSpec.paths to point to your input file. Update any other relevant fields to match your data.

    {
      "type": "index_hadoop",
      "spec": {
        "dataSchema": {
          "dataSource": "MY_DATASOURCE",
          "timestampSpec": null,
          "dimensionsSpec": null,
          "metricsSpec": [
            {
              "type": "count",
              "name": "added"
            }
          ],
          "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "DAY",
            "queryGranularity": {
              "type": "none"
            },
            "rollup": true,
            "intervals": ["MY_INTERVAL_EXAMPLE_2022-01-01/2023-01-01"]
          },
          "transformSpec": {
            "filter": null,
            "transforms": []
          },
          "parser": {
            "type": "hadoopyString",
            "parseSpec": {
              "format": "json",
              "timestampSpec": {
                "column": "currentTime",
                "format": "auto"
              },
              "dimensionsSpec": {
                "dimensions": [
                  "app_name_id",
                  "country",
                  "direction_change"
                ]
              }
            }
          }
        },
        "ioConfig": {
          "type": "hadoop",
          "inputSpec": {
            "type": "static",
            "paths": "s3a://bucket/path/MY_FILE.json"
          },
          "metadataUpdateSpec": null,
          "segmentOutputPath": null
        },
        "tuningConfig": {
          "type": "hadoop",
          "workingPath": null,
          "version": "2023-07-17T10:25:28.939Z",
          "partitionsSpec": {
            "type": "hashed",
            "numShards": null,
            "partitionDimensions": [],
            "partitionFunction": "murmur3_32_abs",
            "maxRowsPerSegment": 20000
          },
          "shardSpecs": {},
          "indexSpec": {
            "bitmap": {
              "type": "roaring"
            },
            "dimensionCompression": "lz4",
            "stringDictionaryEncoding": {
              "type": "utf8"
            },
            "metricCompression": "lz4",
            "longEncoding": "longs"
          },
          "indexSpecForIntermediatePersists": {
            "bitmap": {
              "type": "roaring"
            },
            "dimensionCompression": "lz4",
            "stringDictionaryEncoding": {
              "type": "utf8"
            },
            "metricCompression": "lz4",
            "longEncoding": "longs"
          },
          "appendableIndexSpec": {
            "type": "onheap",
            "preserveExistingMetrics": false
          },
          "maxRowsInMemory": 500000,
          "maxBytesInMemory": 0,
          "leaveIntermediate": false,
          "cleanupOnFailure": true,
          "overwriteFiles": false,
          "ignoreInvalidRows": false,
          "jobProperties": {
            "mapreduce.job.classloader": "true",
            "mapreduce.job.classloader.system.classes": "-javax.el.,org.apache.commons.logging.,org.apache.log4j.,org.apache.hadoop.,com.fasterxml.jackson.databind.PropertyNamingStrategy",
            "mapreduce.framework.name": "yarn",
            "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "fs.s3n.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
            "fs.s3.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
          },
          "combineText": false,
          "useCombiner": false,
          "numBackgroundPersistThreads": 0,
          "forceExtendableShardSpecs": false,
          "useExplicitVersion": false,
          "allowedHadoopPrefix": [],
          "logParseExceptions": false,
          "maxParseExceptions": 0,
          "useYarnRMJobStatusFallback": true,
          "awaitSegmentAvailabilityTimeoutMillis": 0
        },
        "uniqueId": "xxxxx",
        "context": {
          "forceTimeChunkLock": true,
          "useLineageBasedSegmentAllocation": true
        }
      },
      "hadoopDependencyCoordinates": null,
      "classpathPrefix": null,
      "context": {
        "forceTimeChunkLock": true,
        "useLineageBasedSegmentAllocation": true
      },
      "dataSource": "MY_DATASOURCE"
    }
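
If you reuse the template for several jobs, a small script can fill in the fields that step 3 asks you to change. The following is a minimal sketch, assuming the template has been loaded into a Python dict (for example with `json.load`). The function name `customize_spec` and the abbreviated `TEMPLATE` are hypothetical and contain only the fields the function touches.

```python
import copy

# Abbreviated stand-in for the full template; only the edited fields are shown.
TEMPLATE = {
    "type": "index_hadoop",
    "spec": {
        "dataSchema": {
            "dataSource": "MY_DATASOURCE",
            "granularitySpec": {"intervals": ["MY_INTERVAL"]},
        },
        "ioConfig": {"inputSpec": {"type": "static", "paths": "MY_PATH"}},
    },
    "dataSource": "MY_DATASOURCE",
}


def customize_spec(template, data_source, intervals, input_path):
    """Return a copy of the template with dataSource, intervals, and paths set."""
    spec = copy.deepcopy(template)  # leave the original template untouched
    # dataSource appears both at the top level and inside dataSchema.
    spec["dataSource"] = data_source
    spec["spec"]["dataSchema"]["dataSource"] = data_source
    spec["spec"]["dataSchema"]["granularitySpec"]["intervals"] = list(intervals)
    spec["spec"]["ioConfig"]["inputSpec"]["paths"] = input_path
    return spec


# Example usage with illustrative values:
#   import json
#   job = customize_spec(TEMPLATE, "wikipedia", ["2022-01-01/2023-01-01"],
#                        "s3a://bucket/path/MY_FILE.json")
#   print(json.dumps(job, indent=2))
```

Setting both dataSource fields together avoids the two values drifting apart when you copy the template.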