Kafka ingestion tutorial
The Kafka indexing service enables you to ingest data into Imply from Apache Kafka. This service offers exactly-once ingestion guarantees as well as the ability to ingest historical data.
You can load data from Kafka in the Druid Console using the Apache Kafka data loader:
This tutorial guides you through the steps to:
- Set up an instance of Kafka and create a sample topic called "wikipedia".
- Configure the Druid Kafka indexing service to load data from the Kafka event stream.
- Load data into the Kafka "wikipedia" topic.
- Create a data cube in Imply Pivot.
The steps assume you have access to a running instance of Imply. If you don't, see the Quickstart for information on getting started.
The Druid Kafka indexing service requires access to read from an Apache Kafka topic. If you are running Imply Hybrid (formerly Imply Cloud), consider installing Kafka in the same VPC as your Druid cluster. For more information, see Imply Hybrid Security.
Step 1: Get Kafka
In a terminal window, download Kafka as follows:
curl -O https://archive.apache.org/dist/kafka/2.5.1/kafka_2.13-2.5.1.tgz
tar -xzf kafka_2.13-2.5.1.tgz
cd kafka_2.13-2.5.1
This directory is referred to as the Kafka home for the rest of this tutorial.
Imply and Kafka both rely on ZooKeeper. If you are running this tutorial on the same machine where you are running an Imply single machine instance, such as the Quickstart on-prem installation, modify the default configuration to avoid port conflicts. Change the default port for the Kafka ZooKeeper from 2181 to 2180 where it appears in the following files:
<kafka_home>/config/zookeeper.properties
<kafka_home>/config/server.properties
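One way to make this change from the Kafka home directory is with sed; a quick sketch, assuming GNU sed (on macOS or BSD, use sed -i '' instead):
# Replace the default ZooKeeper port 2181 with 2180 in both configuration files
sed -i 's/2181/2180/g' config/zookeeper.properties config/server.properties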
Start Kafka's ZooKeeper as follows:
./bin/zookeeper-server-start.sh config/zookeeper.properties
Open a new terminal window and navigate to the Kafka home directory.
Start a Kafka broker as follows:
./bin/kafka-server-start.sh config/server.properties
From another terminal window, run this command to create a Kafka topic called wikipedia, the topic to which you will send data:
./bin/kafka-topics.sh --create --zookeeper localhost:2180 --replication-factor 1 --partitions 1 --topic wikipedia
Kafka returns a message when it successfully adds the topic:
Created topic wikipedia.
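Optionally, you can confirm that the topic was created by listing topics against the same ZooKeeper address:
./bin/kafka-topics.sh --list --zookeeper localhost:2180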
Step 2: Enable Druid Kafka ingestion
You can use Druid's Kafka indexing service to ingest messages from your newly created wikipedia topic. To start the service, navigate to the Imply directory and submit a supervisor spec to the Druid overlord as follows:
curl -XPOST -H'Content-Type: application/json' -d @quickstart/wikipedia-kafka-supervisor.json http://localhost:8090/druid/indexer/v1/supervisor
If you are not using a locally running Imply instance:
- Copy the contents of the following listing to a file.
- Modify the value for bootstrap.servers to the address and port where Druid can access the Kafka broker.
- Post the updated file to the URL where your Druid Overlord process is running (see the example after the listing).
{
"type": "kafka",
"spec" : {
"dataSchema": {
"dataSource": "wikipedia",
"timestampSpec": {
"column": "time",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{
"name": "added",
"type": "long"
},
{
"name": "deleted",
"type": "long"
},
{
"name": "delta",
"type": "long"
}
]
},
"metricsSpec": [],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "DAY",
"queryGranularity": "NONE",
"rollup": false
}
},
"tuningConfig": {
"type": "kafka",
"reportParseExceptions": false
},
"ioConfig": {
"topic": "wikipedia",
"inputFormat": {
"type": "json"
},
"replicas": 1,
"taskDuration": "PT10M",
"completionTimeout": "PT20M",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
}
}
}
}
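If your Overlord is not running locally, the request is the same as above but targets your own host; a sketch, where my-kafka-supervisor.json and OVERLORD_HOST are placeholders for your edited spec file and your Overlord address:
curl -XPOST -H'Content-Type: application/json' -d @my-kafka-supervisor.json http://OVERLORD_HOST:8090/druid/indexer/v1/supervisor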
When the Overlord successfully creates the supervisor, it returns a response containing the ID of the supervisor, in this case {"id":"wikipedia-kafka"}.
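To confirm that the supervisor is running, you can query its status; a quick check, assuming the Overlord is reachable locally on port 8090 as in the quickstart:
curl http://localhost:8090/druid/indexer/v1/supervisor/wikipedia-kafka/status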
For more details about what's going on here, check out the Druid Kafka indexing service documentation.
Step 3: Load historical data
Now it's time to launch a console producer for your topic and send some data!
Navigate to your Kafka directory.
Modify the following command to replace {PATH_TO_IMPLY} with the path to your Imply directory:
export KAFKA_OPTS="-Dfile.encoding=UTF-8"
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wikipedia < {PATH_TO_IMPLY}/quickstart/wikipedia-2016-06-27-sampled.json
Run the command to post sample events to the wikipedia Kafka topic.
The Kafka indexing service reads the events from the topic and ingests them into Druid.
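If you prefer the command line to the console, you can spot-check the ingested row count with Druid SQL; a sketch, assuming the Druid API is available on the quickstart default port 8888:
curl -XPOST -H'Content-Type: application/json' http://localhost:8888/druid/v2/sql -d '{"query": "SELECT COUNT(*) FROM \"wikipedia-kafka\""}'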
Within the Druid console, navigate to the Datasources page to verify that the new wikipedia-kafka datasource appears:
Click the segments link to see the list of segments generated by ingestion. Notice that the ingestion spec sets segmentGranularity to HOUR, so there is a segment for each hour of data, around 22 in total for the sample data ingested.
That's it! You can now query the historical data in the console, but first try ingesting real-time data, as follows.
Step 4: Load real-time data
Exploring historical Wikipedia edits is useful, but it is even more interesting to explore trends on Wikipedia happening right now.
To do this, you can download a helper application to parse events from Wikimedia's IRC feeds and post them to the wikipedia Kafka topic from the previous step, as follows:
From a terminal, run the following commands to download and extract the helper application:
curl -O https://static.imply.io/quickstart/wikiticker-0.8.tar.gz
tar -xzf wikiticker-0.8.tar.gz
cd wikiticker-0.8-SNAPSHOT
Now run wikiticker, passing "wikipedia" as the -topic parameter:
bin/wikiticker -J-Dfile.encoding=UTF-8 -out kafka -topic wikipedia
After a few moments, look for an additional segment of real-time data in the datasource you created.
As Kafka data is sent to Druid, you can immediately query it. To see the latest data sent by wikiticker, set a time floor for the latest hour, as shown:
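To run an equivalent check outside the console, a Druid SQL query filtered to the last hour might look like the following sketch (again assuming the Druid API on port 8888; adjust the host and port for your deployment):
curl -XPOST -H'Content-Type: application/json' http://localhost:8888/druid/v2/sql \
  -d '{"query": "SELECT channel, COUNT(*) AS edits FROM \"wikipedia-kafka\" WHERE __time >= TIMESTAMPADD(HOUR, -1, CURRENT_TIMESTAMP) GROUP BY channel ORDER BY edits DESC LIMIT 10"}'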
Step 5: Build a data cube
Next, try configuring a data cube in Pivot:
Navigate to Pivot at http://localhost:9095. You should see your wikipedia-kafka datasource:
Click on the data source and then click Create a data cube, and confirm when prompted.
Click Go to data cube.
You can now slice and dice and explore the data like you would any data cube:
Next steps
So far, you've loaded data using an ingestion spec included in the Imply distribution. Each ingestion spec is designed for a particular dataset. You can load your own datasets by writing a custom ingestion spec.
To write your own ingestion spec, copy the contents of the quickstart/wikipedia-kafka-supervisor.json file (or the listing above) into a new file as a starting point, and edit it as needed.
Alternatively, use the Druid data loader UI to generate the ingestion spec by clicking Apache Kafka from the Load Data page.
The steps for configuring Kafka ingestion in the data loader are similar to those for batch file ingestion, as described in the Quickstart. However, you will use the Kafka bootstrap server as the source, as shown:
As a starting point, you can keep most settings at their default values. At the Tune step, however, you must choose whether to retrieve the earliest or latest offsets from Kafka by setting the Input tuning option to True or False.
When you load your own Kafka topics, Druid creates at least one segment for every Kafka partition in every segmentGranularity period. For example, if your segmentGranularity period is "HOUR" and you have three Kafka partitions, Druid creates at least three segments per hour.
For best performance, keep your number of Kafka partitions appropriately sized to avoid creating too many segments.
For more details, see the Druid Kafka indexing service documentation.