Ingest Kafka records and metadata by API
Project-less regional API resources have been deprecated and will be removed by the end of September 2024.
You must include the project ID in the URL for all regional API calls in projects created after September 29, 2023.
For example: https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID
For backward compatibility, you can continue to use project-less regional API resources on projects created prior to September 29, 2023. We strongly recommend updating your regional API calls to include the project ID prior to September 2024. See the API migration guide for more information.
When ingesting data from Apache Kafka topics into Imply Polaris, you can ingest event metadata including the key, Kafka timestamp, and any headers in addition to the event value that stores the message itself.
For example, consider the following record produced to a Kafka topic:
{
"partition_id": 0,
"headers": [
{
"name": "Header-1",
"value": "SGVhZGVyLTE="
},
{
"name": "Header-2",
"value": "SGVhZGVyLTI="
}
],
"key": {
"type": "BINARY",
"data": "aWFtZ3Jvb3QK"
},
"value": {
"type": "JSON",
"data": {
"city": "San Francisco"
}
},
"timestamp": "2023-05-18T01:02:34Z"
}
If you specify JSON as the source data format, Polaris ingests only the event value:
{"city": "San Francisco"}
However, if you don't set a default timestamp, Polaris treats the record as unparseable because the event value doesn't contain a timestamp.
You can use the Kafka format to populate the Polaris timestamp from the event timestamp while still ingesting the payload stored in the event data.
This topic shows you how to use the Kafka source format to ingest Kafka event data and its corresponding metadata.
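As a quick way to inspect the metadata in the sample record, the following sketch decodes the header values and the key. It assumes these fields are Base64-encoded, which their form suggests but the record itself does not state. The helper name decode_b64 is hypothetical.

```python
import base64

# Metadata fields copied from the sample record above.
record = {
    "headers": [
        {"name": "Header-1", "value": "SGVhZGVyLTE="},
        {"name": "Header-2", "value": "SGVhZGVyLTI="},
    ],
    "key": {"type": "BINARY", "data": "aWFtZ3Jvb3QK"},
}


def decode_b64(text: str) -> str:
    """Decode a Base64 string and interpret the bytes as UTF-8 (assumed encoding)."""
    return base64.b64decode(text).decode("utf-8")


# Map each header name to its decoded value.
decoded_headers = {h["name"]: decode_b64(h["value"]) for h in record["headers"]}
decoded_key = decode_b64(record["key"]["data"])

print(decoded_headers)
print(repr(decoded_key))  # repr() makes any trailing newline in the key visible
```

Decoding locally like this can help you choose a header encoding and a key format before you write the job spec.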
Prerequisites
Before continuing with this topic, ensure you have the following:
- Familiarity with the basics of creating a streaming ingestion job.
- A connection that describes your Kafka data source.
- An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.
You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec.
For details, see Automatically created tables.
Map the Kafka timestamp to the primary table timestamp
In the request payload that creates the ingestion job, set source.formatSettings.format to kafka.
You must also set source.formatSettings.valueFormat to the data format of the event value.
For example:
"source": {
...
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
}
}
},
To use the Kafka fields in mappings, such as to parse the Kafka timestamp, include them in source.inputSchema.
Even though the Kafka timestamp in the example is in ISO 8601 format, Polaris reads the timestamp in milliseconds since epoch format.
Therefore, declare the Kafka timestamp in the inputSchema as follows:
- Set the name property to the same value as timestampColumnName (kafka.timestamp by default).
- Set the dataType to long.
For example:
"inputSchema": [
{
"name": "kafka.timestamp",
"dataType": "long"
}
]
When mapping the Kafka timestamp to __time, apply the MILLIS_TO_TIMESTAMP operator, that is, MILLIS_TO_TIMESTAMP("kafka.timestamp").
For example:
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
}
]
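To make the milliseconds-since-epoch representation concrete, the following sketch converts the ISO 8601 timestamp from the sample record into the long value that the input schema declares, then converts it back. This is a local illustration only. The function names are hypothetical, and epoch_millis_to_iso merely mirrors the kind of conversion the MILLIS_TO_TIMESTAMP mapping expresses; it is not how Polaris implements it.

```python
from datetime import datetime, timezone


def iso_to_epoch_millis(iso_utc: str) -> int:
    """Convert an ISO 8601 UTC timestamp (with a trailing Z) to epoch milliseconds."""
    parsed = datetime.strptime(iso_utc, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def epoch_millis_to_iso(millis: int) -> str:
    """Convert epoch milliseconds back to an ISO 8601 UTC string."""
    moment = datetime.fromtimestamp(millis / 1000, tz=timezone.utc)
    return moment.strftime("%Y-%m-%dT%H:%M:%SZ")


# Timestamp taken from the sample Kafka record.
millis = iso_to_epoch_millis("2023-05-18T01:02:34Z")
print(millis)
print(epoch_millis_to_iso(millis))
```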
Sample request
The following example request creates an ingestion job using the Kafka format and maps the Kafka timestamp to __time:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the ingestion job details.
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T00:50:27.408275186Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T00:50:27.408275186Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
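When you script job creation, you usually need only a few fields from this response. The following sketch pulls out the job ID, execution status, health status, and target table from a parsed response body. The summarize_job helper is hypothetical, and the sample dictionary is a trimmed copy of the response above rather than a live API result.

```python
def summarize_job(job: dict) -> dict:
    """Return the fields of a job response that are most useful for follow-up calls."""
    return {
        "id": job.get("id"),
        "executionStatus": job.get("executionStatus"),
        # Nested objects may be missing or null, so fall back to an empty dict.
        "health": (job.get("health") or {}).get("status"),
        "tableName": (job.get("target") or {}).get("tableName"),
    }


# Trimmed copy of the sample response shown above.
sample_response = {
    "id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
    "executionStatus": "pending",
    "desiredExecutionStatus": "running",
    "health": {"status": "ok"},
    "target": {"tableName": "example-table", "type": "table", "intervals": []},
}

summary = summarize_job(sample_response)
print(summary)
```

With the Python request shown earlier, you could pass response.json() to this helper after confirming that response.status_code is 201.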
Map Kafka key and headers to new columns
In addition to the Kafka timestamp, events are associated with a key and, optionally, headers in the form of key-value pairs. You can extract this metadata to ingest into your table.
To parse headers, do all of the following:
- Set the value of headerFormat in source.formatSettings to the encoding used to parse header values, such as utf-8. For example:
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8"
}
- Declare the header labels in source.inputSchema. For example:
"inputSchema": [
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
]
- Map the header input fields to table columns in mappings. For example:
"mappings": [
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
You can optionally set the prefix applied to the Kafka headers in source.formatSettings.headerLabelPrefix.
Note that the default value, kafka.header., ends with a period.
For instance, for headers with the keys h1 and h2, Polaris prepends the prefix so that you use kafka.header.h1 and kafka.header.h2 in the input schema and mappings.
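Because the prefixed header name has to match exactly in both the input schema and the mappings, it can help to generate both lists from one list of header keys. The following is a minimal sketch of that idea. The function name and the choice to reuse the prefixed name as the table column name are assumptions that follow the examples above.

```python
import json

# Default prefix described above. Note the trailing period.
DEFAULT_HEADER_PREFIX = "kafka.header."


def header_schema_and_mappings(header_keys, prefix=DEFAULT_HEADER_PREFIX):
    """Build inputSchema and mappings entries for the given Kafka header keys."""
    input_schema = []
    mappings = []
    for key in header_keys:
        field = f"{prefix}{key}"
        input_schema.append({"name": field, "dataType": "string"})
        # The expression refers to the input field by its double-quoted name.
        mappings.append({"columnName": field, "expression": f'"{field}"'})
    return input_schema, mappings


schema, mappings = header_schema_and_mappings(["h1", "h2"])
print(json.dumps({"inputSchema": schema, "mappings": mappings}, indent=2))
```

If you set a custom headerLabelPrefix in the job spec, pass the same value as prefix so the generated names stay consistent with it.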
The key field associated with a record is typically used for partitioning in Kafka. For Polaris to parse and ingest the key, do the following:
- Set the value of keyFormat in source.formatSettings. The keyFormat field takes an object that describes the data format settings used to parse the key. If you store the key as a string, use the CSV input format. You must also set columns for Polaris to parse the key. For example:
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
- Declare the key in source.inputSchema. For example:
"inputSchema": [
{
"name": "kafka.key",
"dataType": "string"
}
]
- Map the key input field to a table column in mappings. For example:
"mappings": [
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
}
]
You can optionally set the name of the Kafka key input field in source.formatSettings.keyColumnName.
Use the value of this field when declaring the key in the input schema.
For additional information on specifying the Kafka key format, see the Jobs API documentation.
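The three key-related settings share one name: the value of keyColumnName must reappear in the input schema and in the mapping expression. The following sketch derives all three pieces from a single argument. The kafka_key_settings helper and the record_key name in the usage line are hypothetical, and the CSV key format with a single column follows the example above.

```python
import json


def kafka_key_settings(key_column_name="kafka.key"):
    """Return formatSettings, an inputSchema entry, and a mapping for the Kafka key."""
    format_settings = {
        "format": "kafka",
        "valueFormat": {"format": "nd-json"},
        "keyColumnName": key_column_name,
        # A string key is parsed as CSV with one column, as in the example above.
        "keyFormat": {"format": "csv", "columns": ["key"]},
    }
    input_schema_entry = {"name": key_column_name, "dataType": "string"}
    mapping = {
        "columnName": key_column_name,
        "expression": f'"{key_column_name}"',
    }
    return format_settings, input_schema_entry, mapping


# Hypothetical custom name for the key input field.
settings, schema_entry, mapping = kafka_key_settings("record_key")
print(json.dumps(settings, indent=2))
print(json.dumps(schema_entry))
print(json.dumps(mapping))
```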
Sample request
The following example request creates an ingestion job using the Kafka format and creates new columns for the Kafka key and headers:
- cURL
- Python
curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--user ${POLARIS_API_KEY}: \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
},
{
"name": "kafka.key",
"dataType": "string"
},
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp",
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
}'
import os
import requests
import json
url = "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs"
apikey = os.getenv("POLARIS_API_KEY")
payload = json.dumps({
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": True,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
},
{
"name": "kafka.key",
"dataType": "string"
},
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp",
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
})
headers = {
'Authorization': f'Basic {apikey}',
'Content-Type': 'application/json'
}
response = requests.request("POST", url, headers=headers, data=payload)
print(response.text)
Sample response
A successful request returns a 201 Created response and the ingestion job details.
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T16:37:36.193117466Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "e24b4162-7f86-4e35-8cc8-403b9bf6ccd6",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T16:37:36.193117466Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}
Learn more
See the following topics for more information: