Ingest Kafka records and metadata by API
When ingesting data from Apache Kafka topics into Imply Polaris, you can ingest event metadata including the key, Kafka timestamp, and any headers in addition to the event value that stores the message itself.
For example, consider the following record produced to a Kafka topic:
{
"partition_id": 0,
"headers": [
{
"name": "Header-1",
"value": "SGVhZGVyLTE="
},
{
"name": "Header-2",
"value": "SGVhZGVyLTI="
}
],
"key": {
"type": "STRING",
"data": "exampleKey"
},
"value": {
"type": "JSON",
"data": {
"city": "San Francisco"
}
},
"timestamp": "2023-05-18T01:02:34Z"
}
If you specify the source data as JSON format, Polaris only ingests the event value
:
{"city": "San Francisco"}
However, if you don't set a default timestamp, Polaris treats the record as unparseable for because the event value doesn't contain a timestamp.
You can use the Kafka format to fill in the timestamp for Polaris using the event timestamp
as well as ingest the data in the event data
.
This topic shows you how to use the Kafka source format to ingest Kafka event data and its corresponding metadata.
Prerequisites
Before continuing with this topic, ensure you have the following:
- Familiarity with the basics of creating a streaming ingestion job.
- A connection that describes your Kafka data source.
- An API key with the
ManageIngestionJobs
permission. In the examples below, the key value is stored in the variable namedPOLARIS_API_KEY
. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists
to true
in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Kafka input format
If you want an ingestion job to only ingest event values, define the data format such as nd-json
in source.formatSettings.format
.
If you want to ingest metadata as well as the message values, specify all of the following:
source.formatSettings.format
: set tokafka
source.formatSettings.valueFormat
: set to the data format of the event message
For example:
"source": {
...
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
}
}
}
Map the Kafka timestamp to the primary table timestamp
To use the Kafka fields in mappings, such as to parse the Kafka timestamp, include them in source.inputSchema
.
Even though the Kafka timestamp in the example is in ISO 8601 format, Polaris reads the timestamp in milliseconds since epoch format.
Therefore, declare the Kafka timestamp in the inputSchema
as follows:
- Set the
name
property to the same value astimestampColumnName
(kafka.timestamp
by default). - Set the
dataType
tolong
.
For example:
"inputSchema": [
{
"name": "kafka.timestamp",
"dataType": "long"
}
]
When mapping the Kafka timestamp to __time
, apply the MILLIS_TO_TIMESTAMP
operator—that is, MILLIS_TO_TIMESTAMP("kafka.timestamp")
.
For example:
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
}
]
Sample request
The following example request creates an ingestion job using the Kafka format and maps the Kafka timestamp to __time
:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created
response and the ingestion job details.
Click to view the response
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T00:50:27.408275186Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T00:50:27.408275186Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
Map Kafka key and headers to new columns
In addition to the Kafka timestamp, events are associated with a key and, optionally, headers in the form of key-value pairs. You can extract this metadata to ingest into your table.
Parse event headers
To parse headers, you need all of the following:
Set the value of
headerFormat
insource.formatSettings
to the encoding used to parse header values, such asutf-8
. For example:"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8"
}You can optionally set the prefix applied to the Kafka headers in
source.formatSettings.headerLabelPrefix
. Note that the default value ofheaderLabelPrefix=kafka.header.
ends with a period. For instance, for headers with the keysh1
andh2
, Polaris prepends the header so that you usekafka.header.h1
andkafka.header.h2
in the input schema and mappings.Declare the header labels in
source.inputSchema
. For example:"inputSchema": [
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
]Map the header input fields to table columns in
mappings
. For example:"mappings": [
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
Parse event key
The key field associated with a record is typically used for partitioning in Kafka. For Polaris to parse and ingest the key, you need the following:
Update
source.formatSettings
to include the name and format of the Kafka key.The
keyColumnName
field controls the input field's name,kafka.key
by default. Use the key column name in the input schema and input expressions.In the format settings,
keyFormat
takes an object that describes the data format settings used to parse the key. If you store the key as a string, usecsv
as the key format. LeaveskipHeaderRows
as 0. You must include thecolumns
array; however, Polaris disregards the column names in favor ofkeyColumnName
.For example:
"formatSettings": {
...
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}For more complicated keys, you can use
regex
format to parse the key. Regardless of the key format, if your key gets parsed into multiple columns, Polaris only reads the first column. For examples of regex patterns, see Parse data using regular expressions.Declare the key in
source.inputSchema
. The default key column name iskafka.key
. If you rename it inkeyColumnName
, use the custom name in the input schema.The following example shows the Kafka key field in the input schema:
"inputSchema": [
{
"name": "kafka.key",
"dataType": "string"
},
]Map the key input field to a table column in
mappings
. For example:"mappings": [
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
]
For additional information on specifying the Kafka key format, see the Jobs API documentation.
Sample request
The following example request creates an ingestion job using the Kafka format and creates new columns for the Kafka key and headers:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
},
{
"name": "kafka.key",
"dataType": "string"
},
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp",
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
},
{
"name": "kafka.key",
"dataType": "string"
},
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp",
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created
response and the ingestion job details.
Click to view the response
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T16:37:36.193117466Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "e24b4162-7f86-4e35-8cc8-403b9bf6ccd6",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T16:37:36.193117466Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
Learn more
See the following topics for more information: