
Load data from files

This tutorial refers to release artifacts that may not be available in Imply 4.0. For best results, refer to the Druid ingestion documentation.

Batch data consists of a denormalized, flattened data set.

There are two supported methods for loading batch data into Druid:

  1. Built-in ingestion with the "index" task. This performs the ingestion work on your Druid nodes. Each task processes data in a single thread, but you can parallelize ingestion by submitting multiple tasks.

  2. Hadoop-based ingestion:

    • For Imply on-prem: Hadoop-based ingestion with the "index_hadoop" task. This performs the ingestion work on a YARN cluster using Hadoop Map/Reduce, where it is automatically parallelized.
    • For Imply Hybrid (formerly Imply Cloud): EMR Hadoop-based ingestion with the "index_hadoop" task. This performs the ingestion work on an Amazon EMR cluster using Hadoop Map/Reduce, where it is automatically parallelized.

If you've never loaded data files into Druid before, we recommend trying out the example in the Quickstart page first and then coming back to this page.

Built-in ingestion

Druid can load files using built-in ingestion with the "index" task. Each indexing task you submit will run single-threaded. To parallelize the data loading process, you can partition your data by time (for example, by hour, day, or some other time bucketing) and then submit an indexing task for each time partition. Indexing tasks for different intervals can run simultaneously.

For an example of using built-in ingestion, see the Imply quickstart. For reference documentation, see the Druid documentation for ingestion tasks.
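As an illustration, a minimal "index" task spec might look like the following sketch. The datasource name, file names, intervals, and column names here are hypothetical placeholders; adapt them to your own data and consult the Druid ingestion task documentation for the full set of options.

```json
{
  "type" : "index",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "example-data",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : { "column" : "timestamp", "format" : "iso" },
          "dimensionsSpec" : { "dimensions" : ["page", "user"] }
        }
      },
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2018-01-01/2018-01-02"]
      }
    },
    "ioConfig" : {
      "type" : "index",
      "firehose" : {
        "type" : "local",
        "baseDir" : "quickstart",
        "filter" : "example-data.json"
      }
    }
  }
}
```

Submitting one such task per time interval (with the "intervals" field adjusted accordingly) is how you parallelize built-in ingestion, since tasks for different intervals can run at the same time.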

Hadoop-based ingestion

Druid can leverage Hadoop Map/Reduce to scale out ingestion, allowing it to load data from files on HDFS, S3, or other filesystems via parallelized YARN jobs. These jobs will scan through your raw data and produce optimized Druid data segments in your configured deep storage. The data will then be loaded by Druid Historical Nodes. Once loading is complete, Hadoop and YARN are not involved in the query path of Druid in any way.

The main advantages of loading data using Hadoop are that it automatically parallelizes the batch data loading process, and that it uses YARN resources instead of your Druid machines (leaving your Druid machines free to handle queries).

For reference documentation, see the Druid documentation for Hadoop ingestion.
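A Hadoop-based task spec is structurally similar to the built-in one but uses the "index_hadoop" type and a Hadoop "inputSpec" to point at your files. The paths and datasource below are hypothetical; see the Druid Hadoop ingestion documentation for the authoritative spec format.

```json
{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "example-data"
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "hdfs://namenode:9000/data/part-1.json,hdfs://namenode:9000/data/part-2.json"
      }
    },
    "tuningConfig" : {
      "type" : "hadoop"
    }
  }
}
```

Note that the "dataSchema" section shown here is abbreviated; a real spec would also include a parser, timestamp spec, dimensions, and granularity spec as in the built-in example.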

Configuration

To configure Druid for running ingestion tasks on a Hadoop cluster:

  • Update druid.indexer.task.hadoopWorkingPath in conf/druid/middleManager/runtime.properties to a path on HDFS that you'd like to use for temporary files required during the indexing process. druid.indexer.task.hadoopWorkingPath=/tmp/druid-indexing is a common choice.

  • Place your Hadoop configuration XMLs (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml) on the classpath of your Druid nodes. You can do this by copying them into conf/druid/_common/.

  • Ensure that you have configured distributed deep storage. Note that while you do need a distributed deep storage in order to load data with Hadoop, it doesn't need to be HDFS. For example, if your cluster is running on Amazon Web Services, we recommend using S3 for deep storage even if you are loading data using Hadoop or Elastic MapReduce.

  • Hadoop-based Druid ingestion task specs use a different format from built-in ingestion task specs.
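As an illustration of the deep storage requirement, a minimal S3 deep storage configuration in conf/druid/_common/common.runtime.properties might look like the following sketch. The bucket name, base key, and credentials are placeholders, and your existing druid.extensions.loadList may already contain other extensions.

```
druid.extensions.loadList=["druid-s3-extensions"]
druid.storage.type=s3
druid.storage.bucket=your-bucket
druid.storage.baseKey=druid/segments
druid.s3.accessKey=YOUR_ACCESS_KEY
druid.s3.secretKey=YOUR_SECRET_KEY
```

With this in place, Hadoop ingestion jobs will write finished segments to S3 rather than HDFS, and Druid Historical Nodes will load them from there.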

S3 setup

If your data is stored in S3, you can load it using Elastic MapReduce (EMR) or your own Hadoop cluster. To do this, use the following job properties in your Druid "index_hadoop" task:

"jobProperties" : {
  "fs.s3.awsAccessKeyId" : "YOUR_ACCESS_KEY",
  "fs.s3.awsSecretAccessKey" : "YOUR_SECRET_KEY",
  "fs.s3.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
  "fs.s3n.awsAccessKeyId" : "YOUR_ACCESS_KEY",
  "fs.s3n.awsSecretAccessKey" : "YOUR_SECRET_KEY",
  "fs.s3n.impl" : "org.apache.hadoop.fs.s3native.NativeS3FileSystem",
  "io.compression.codecs" : "org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,org.apache.hadoop.io.compress.BZip2Codec,org.apache.hadoop.io.compress.SnappyCodec"
}
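These jobProperties live inside the "tuningConfig" section of the "index_hadoop" task spec, roughly as sketched below (other spec sections omitted for brevity; the credentials are placeholders):

```json
{
  "type" : "index_hadoop",
  "spec" : {
    "tuningConfig" : {
      "type" : "hadoop",
      "jobProperties" : {
        "fs.s3n.awsAccessKeyId" : "YOUR_ACCESS_KEY",
        "fs.s3n.awsSecretAccessKey" : "YOUR_SECRET_KEY"
      }
    }
  }
}
```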

If you don't already have a Hadoop cluster, you can follow these steps to create an EMR cluster:

  • Create a persistent, long-running cluster.
  • When creating your cluster, enter the following configuration. If you're using the wizard, this is in advanced mode under Edit software settings.
classification=yarn-site,properties=[mapreduce.reduce.memory.mb=6144,mapreduce.reduce.java.opts=-server -Xms2g -Xmx2g -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps,mapreduce.map.memory.mb=758,mapreduce.map.java.opts=-server -Xms512m -Xmx512m -Duser.timezone=UTC -Dfile.encoding=UTF-8 -XX:+PrintGCDetails -XX:+PrintGCTimeStamps,mapreduce.task.timeout=1800000]

This method uses Hadoop's built-in S3 file system rather than Amazon's EMRFS, and is not compatible with Amazon-specific features such as S3 encryption and consistent views. If you need to use those features, you will need to make the Amazon EMR Hadoop JARs available to Druid through one of the mechanisms described in the Using other Hadoop distributions section.

Using other Hadoop distributions

Druid works out of the box with many Hadoop distributions. If you are having dependency conflicts between Druid and your version of Hadoop, you can try reading the Druid Different Hadoop Versions documentation, searching for a solution in the Druid user groups, or contacting us for help.

EMR Hadoop-based ingestion

For Imply Hybrid, Druid can leverage Hadoop Map/Reduce using Amazon EMR to scale out ingestion, allowing it to load data from files on S3 via parallelized YARN jobs. These jobs will scan through your raw data and produce optimized Druid data segments in S3. The data will then be loaded by Druid Historical Nodes. Once loading is complete, EMR is not involved in the query path of Druid in any way.

The main advantages of loading data using EMR are that it automatically parallelizes the batch data loading process, and that it uses EMR resources instead of your Druid machines (leaving your Druid machines free to handle queries).

See EMR Setup for instructions on how to configure EMR-based batch ingestion.

Loading additional data

When you load additional data into Druid using subsequent indexing tasks, the behavior depends on the intervals of the subsequent tasks. Batch loads in Druid act in a replace-by-interval manner, so if you submit two tasks for the same interval, only data from the later task will be visible. If you submit two tasks for different intervals, both sets of data will be visible.

This behavior makes it easy to reload data that you have corrected or amended in some way: just resubmit an indexing task for the same interval, but pointing at the new data. The replacement occurs atomically.

If you want to append to existing data for a given interval rather than replace it, you can do this in one of two ways:

  1. With built-in ingestion through the "index" task, use the parameter "appendToExisting" as described in the Druid documentation for ingestion tasks.
  2. With Hadoop-based ingestion through the "index_hadoop" task, use delta ingestion as described in the Druid documentation for updating existing data.
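For the "index" task, appending is controlled by a flag in the ioConfig. A sketch of the relevant fragment (other sections of the spec omitted; the file name is a placeholder):

```json
"ioConfig" : {
  "type" : "index",
  "appendToExisting" : true,
  "firehose" : {
    "type" : "local",
    "baseDir" : "quickstart",
    "filter" : "new-data.json"
  }
}
```

With "appendToExisting" set to true, new segments are added alongside the existing segments for the interval instead of atomically replacing them.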