Loading data (Kafka)

Getting started

The Druid Kafka indexing service supports ingesting data from Kafka. It offers exactly-once ingestion guarantees as well as the ability to ingest historical (non-recent) data, and it runs as part of the core Druid services without requiring any additional processes.

The service uses supervisors, which run on the Overlord and manage the creation and lifetime of Kafka indexing tasks.

Submitting a Supervisor Spec

A supervisor for a datasource is started by submitting a supervisor spec via the Apache Kafka option under Continuous ingestion in the + Add Datasets view of a cluster.

A supervisor spec can also be submitted via HTTP POST to http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor, for example:

curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
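
If the spec is accepted, the Overlord registers (or updates) a supervisor for the datasource and begins creating indexing tasks. As a quick check, assuming the same Overlord host and port as above and that the supervisor ID matches the dataSource name used in the sample spec below, you can list the registered supervisors and fetch the new supervisor's status:

curl http://localhost:8090/druid/indexer/v1/supervisor
curl http://localhost:8090/druid/indexer/v1/supervisor/wikiticker-kafka/status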

A sample supervisor spec is shown below:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikiticker-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "isRobot",
            "channel",
            "flags",
            "isUnpatrolled",
            "page",
            "diffUrl",
            "comment",
            "isNew",
            "isMinor",
            "user",
            "namespace",
            { "name" : "commentLength", "type" : "long" },
            { "name" : "deltaBucket", "type" : "long" },
            "cityName",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "metroCode",
            "regionIsoCode",
            "regionName"
          ]
        }
      }
    },
    "metricsSpec" : [
      { "type" : "count", "name" : "count", "type" : "count" },
      { "type" : "longSum", "name" : "added", "fieldName" : "added" },
      { "type" : "longSum", "name" : "deleted", "fieldName" : "deleted" },
      { "type" : "longSum", "name" : "delta", "fieldName" : "delta" },
      { "type" : "hyperUnique", "name" : "user_unique", "fieldName" : "user" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE",
      "rollup": true
    }
  },
  "ioConfig": {
    "topic": "wikiticker",
    "consumerProperties": {
      "bootstrap.servers": "myKafkaBroker:9092"
    }
  }
}
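
To confirm that events are flowing, you can publish a test record to the topic referenced in the ioConfig. The command below is an illustrative sketch only: it assumes a Kafka installation is available in the current directory, that myKafkaBroker:9092 is reachable, and that the event fields (made-up values) match the timestampSpec, dimensions, and metric fieldNames in the spec above.

echo '{"timestamp":"2018-01-01T00:00:00Z","isRobot":"false","channel":"#en.wikipedia","page":"Druid","user":"example-user","added":17,"deleted":0,"delta":17}' | ./bin/kafka-console-producer.sh --broker-list myKafkaBroker:9092 --topic wikiticker

Once the supervisor's tasks have read the record, it should become queryable in the wikiticker-kafka datasource.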

More information

Please refer to Druid's Kafka indexing service documentation for more details.
