Tutorial: Load from Kafka

In this tutorial, you'll load your own streams from Kafka using the Druid Kafka indexing service.

Imply additionally supports a wide variety of batch and streaming loading methods. See the Loading data page for more information about other options, including Kafka, Hadoop, HTTP, Storm, Samza, Spark Streaming, and your own JVM apps.

You can also load Kafka data using Tranquility Kafka. See the Loading from Kafka page for more information about choosing an option.


You will need:

  • Java 8 or better
  • Node.js 4.5.x or better
  • Linux, Mac OS X, or other Unix-like OS (Windows is not supported)
  • At least 4GB of RAM

On Mac OS X, you can use Oracle's JDK 8 to install Java and Homebrew to install Node.js.

On Linux, your OS package manager should be able to help for both Java and Node.js. If your Ubuntu- based OS does not have a recent enough version of Java, WebUpd8 offers packages for those OSes. If your Debian, Ubuntu, or Enterprise Linux OS does not have a recent enough version of Node.js, NodeSource offers packages for those OSes.

Start Imply

If you've already installed and started Imply using the quickstart, you can skip this step.

First, download Imply 2.3.9 from imply.io/get-started and unpack the release archive.

tar -xzf imply-2.3.9.tar.gz
cd imply-2.3.9

Next, you'll need to start up Imply, which includes Druid, Imply Pivot, and ZooKeeper. You can use the included supervise program to start everything with a single command:

bin/supervise -c conf/supervise/quickstart.conf

You should see a log message printed out for each service that starts up. You can view detailed logs for any service by looking in the var/sv/ directory using another terminal.

Later on, if you'd like to stop the services, CTRL-C the supervise program in your terminal. If you want a clean start after stopping the services, remove the var/ directory and then start up again.

Start Kafka

Apache Kafka is a high throughput message bus that works well with Druid. For this tutorial, we will use Kafka To download Kafka, issue the following commands in your terminal:

curl -O http://www-us.apache.org/dist/kafka/
tar -xzf kafka_2.11-
cd kafka_2.11-

Start a Kafka broker by running the following command in a new terminal:

./bin/kafka-server-start.sh config/server.properties

Run this command to create a Kafka topic called wikiticker, to which we'll send data:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wikiticker

Enable Druid Kafka ingestion

We will use Druid's Kafka indexing service to ingest messages from our newly created wikiticker topic. To start the service, we will need to submit a supervisor spec to the Druid overlord by running the following from the Imply directory:

curl -XPOST -H'Content-Type: application/json' -d @quickstart/wikiticker-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor

If the supervisor was successfully created, you will get a response containing the ID of the supervisor; in our case we should see {"id":"wikiticker-kafka"}.

For more details about what's going on here, check out the Druid Kafka indexing service documentation.

Load historical data

Let's launch a console producer for our topic and send some data!

In your Kafka directory, run the following command, where {PATH_TO_IMPLY} is replaced by the path to the Imply directory:

export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikiticker < {PATH_TO_IMPLY}/quickstart/wikiticker-2016-06-27-sampled.json

The previous command posted sample events to the wikiticker Kafka topic which were then ingested into Druid by the Kafka indexing service. You're now ready to run some queries!

Load real-time data

Exploring historical Wikipedia edits is interesting, but even more fascinating is being able to explore what is trending on Wikipedia right now. To do this, we will download a helper application which will parse events from Wikimedia's IRC feeds and post these events into the wikiticker Kafka topic we set up earlier.

curl -O https://static.imply.io/quickstart/wikiticker-0.4.tar.gz
tar -xzf wikiticker-0.4.tar.gz
cd wikiticker-0.4

Now run wikiticker with parameters instructing it to write output to our Kafka topic:

bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka -topic wikiticker

Once the data is sent to Druid, you can immediately query it.

Query data

After sending data, you can immediately query it using any of the supported query methods. To start off, try a SQL query:

$ bin/dsql
dsql> SELECT FLOOR(__time TO DAY) AS "Day", SUM("count") AS Edits FROM "wikiticker-kafka" GROUP BY FLOOR(__time TO DAY);

│ Day                      │ Edits │
│ 2016-06-27T00:00:00.000Z │ 24433 │
│ 2017-03-07T00:00:00.000Z │   642 │
Retrieved 2 rows in 0.04s.

You can see both the historical and real-time data loaded into the system. Next, try configuring a datacube in Pivot:

  1. Navigate to Pivot at http://localhost:9095.
  2. Click on the Plus icon in the top right of the header bar and select "New data cube".
  3. Select the source "druid: wikiticker-kafka" and ensure "Auto-fill dimensions and measures" is checked.
  4. Click "Next: configure data cube".
  5. Click "Create cube". You should see the confirmation message "Data cube created".
  6. View your new datacube by clicking the Home icon in the top-right and selecting the "Wikiticker Kafka" cube you just created.

Load your own Kafka topics

So far, you've loaded data using a supervisor spec that we've included in the distribution. Each supervisor spec is designed to work with a particular dataset. You can load your own datasets by writing a custom ingestion spec.

To customize supervised Kafka indexing service ingestion, you can copy the included quickstart/wikiticker-kafka-supervisor.json spec into your own file, edit it as needed, and create or shutdown supervisors as needed. There's no need to restart Imply or Druid services themselves. See the Druid documentation for details on possible configurations.

When loading your own Kafka topics, Druid will create at least one segment for every Kafka partition, for every segmentGranularity period. If your segmentGranularity period is "HOUR" and you have 3 Kafka partitions, then Druid will create at least one segment per hour. For best performance, we recommend avoiding creating too many segments by keeping your number of Kafka partitions appropriately sized. See the Druid Kafka indexing service documentation for more details.

Further reading