Loading data (Kafka)

The Druid Kafka indexing service ingests data from Kafka. It offers exactly-once ingestion guarantees and can also ingest historical data. It runs as part of the core Druid services and does not require any additional processes.

The Kafka indexing service uses supervisors, which run on the overlord and manage the creation and lifetime of Kafka indexing tasks. This indexing service can handle non-recent events and provides exactly-once ingestion semantics.

The Kafka indexing service is provided as the druid-kafka-indexing-service core extension (see Including Extensions). Please note that this is currently designated as an experimental feature and is subject to the usual experimental caveats.

Submitting a Supervisor Spec

A supervisor for a datasource is started by submitting a supervisor spec via the Apache Kafka option under Continuous ingestion in the + Add Datasets view of a cluster.

A supervisor spec can also be submitted via HTTP POST to http://<OVERLORD_IP>:<OVERLORD_PORT>/druid/indexer/v1/supervisor, for example:

curl -X POST -H 'Content-Type: application/json' -d @supervisor-spec.json http://localhost:8090/druid/indexer/v1/supervisor
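The same request can be made from a script. The following is a minimal sketch in Python using only the standard library. It assumes the overlord is reachable at http://localhost:8090, as in the curl example; the function names build_supervisor_request and submit_supervisor are hypothetical helpers, not part of Druid.

```python
import json
import urllib.request

# Assumption: overlord address taken from the curl example; adjust for your cluster.
OVERLORD_URL = "http://localhost:8090"


def build_supervisor_request(spec, overlord_url=OVERLORD_URL):
    """Build (but do not send) the POST request that submits a supervisor spec."""
    body = json.dumps(spec).encode("utf-8")
    return urllib.request.Request(
        url=overlord_url.rstrip("/") + "/druid/indexer/v1/supervisor",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_supervisor(spec, overlord_url=OVERLORD_URL):
    """Send the spec to the overlord and return the raw response body as text."""
    request = build_supervisor_request(spec, overlord_url)
    with urllib.request.urlopen(request, timeout=30) as response:
        return response.read().decode("utf-8")
```

To use it, load supervisor-spec.json with json.load and pass the resulting dictionary to submit_supervisor. Building the request separately from sending it makes it possible to inspect the URL and body without a running cluster.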

A sample supervisor spec is shown below:

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "wikiticker-kafka",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": [
            "isRobot",
            "channel",
            "flags",
            "isUnpatrolled",
            "page",
            "diffUrl",
            "comment",
            "isNew",
            "isMinor",
            "user",
            "namespace",
            { "name" : "commentLength", "type" : "long" },
            { "name" : "deltaBucket", "type" : "long" },
            "cityName",
            "countryIsoCode",
            "countryName",
            "isAnonymous",
            "metroCode",
            "regionIsoCode",
            "regionName"
          ]
        }
      }
    },
    "metricsSpec" : [
      { "type" : "count", "name" : "count" },
      { "type" : "longSum", "name" : "added", "fieldName" : "added" },
      { "type" : "longSum", "name" : "deleted", "fieldName" : "deleted" },
      { "type" : "longSum", "name" : "delta", "fieldName" : "delta" },
      { "type" : "hyperUnique", "name" : "user_unique", "fieldName" : "user" }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE",
      "rollup": true
    }
  },
  "ioConfig": {
    "topic": "wikiticker",
    "consumerProperties": {
      "bootstrap.servers": "myKafkaBroker:9092"
    }
  }
}
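Hand-edited specs can contain mistakes that many JSON parsers accept silently, such as a key repeated inside one object. The following Python sketch checks a spec file before it is submitted. The helper names are hypothetical, and the list of required top-level keys is an assumption drawn only from the sample above (type, dataSchema, ioConfig); it is not a full validation of the supervisor spec format.

```python
import json

# Assumption: these are the top-level keys present in the sample spec above.
EXPECTED_TOP_LEVEL_KEYS = ("type", "dataSchema", "ioConfig")


def reject_duplicate_keys(pairs):
    """object_pairs_hook that raises if a JSON object repeats a key."""
    seen = {}
    for key, value in pairs:
        if key in seen:
            raise ValueError(f"duplicate key in JSON object: {key!r}")
        seen[key] = value
    return seen


def load_spec(text):
    """Parse a supervisor spec, failing on duplicate or missing top-level keys."""
    spec = json.loads(text, object_pairs_hook=reject_duplicate_keys)
    missing = [k for k in EXPECTED_TOP_LEVEL_KEYS if k not in spec]
    if missing:
        raise ValueError(f"supervisor spec is missing top-level keys: {missing}")
    return spec
```

Calling load_spec on the contents of supervisor-spec.json either returns the parsed dictionary or raises ValueError with a message naming the problem.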

Please refer to the Kafka indexing service documentation for more details.
