Source input formats
Apache Druid can ingest denormalized data in JSON, CSV, a delimited format such as TSV, or any custom format. While most examples in the documentation use data in JSON format, it is not difficult to configure Druid to ingest any other delimited format. We welcome contributions of new formats.
This page lists all default and core extension data formats supported by Druid. For additional data formats supported with community extensions, please see our community extensions list.
Formatting data
The following samples show data formats that are natively supported in Druid:
JSON
{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "cancer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}
CSV
2013-08-31T01:02:33Z,"Gypsy Danger","en","nuclear","true","true","false","false","article","North America","United States","Bay Area","San Francisco",57,200,-143
2013-08-31T03:32:45Z,"Striker Eureka","en","speed","false","true","true","false","wikipedia","Australia","Australia","Cantebury","Syndey",459,129,330
2013-08-31T07:11:21Z,"Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111
2013-08-31T11:58:39Z,"Crimson Typhoon","zh","triplets","true","false","true","false","wikipedia","Asia","China","Shanxi","Taiyuan",905,5,900
2013-08-31T12:41:27Z,"Coyote Tango","ja","cancer","true","false","true","false","wikipedia","Asia","Japan","Kanto","Tokyo",1,10,-9
TSV (Delimited)
2013-08-31T01:02:33Z "Gypsy Danger" "en" "nuclear" "true" "true" "false" "false" "article" "North America" "United States" "Bay Area" "San Francisco" 57 200 -143
2013-08-31T03:32:45Z "Striker Eureka" "en" "speed" "false" "true" "true" "false" "wikipedia" "Australia" "Australia" "Cantebury" "Syndey" 459 129 330
2013-08-31T07:11:21Z "Cherno Alpha" "ru" "masterYi" "false" "true" "true" "false" "article" "Asia" "Russia" "Oblast" "Moscow" 123 12 111
2013-08-31T11:58:39Z "Crimson Typhoon" "zh" "triplets" "true" "false" "true" "false" "wikipedia" "Asia" "China" "Shanxi" "Taiyuan" 905 5 900
2013-08-31T12:41:27Z "Coyote Tango" "ja" "cancer" "true" "false" "true" "false" "wikipedia" "Asia" "Japan" "Kanto" "Tokyo" 1 10 -9
Note that the CSV and TSV data do not contain column headers. This becomes important when you specify the columns for ingestion.
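Because the samples have no header row, the column names must come from the ingestion spec. The following sketch (not Druid code) illustrates how a headerless CSV row pairs up with an explicit column list:

```python
import csv
import io

# Column names must be supplied externally, since the data has no header row.
columns = ["timestamp", "page", "language", "user", "unpatrolled", "newPage",
           "robot", "anonymous", "namespace", "continent", "country", "region",
           "city", "added", "deleted", "delta"]

line = '2013-08-31T07:11:21Z,"Cherno Alpha","ru","masterYi","false","true","true","false","article","Asia","Russia","Oblast","Moscow",123,12,111'

# Parse one CSV line and zip it with the column list to form a record.
row = next(csv.reader(io.StringIO(line)))
record = dict(zip(columns, row))
print(record["page"], record["added"])
```

If the column list is out of order or incomplete, values silently land under the wrong names, which is why the `columns` field matters in the specs below.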
Besides text formats, Druid also supports binary formats such as ORC and Parquet.
Custom formats
Druid supports custom text data formats and can use the Regex input format to parse them. However, be aware that using regular expressions to parse data is less efficient than writing a native Java InputFormat extension or using an external stream processor. We welcome contributions of new input formats.
Input format
You can use the inputFormat field to specify the data format for your input data.
All forms of Druid ingestion require a schema object. Specify the format of the data to be ingested using the inputFormat entry in your ioConfig.
JSON
Configure the JSON inputFormat to load JSON data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to json. | yes |
| flattenSpec | JSON Object | Specifies flattening configuration for nested JSON data. See flattenSpec for more info. | no |
| featureSpec | JSON Object | JSON parser features supported by Jackson, a JSON processor for Java. The features control parsing of the input JSON data. To enable a feature, map the feature name to a Boolean value of "true". For example: "featureSpec": {"ALLOW_SINGLE_QUOTES": true, "ALLOW_UNQUOTED_FIELD_NAMES": true} | no |
The following properties are specialized properties that only apply when the JSON inputFormat is used in streaming ingestion, and they are related to how parsing exceptions are handled. In streaming ingestion, multi-line JSON events can be ingested (i.e. where a single JSON event spans multiple lines). However, if a parsing exception occurs, all JSON events that are present in the same streaming record will be discarded.
| Field | Type | Description | Required |
|---|---|---|---|
| assumeNewlineDelimited | Boolean | If the input is known to be newline delimited JSON (each individual JSON event is contained in a single line, separated by newlines), setting this option to true allows for more flexible parsing exception handling. Only the lines with invalid JSON syntax will be discarded, while lines containing valid JSON events will still be ingested. | no (Default false) |
| useJsonNodeReader | Boolean | When ingesting multi-line JSON events, enabling this option will enable the use of a JSON parser which will retain any valid JSON events encountered within a streaming record prior to when a parsing exception occurred. | no (Default false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "json"
},
...
}
CSV
Configure the CSV inputFormat to load CSV data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to csv. | yes |
| listDelimiter | String | A custom delimiter for multi-value dimensions. | no (default = ctrl+A) |
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if findColumnsFromHeader is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that skipHeaderRows will be applied before finding column names from the header. For example, if you set skipHeaderRows to 2 and findColumnsFromHeader to true, the task will skip the first two lines and then extract column information from the third line. columns will be ignored if this is set to true. | no (default = false if columns is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first skipHeaderRows rows. | no (default = 0) |
| tryParseNumbers | Boolean | If this is set, the task will attempt to parse numeric strings into long or double data type, in that order. This parsing also applies to values separated by listDelimiter. If the value cannot be parsed as a number, it is retained as a string. | no (default = false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "csv",
"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"]
},
...
}
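The tryParseNumbers behavior described in the table above can be sketched as follows; this is an illustration of the documented try-long-then-double rule, not Druid's actual parser:

```python
def try_parse_number(value: str):
    """Sketch of tryParseNumbers: try long first, then double,
    and retain the original string if neither parse succeeds."""
    try:
        return int(value)      # long
    except ValueError:
        pass
    try:
        return float(value)    # double
    except ValueError:
        return value           # retained as a string

print(try_parse_number("905"))      # parsed as a long
print(try_parse_number("90.5"))     # parsed as a double
print(try_parse_number("Taiyuan"))  # retained as a string
```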
TSV (Delimited)
Configure the TSV inputFormat to load TSV data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to tsv. | yes |
| delimiter | String | A custom delimiter for data values. | no (default = \t) |
| listDelimiter | String | A custom delimiter for multi-value dimensions. | no (default = ctrl+A) |
| columns | JSON array | Specifies the columns of the data. The columns should be in the same order as the columns of your data. | yes if findColumnsFromHeader is false or missing |
| findColumnsFromHeader | Boolean | If this is set, the task will find the column names from the header row. Note that skipHeaderRows will be applied before finding column names from the header. For example, if you set skipHeaderRows to 2 and findColumnsFromHeader to true, the task will skip the first two lines and then extract column information from the third line. columns will be ignored if this is set to true. | no (default = false if columns is set; otherwise null) |
| skipHeaderRows | Integer | If this is set, the task will skip the first skipHeaderRows rows. | no (default = 0) |
| tryParseNumbers | Boolean | If this is set, the task will attempt to parse numeric strings into long or double data type, in that order. This parsing also applies to values separated by listDelimiter. If the value cannot be parsed as a number, it is retained as a string. | no (default = false) |
Be sure to change the delimiter to the appropriate delimiter for your data. As with CSV, you must specify the columns and which subset of the columns you want indexed.
For example:
"ioConfig": {
"inputFormat": {
"type": "tsv",
"columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
"delimiter":"|"
},
...
}
Lines
Configure the Lines inputFormat to load line-oriented data where each line is treated as a single field:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to lines. | yes |
The Lines input format reads each line from the input as UTF-8 text, and creates a single column named line containing the entire line as a string.
This is useful for reading line-oriented data in a simple form for later processing.
For example:
"ioConfig": {
"inputFormat": {
"type": "lines"
},
...
}
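Conceptually, the lines format maps each input line to a row with a single line column, roughly like this sketch (not Druid's implementation):

```python
def read_lines(text: str):
    # Each input line becomes one row with a single "line" column
    # containing the entire line as a string.
    return [{"line": line} for line in text.splitlines()]

rows = read_lines("first event\nsecond event")
print(rows)
```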
ORC
To use the ORC input format, load the Druid ORC extension (druid-orc-extensions).
Configure the ORC inputFormat to load ORC data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to orc. | yes |
| flattenSpec | JSON Object | Specifies flattening configuration for nested ORC data. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). See flattenSpec for more info. | no |
| binaryAsString | Boolean | Specifies if the binary orc column which is not logically marked as a string should be treated as a UTF-8 encoded string. | no (default = false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "orc",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nested",
"expr": "$.path.to.nested"
}
]
},
"binaryAsString": false
},
...
}
Parquet
To use the Parquet input format, load the Druid Parquet extension (druid-parquet-extensions).
Configure the Parquet inputFormat to load Parquet data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to parquet. | yes |
| flattenSpec | JSON Object | Define a flattenSpec to extract nested values from a Parquet file. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
| binaryAsString | Boolean | Specifies if the bytes parquet column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "parquet",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "nested",
"expr": "$.path.to.nested"
}
]
},
"binaryAsString": false
},
...
}
Avro Stream
To use the Avro Stream input format, load the Druid Avro extension (druid-avro-extensions).
For more information on how Druid handles Avro types, see the Avro types section.
Configure the Avro inputFormat to load Avro data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to avro_stream. | yes |
| flattenSpec | JSON Object | Define a flattenSpec to extract nested values from an Avro record. Only 'path' expressions are supported ('jq' is unavailable). | no (default will auto-discover 'root' level properties) |
| avroBytesDecoder | JSON Object | Specifies how to decode bytes to an Avro record. | yes |
| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "avro_stream",
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
},
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "someRecord_subInt",
"expr": "$.someRecord.subInt"
}
]
},
"binaryAsString": false
},
...
}
Avro Bytes Decoder
If type is not included, the avroBytesDecoder defaults to schema_repo.
Inline Schema Based Avro Bytes Decoder
The "schema_inline" decoder reads Avro records using a fixed schema and does not support schema migration. If you may need to migrate schemas in the future, consider one of the other decoders, all of which use a message header that allows the parser to identify the proper Avro schema for reading records.
This decoder can be used if all the input events can be read using the same schema. In this case, specify the schema in the input task JSON itself, as described below.
...
"avroBytesDecoder": {
"type": "schema_inline",
"schema": {
//your schema goes here, for example
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
}
}
...
Multiple Inline Schemas Based Avro Bytes Decoder
Use this decoder if different input events can have different read schemas. In this case, specify the schema in the input task JSON itself, as described below.
...
"avroBytesDecoder": {
"type": "multiple_schemas_inline",
"schemas": {
//your id -> schema map goes here, for example
"1": {
"namespace": "org.apache.druid.data",
"name": "User",
"type": "record",
"fields": [
{ "name": "FullName", "type": "string" },
{ "name": "Country", "type": "string" }
]
},
"2": {
"namespace": "org.apache.druid.otherdata",
"name": "UserIdentity",
"type": "record",
"fields": [
{ "name": "Name", "type": "string" },
{ "name": "Location", "type": "string" }
]
},
...
...
}
}
...
Note that this decoder is essentially a map of integer schema ID to Avro schema object. The parser assumes that each record has the following format: the first byte is the version and must always be 1; the next 4 bytes are the integer schema ID, serialized in big-endian byte order; and the remaining bytes contain the serialized Avro message.
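The record layout described above can be sketched in a few lines; encode_record, decode_record, and the sample payload are illustrative names, not part of Druid:

```python
import struct

def encode_record(schema_id: int, avro_payload: bytes) -> bytes:
    # 1-byte version (always 1) + 4-byte big-endian schema ID + Avro message.
    return struct.pack(">bI", 1, schema_id) + avro_payload

def decode_record(record: bytes):
    version, schema_id = struct.unpack(">bI", record[:5])
    if version != 1:
        raise ValueError("unsupported version: %d" % version)
    return schema_id, record[5:]

encoded = encode_record(2, b"serialized-avro-bytes")
schema_id, payload = decode_record(encoded)
print(schema_id, payload)
```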
SchemaRepo Based Avro Bytes Decoder
This Avro bytes decoder first extracts subject and id from the input message bytes, and then uses them to look up the Avro schema used to decode the Avro record from bytes. For details, see the schema repo. You need an HTTP service like schema repo to hold the Avro schema. For information on registering a schema on the message producer side, see org.apache.druid.data.input.avro.AvroStreamInputFormatTest#testParse().
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to schema_repo. | no |
| subjectAndIdConverter | JSON Object | Specifies how to extract the subject and id from message bytes. | yes |
| schemaRepository | JSON Object | Specifies how to look up the Avro schema from subject and id. | yes |
Avro-1124 Subject And Id Converter
This section describes the format of the subjectAndIdConverter object for the schema_repo Avro bytes decoder.
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to avro_1124. | no |
| topic | String | Specifies the topic of your Kafka stream. | yes |
Avro-1124 Schema Repository
This section describes the format of the schemaRepository object for the schema_repo Avro bytes decoder.
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to avro_1124_rest_client. | no |
| url | String | Specifies the endpoint URL of your Avro-1124 schema repository. | yes |
Confluent Schema Registry-based Avro Bytes Decoder
This Avro bytes decoder first extracts a unique id from input message bytes, and then uses it to look up the schema in the Schema Registry used to decode the Avro record from bytes.
For details, see the Schema Registry documentation and repository.
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to schema_registry. | no |
| url | String | Specifies the URL endpoint of the Schema Registry. | yes |
| capacity | Integer | Specifies the max size of the cache (default = Integer.MAX_VALUE). | no |
| urls | ARRAY<String> | Specifies the URL endpoints of the multiple Schema Registry instances. | yes (if url is not provided) |
| config | JSON Object | Additional configuration to send to the Schema Registry. This can be supplied via a DynamicConfigProvider. | no |
| headers | JSON Object | Headers to send to the Schema Registry. This can be supplied via a DynamicConfigProvider. | no |
Use the url field for a single Schema Registry instance, or urls for multiple instances.
Single Instance:
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"url" : <schema-registry-url>
}
...
Multiple Instances:
...
"avroBytesDecoder" : {
"type" : "schema_registry",
"urls" : [<schema-registry-url-1>, <schema-registry-url-2>, ...],
"config" : {
"basic.auth.credentials.source": "USER_INFO",
"basic.auth.user.info": "fred:letmein",
"schema.registry.ssl.truststore.location": "/some/secrets/kafka.client.truststore.jks",
"schema.registry.ssl.truststore.password": "<password>",
"schema.registry.ssl.keystore.location": "/some/secrets/kafka.client.keystore.jks",
"schema.registry.ssl.keystore.password": "<password>",
"schema.registry.ssl.key.password": "<password>",
...
},
"headers": {
"traceID" : "b29c5de2-0db4-490b-b421",
"timeStamp" : "1577191871865",
"druid.dynamic.config.provider":{
"type":"mapString",
"config":{
"registry.header.prop.1":"value.1",
"registry.header.prop.2":"value.2"
}
}
...
}
}
...
Parse exceptions
The following errors when reading records will be considered parse exceptions, which can be limited and logged with ingestion task configurations such as maxParseExceptions and maxSavedParseExceptions:
- Failure to retrieve a schema due to misconfiguration or corrupt records (invalid schema IDs)
- Failure to decode an Avro message
Avro OCF
To use the Avro OCF input format, load the Druid Avro extension (druid-avro-extensions).
See the Avro types section for how Avro types are handled in Druid.
Configure the Avro OCF inputFormat to load Avro OCF data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to avro_ocf. | yes |
| flattenSpec | JSON Object | Define a flattenSpec to extract nested values from Avro records. Only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
| schema | JSON Object | Define a reader schema to be used when parsing Avro records. This is useful when parsing multiple versions of Avro OCF file data. | no (default will decode using the writer schema contained in the OCF file) |
| binaryAsString | Boolean | Specifies if the bytes Avro column which is not logically marked as a string or enum type should be treated as a UTF-8 encoded string. | no (default = false) |
For example:
"ioConfig": {
"inputFormat": {
"type": "avro_ocf",
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "someRecord_subInt",
"expr": "$.someRecord.subInt"
}
]
},
"schema": {
"namespace": "org.apache.druid.data.input",
"name": "SomeDatum",
"type": "record",
"fields" : [
{ "name": "timestamp", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "id", "type": "long" },
{ "name": "someRecord", "type": {
"type": "record", "name": "MySubRecord", "fields": [
{ "name": "subInt", "type": "int"},
{ "name": "subLong", "type": "long"}
]
}}]
},
"binaryAsString": false
},
...
}
Protobuf
You need to include the druid-protobuf-extensions as an extension to use the Protobuf input format.
Configure the Protobuf inputFormat to load Protobuf data as follows:
| Field | Type | Description | Required |
|---|---|---|---|
| type | String | Set value to protobuf. | yes |
| flattenSpec | JSON Object | Define a flattenSpec to extract nested values from a Protobuf record. Note that only 'path' expressions are supported ('jq' and 'tree' are unavailable). | no (default will auto-discover 'root' level properties) |
| protoBytesDecoder | JSON Object | Specifies how to decode bytes to a Protobuf record. | yes |
For example:
"ioConfig": {
"inputFormat": {
"type": "protobuf",
"protoBytesDecoder": {
"type": "file",
"descriptor": "file:///tmp/metrics.desc",
"protoMessageType": "Metrics"
},
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{
"type": "path",
"name": "someRecord_subInt",
"expr": "$.someRecord.subInt"
}
]
}
},
...
}
Kafka
The kafka input format lets you parse the Kafka metadata fields in addition to the Kafka payload value contents.
It should only be used when ingesting from Apache Kafka.
The kafka input format wraps around the payload parsing input format and augments the data it outputs with the Kafka event timestamp, topic name, event headers, and the key field, which can itself be parsed using any available input format.
If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kafka ingestion to use the Kafka input format (by taking its existing input format and setting it as the valueFormat) can be done without losing any of the payload data.
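The precedence rule above can be sketched as a simple merge, where metadata-derived columns are added first and payload columns overwrite any conflicts (illustrative only, not Druid's implementation):

```python
def merge_kafka_row(payload: dict, metadata: dict) -> dict:
    # Start from the metadata-derived columns, then apply the payload:
    # on a column-name conflict, the payload value wins.
    row = dict(metadata)
    row.update(payload)
    return row

row = merge_kafka_row(
    {"page": "Salo Toraut", "kafka.topic": "from-payload"},
    {"kafka.topic": "wiki-edits", "kafka.key": "wiki-edit"},
)
print(row)
```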
Configure the Kafka inputFormat as follows:
| Field | Type | Description | Required | Default |
|---|---|---|---|---|
| type | String | Set value to kafka. | yes | |
| valueFormat | InputFormat | The input format to parse the Kafka value payload. | yes | |
| timestampColumnName | String | The name of the column for the Kafka timestamp. | no | kafka.timestamp |
| topicColumnName | String | The name of the column for the Kafka topic. This field is useful when ingesting data from multiple topics into the same datasource. | no | kafka.topic |
| headerColumnPrefix | String | The custom prefix for all the header columns. | no | kafka.header. |
| headerFormat | Object | Specifies how to parse the Kafka headers. Supports String types. Because Kafka header values are bytes, the parser decodes them as UTF-8 encoded strings. To change this behavior, implement your own parser based on the encoding style. Change the encoding type in KafkaStringHeaderFormat to match your custom implementation. See Header format for supported encoding formats. | no | |
| keyFormat | InputFormat | The input format to parse the Kafka key. It only processes the first entry of the inputFormat field. If your key values are simple strings, you can use the tsv format to parse them. Note that for tsv, csv, and regex formats, you need to provide a columns array to make a valid input format. Only the first one is used, and its name will be ignored in favor of keyColumnName. | no | |
| keyColumnName | String | The name of the column for the Kafka key. | no | kafka.key |
Header format
headerFormat supports the following encoding formats:
- ISO-8859-1: ISO Latin Alphabet No. 1, that is, ISO-LATIN-1.
- US-ASCII: Seven-bit ASCII. Also known as ISO646-US. The Basic Latin block of the Unicode character set.
- UTF-8: Eight-bit UCS Transformation Format.
- UTF-16: Sixteen-bit UCS Transformation Format, byte order identified by an optional byte-order mark.
- UTF-16BE: Sixteen-bit UCS Transformation Format, big-endian byte order.
- UTF-16LE: Sixteen-bit UCS Transformation Format, little-endian byte order.

Use headerColumnPrefix to supply a prefix to the Kafka headers to avoid any conflicts with columns from the payload. The default is kafka.header. (note the trailing period).
Example
Using { "type": "json" } as the input format would only parse the payload value.
To parse the Kafka metadata in addition to the payload, use the kafka input format.
For example, consider the following structure for a Kafka message that represents an edit in a development environment:
- Kafka timestamp: 1680795276351
- Kafka topic: wiki-edits
- Kafka headers: env=development, zone=z1
- Kafka key: wiki-edit
- Kafka payload value: {"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}
You would configure it as follows:
"ioConfig": {
"inputFormat": {
"type": "kafka",
"valueFormat": {
"type": "json"
},
"timestampColumnName": "kafka.timestamp",
"topicColumnName": "kafka.topic",
"headerFormat": {
"type": "string",
"encoding": "UTF-8"
},
"headerColumnPrefix": "kafka.header.",
"keyFormat": {
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
},
"keyColumnName": "kafka.key"
}
}
You would parse the example message as follows:
{
"channel": "#sv.wikipedia",
"timestamp": "2016-06-27T00:00:11.080Z",
"page": "Salo Toraut",
"delta": 31,
"namespace": "Main",
"kafka.timestamp": 1680795276351,
"kafka.topic": "wiki-edits",
"kafka.header.env": "development",
"kafka.header.zone": "z1",
"kafka.key": "wiki-edit"
}
If you want to use kafka.timestamp as Druid's primary timestamp (__time), specify it as the value for column in the timestampSpec:
"timestampSpec": {
"column": "kafka.timestamp",
"format": "millis"
}
Similarly, if you want to use a timestamp extracted from the Kafka header:
"timestampSpec": {
"column": "kafka.header.myTimestampHeader",
"format": "millis"
}
Finally, add these Kafka metadata columns to the dimensionsSpec or set your dimensionsSpec to auto-detect columns.
The following supervisor spec demonstrates how to ingest the Kafka header, key, timestamp, and topic into Druid dimensions:
Click to view the example
{
"type": "kafka",
"spec": {
"ioConfig": {
"type": "kafka",
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"topic": "wiki-edits",
"inputFormat": {
"type": "kafka",
"valueFormat": {
"type": "json"
},
"headerFormat": {
"type": "string"
},
"keyFormat": {
"type": "tsv",
"findColumnsFromHeader": false,
"columns": ["x"]
}
},
"useEarliestOffset": true
},
"dataSchema": {
"dataSource": "wikiticker",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
"dimensionsSpec": {
"useSchemaDiscovery": true,
"includeAllDimensions": true
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "day"
}
},
"tuningConfig": {
"type": "kafka"
}
}
}
After Druid ingests the data, you can query the Kafka metadata columns as follows:
SELECT
"kafka.header.env",
"kafka.key",
"kafka.timestamp",
"kafka.topic"
FROM "wikiticker"
This query returns:
| kafka.header.env | kafka.key | kafka.timestamp | kafka.topic |
|---|---|---|---|
| development | wiki-edit | 1680795276351 | wiki-edits |
Kinesis
The kinesis input format lets you parse the Kinesis metadata fields in addition to the Kinesis payload value contents.
It should only be used when ingesting from Kinesis.
The kinesis input format wraps around the payload parsing input format and augments the data it outputs with the Kinesis event timestamp and partition key, the ApproximateArrivalTimestamp and PartitionKey fields in the Kinesis record.
If there are conflicts between column names in the payload and those created from the metadata, the payload takes precedence.
This ensures that upgrading a Kinesis ingestion to use the Kinesis input format (by taking its existing input format and setting it as the valueFormat) can be done without losing any of the payload data.
Configure the Kinesis inputFormat as follows:
| Field | Type | Description | Required | Default |
|---|---|---|---|---|
| type | String | Set value to kinesis. | yes | |
| valueFormat | InputFormat | The input format to parse the Kinesis value payload. | yes | |
| partitionKeyColumnName | String | The name of the column for the Kinesis partition key. This field is useful when ingesting data from multiple partitions into the same datasource. | no | kinesis.partitionKey |
| timestampColumnName | String | The name of the column for the Kinesis timestamp. | no | kinesis.timestamp |
Example
Using { "type": "json" } as the input format would only parse the payload value.
To parse the Kinesis metadata in addition to the payload, use the kinesis input format.
For example, consider the following structure for a Kinesis record that represents an edit in a development environment:
- Kinesis timestamp: 1680795276351
- Kinesis partition key: partition-1
- Kinesis payload value: {"channel":"#sv.wikipedia","timestamp":"2016-06-27T00:00:11.080Z","page":"Salo Toraut","delta":31,"namespace":"Main"}
You would configure it as follows:
{
"ioConfig": {
"inputFormat": {
"type": "kinesis",
"valueFormat": {
"type": "json"
},
"timestampColumnName": "kinesis.timestamp",
"partitionKeyColumnName": "kinesis.partitionKey"
}
}
}
You would parse the example record as follows:
{
"channel": "#sv.wikipedia",
"timestamp": "2016-06-27T00:00:11.080Z",
"page": "Salo Toraut",
"delta": 31,
"namespace": "Main",
"kinesis.timestamp": 1680795276351,
"kinesis.partitionKey": "partition-1"
}
If you want to use kinesis.timestamp as Druid's primary timestamp (__time), specify it as the value for column in the timestampSpec:
"timestampSpec": {
"column": "kinesis.timestamp",
"format": "millis"
}
Finally, add these Kinesis metadata columns to the dimensionsSpec or set your dimensionsSpec to automatically detect columns.
The following supervisor spec demonstrates how to ingest the Kinesis timestamp and partition key into Druid dimensions:
Click to view the example
{
"type": "kinesis",
"spec": {
"ioConfig": {
"type": "kinesis",
"stream": "wiki-edits",
"inputFormat": {
"type": "kinesis",
"valueFormat": {
"type": "json"
}
},
"useEarliestSequenceNumber": true
},
"dataSchema": {
"dataSource": "wikiticker",
"timestampSpec": {
"column": "timestamp",
"format": "posix"
},
"dimensionsSpec": {
"useSchemaDiscovery": true,
"includeAllDimensions": true
},
"granularitySpec": {
"queryGranularity": "none",
"rollup": false,
"segmentGranularity": "day"
}
},
"tuningConfig": {
"type": "kinesis"
}
}
}
After Druid ingests the data, you can query the Kinesis metadata columns as follows:
SELECT
"kinesis.timestamp",
"kinesis.partitionKey"
FROM "wikiticker"
This query returns:
| kinesis.timestamp | kinesis.partitionKey |
|---|---|
| 1680795276351 | partition-1 |
FlattenSpec
You can use the flattenSpec object to flatten nested data, as an alternative to the Druid nested columns feature, and for nested input formats unsupported by the feature. It is an object within the inputFormat object.
See Nested columns for information on ingesting and storing nested data in an Apache Druid column as a COMPLEX<json> data type.
Configure your flattenSpec as follows:
| Field | Description | Default |
|---|---|---|
| useFieldDiscovery | If true, interpret all root-level fields as available fields for usage by timestampSpec, transformSpec, dimensionsSpec, and metricsSpec. If false, only explicitly specified fields (see fields) will be available for use. | true |
| fields | Specifies the fields of interest and how they are accessed. See Field flattening specifications for more detail. | [] |
For example:
"flattenSpec": {
"useFieldDiscovery": true,
"fields": [
{ "name": "baz", "type": "root" },
{ "name": "foo_bar", "type": "path", "expr": "$.foo.bar" },
{ "name": "foo_other_bar", "type": "tree", "nodes": ["foo", "other", "bar"] },
{ "name": "first_food", "type": "jq", "expr": ".thing.food[1]" }
]
}
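As an illustration of how a tree-type entry like foo_other_bar above addresses nested data by walking a hierarchy of field names (a sketch, not Druid's implementation):

```python
def flatten_tree(record: dict, nodes: list):
    # Follow the hierarchy of field names, as a "tree"-type entry does.
    current = record
    for key in nodes:
        current = current[key]
    return current

record = {"baz": 1, "foo": {"other": {"bar": "x"}}}
print(flatten_tree(record, ["foo", "other", "bar"]))
```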
After Druid reads the input data records, it applies the flattenSpec before applying any other specs such as timestampSpec, transformSpec, dimensionsSpec, or metricsSpec. This makes it possible to extract timestamps from flattened data, for example, and to refer to flattened data in transformations, in your dimension list, and when generating metrics.
Flattening is only supported for data formats that support nesting, including avro, json, orc, and parquet.
Field flattening specifications
Each entry in the fields list can have the following components:
| Field | Description | Default |
|---|---|---|
| type | Options are root (a field at the root level), path (a field accessed using a JsonPath expression), jq (a field accessed using jackson-jq notation, json format only), or tree (a nested field read via a hierarchy of field names, json format only). | none (required) |
| name | Name of the field after flattening. This name can be referred to by the timestampSpec, transformSpec, dimensionsSpec, and metricsSpec. | none (required) |
| expr | Expression for accessing the field while flattening. For type path, this should be JsonPath. For type jq, this should be jackson-jq notation. For other types, this parameter is ignored. | none (required for types path and jq) |
| nodes | For tree only. Multiple-expression field for accessing the field while flattening, representing the hierarchy of field names to read. For other types, this parameter must not be provided. | none (required for type tree) |
Notes on flattening
- For convenience, when defining a root-level field, it is possible to define only the field name, as a string, instead of a JSON object. For example, {"name": "baz", "type": "root"} is equivalent to "baz".
- Enabling useFieldDiscovery will only automatically detect "simple" fields at the root level that correspond to data types that Druid supports. This includes strings, numbers, and lists of strings or numbers. Other types will not be automatically detected, and must be specified explicitly in the fields list.
- Duplicate field names are not allowed. An exception will be thrown.
- If useFieldDiscovery is enabled, any discovered field with the same name as one already defined in the fields list will be skipped, rather than added twice.
- The JSONPath evaluator is useful for testing path-type expressions.
- jackson-jq supports a subset of the full jq syntax. Refer to the jackson-jq documentation for details.
- JsonPath supports a number of functions, but not all of them are supported by Druid. The following matrix shows the currently supported JsonPath functions and corresponding data formats. Note the output data type of each function.

| Function | Description | Output type | json | orc | avro | parquet |
|---|---|---|---|---|---|---|
| min() | Provides the min value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ |
| max() | Provides the max value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ |
| avg() | Provides the average value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ |
| stddev() | Provides the standard deviation value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ |
| length() | Provides the length of an array | Integer | ✓ | ✓ | ✓ | ✓ |
| sum() | Provides the sum value of an array of numbers | Double | ✓ | ✓ | ✓ | ✓ |
| concat(X) | Provides a concatenated version of the path output with a new item | like input | ✓ | ✗ | ✗ | ✗ |
| append(X) | Adds an item to the JSON path output array | like input | ✓ | ✗ | ✗ | ✗ |
| keys() | Provides the property keys (an alternative for terminal tilde ~) | Set&lt;E&gt; | ✗ | ✗ | ✗ | ✗ |