
Ingest Kafka records and metadata by API

When ingesting data from Apache Kafka topics into Imply Polaris, you can ingest event metadata including the key, Kafka timestamp, and any headers in addition to the event value that stores the message itself.

For example, consider the following record produced to a Kafka topic:

{
  "partition_id": 0,
  "headers": [
    {
      "name": "Header-1",
      "value": "SGVhZGVyLTE="
    },
    {
      "name": "Header-2",
      "value": "SGVhZGVyLTI="
    }
  ],
  "key": {
    "type": "STRING",
    "data": "exampleKey"
  },
  "value": {
    "type": "JSON",
    "data": {
      "city": "San Francisco"
    }
  },
  "timestamp": "2023-05-18T01:02:34Z"
}
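Note that header values in the record arrive base64 encoded. The following sketch, using only Python's standard library, shows how the header values in the example record decode back to strings:

```python
import base64

# Header values from the example Kafka record, as delivered by the consumer API.
headers = [
    {"name": "Header-1", "value": "SGVhZGVyLTE="},
    {"name": "Header-2", "value": "SGVhZGVyLTI="},
]

# Decode each base64-encoded value back to its original UTF-8 string.
decoded = {h["name"]: base64.b64decode(h["value"]).decode("utf-8") for h in headers}
print(decoded)  # {'Header-1': 'Header-1', 'Header-2': 'Header-2'}
```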

If you specify the source data format as JSON, Polaris ingests only the event value:

{"city": "San Francisco"}

However, if you don't set a default timestamp, Polaris treats the record as unparseable because the event value doesn't contain a timestamp. You can use the Kafka format so that Polaris fills in the timestamp from the Kafka event timestamp while still ingesting the fields in the event value.

This topic shows you how to use the Kafka source format with the Jobs API to ingest Kafka event data and its corresponding metadata. For details on configuration in the UI, see Ingest streaming metadata.

Prerequisites

Before continuing with this topic, ensure you have the following:

  • Familiarity with the basics of creating a streaming ingestion job.
  • A connection that describes your Kafka data source.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the environment variable POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, see Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Kafka input format

If you want an ingestion job to ingest only event values, specify the data format, such as nd-json, in source.formatSettings.format.

If you want to ingest metadata as well as the message values, specify all of the following:

  • source.formatSettings.format: set to kafka
  • source.formatSettings.valueFormat: set to the data format of the event message

For example:

"source": {
  ...
  "formatSettings": {
    "format": "kafka",
    "valueFormat": {
      "format": "nd-json"
    }
  }
}

For available data formats you can use for valueFormat, see the Jobs API documentation.

Map the Kafka timestamp to the primary table timestamp

To use the Kafka fields in mappings, such as to parse the Kafka timestamp, include them in source.inputSchema. Even though the Kafka timestamp in the example is in ISO 8601 format, Polaris reads the timestamp as milliseconds since the epoch. Therefore, declare the Kafka timestamp in the inputSchema as follows:

  • Set the name property to the same value as timestampColumnName (kafka.timestamp by default).
  • Set the dataType to long.

For example:

"inputSchema": [
  {
    "name": "kafka.timestamp",
    "dataType": "long"
  }
]

When mapping the Kafka timestamp to __time, apply the MILLIS_TO_TIMESTAMP operator—that is, MILLIS_TO_TIMESTAMP("kafka.timestamp"). For example:

"mappings": [
  {
    "columnName": "__time",
    "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
  }
]
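To see why this conversion is needed, here is a sketch in Python of the same round trip: the Kafka timestamp arrives as milliseconds since the epoch, and MILLIS_TO_TIMESTAMP turns it back into a timestamp. The millisecond value below is derived from the example record's timestamp, 2023-05-18T01:02:34Z; this is an illustration, not Polaris internals.

```python
from datetime import datetime, timezone

# The example record's Kafka timestamp, expressed as milliseconds since the epoch
# (this is the form Polaris reads into the "kafka.timestamp" input field).
event_time = datetime(2023, 5, 18, 1, 2, 34, tzinfo=timezone.utc)
kafka_timestamp = int(event_time.timestamp() * 1000)

# Equivalent of MILLIS_TO_TIMESTAMP("kafka.timestamp"): millis back to a timestamp.
restored = datetime.fromtimestamp(kafka_timestamp / 1000, tz=timezone.utc)
print(restored.isoformat())  # 2023-05-18T01:02:34+00:00
```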

Sample request

The following example request creates an ingestion job using the Kafka format and maps the Kafka timestamp to __time:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "kafka.timestamp",
        "dataType": "long"
      }
    ],
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "timestampColumnName": "kafka.timestamp"
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    }
  ]
}'
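If you work in Python rather than curl, the same request body can be built as a plain dictionary and submitted with any HTTP client. This is a sketch, not an official client; it mirrors the curl example's payload, and the endpoint and POLARIS_API_KEY handling remain as shown above.

```python
import json

# Job spec matching the curl example: Kafka format with the Kafka timestamp
# mapped to __time. Send it as the JSON body of a POST to
# /v1/projects/PROJECT_ID/jobs with the Authorization header shown above.
job_spec = {
    "type": "streaming",
    "target": {"type": "table", "tableName": "example-table"},
    "createTableIfNotExists": True,
    "source": {
        "type": "connection",
        "connectionName": "docs-demo",
        "inputSchema": [
            {"name": "city", "dataType": "string"},
            {"name": "kafka.timestamp", "dataType": "long"},
        ],
        "formatSettings": {
            "format": "kafka",
            "valueFormat": {"format": "nd-json"},
            "timestampColumnName": "kafka.timestamp",
        },
    },
    "mappings": [
        {"columnName": "__time",
         "expression": 'MILLIS_TO_TIMESTAMP("kafka.timestamp")'},
        {"columnName": "city", "expression": '"city"'},
    ],
}

body = json.dumps(job_spec)  # serialized request payload
```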

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
  "source": {
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "long",
        "name": "kafka.timestamp"
      }
    ],
    "formatSettings": {
      "valueFormat": {
        "flattenSpec": {},
        "format": "nd-json"
      },
      "headerFormat": null,
      "headerLabelPrefix": "kafka.header.",
      "keyColumnName": "kafka.key",
      "keyFormat": null,
      "timestampColumnName": "kafka.timestamp",
      "format": "kafka"
    },
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
      "isAggregation": null
    },
    {
      "columnName": "city",
      "expression": "\"city\"",
      "isAggregation": null
    }
  ],
  "dimensionExclusions": [],
  "useSchemaDiscovery": false,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "readFromPoint": "latest",
  "maxParseExceptions": 2147483647,
  "createdBy": {
    "username": "api-key-pok_2zdsi...jfquyy",
    "userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2023-05-19T00:50:27.408275186Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
  "lastModifiedBy": {
    "username": "api-key-pok_2zdsi...jfquyy",
    "userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2023-05-19T00:50:27.408275186Z",
  "spec": {
    "source": {
      "connectionName": "docs-demo",
      "inputSchema": [
        {
          "dataType": "string",
          "name": "city"
        },
        {
          "dataType": "long",
          "name": "kafka.timestamp"
        }
      ],
      "formatSettings": {
        "valueFormat": {
          "flattenSpec": {},
          "format": "nd-json"
        },
        "headerFormat": null,
        "headerLabelPrefix": "kafka.header.",
        "keyColumnName": "kafka.key",
        "keyFormat": null,
        "timestampColumnName": "kafka.timestamp",
        "format": "kafka"
      },
      "type": "connection"
    },
    "target": {
      "tableName": "example-table",
      "type": "table",
      "intervals": []
    },
    "dimensionExclusions": [],
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "mappings": [
      {
        "columnName": "__time",
        "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
        "isAggregation": null
      },
      {
        "columnName": "city",
        "expression": "\"city\"",
        "isAggregation": null
      }
    ],
    "maxParseExceptions": 2147483647,
    "readFromPoint": "latest",
    "useSchemaDiscovery": false,
    "type": "streaming",
    "desiredExecutionStatus": "running"
  },
  "target": {
    "tableName": "example-table",
    "type": "table",
    "intervals": []
  },
  "createTableIfNotExists": true,
  "type": "streaming",
  "completedTimestamp": null,
  "startedTimestamp": null
}

Map Kafka key and headers to new columns

In addition to the Kafka timestamp, events are associated with a key and, optionally, headers in the form of key-value pairs. You can extract this metadata to ingest into your table.

Parse event headers

To parse headers, complete all of the following:

  1. Set the value of headerFormat in source.formatSettings to the encoding used to parse header values, such as utf-8. For example:

    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "headerLabelPrefix": "kafka.header.",
      "headerFormat": "utf-8"
    }

    You can optionally set the prefix applied to Kafka header names in source.formatSettings.headerLabelPrefix. Note that the default value, kafka.header., ends with a period. For example, for headers with the keys h1 and h2, Polaris applies the prefix so that you reference kafka.header.h1 and kafka.header.h2 in the input schema and mappings.

  2. Declare the header labels in source.inputSchema. For example:

    "inputSchema": [
      {
        "name": "kafka.header.Header-1",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-2",
        "dataType": "string"
      }
    ]
  3. Map the header input fields to table columns in mappings. For example:

    "mappings": [
      {
        "columnName": "kafka.header.Header-1",
        "expression": "\"kafka.header.Header-1\""
      },
      {
        "columnName": "kafka.header.Header-2",
        "expression": "\"kafka.header.Header-2\""
      }
    ]
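The combined effect of headerFormat and headerLabelPrefix can be sketched as follows: each raw header value is decoded with the configured encoding, and the prefix is prepended to the header key to form the input field name. This is an illustration of the naming behavior (the header keys and byte values are made up for the example), not Polaris internals.

```python
# Raw Kafka headers as (key, bytes) pairs, per typical Kafka client APIs.
raw_headers = [("Header-1", b"value-1"), ("Header-2", b"value-2")]

header_format = "utf-8"                 # formatSettings.headerFormat
header_label_prefix = "kafka.header."   # formatSettings.headerLabelPrefix (default)

# Decode each value and build the field names used in inputSchema and mappings.
input_fields = {
    header_label_prefix + key: value.decode(header_format)
    for key, value in raw_headers
}
print(sorted(input_fields))  # ['kafka.header.Header-1', 'kafka.header.Header-2']
```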

Parse event key

The key field associated with a record is typically used for partitioning in Kafka. For Polaris to parse and ingest the key, complete all of the following:

  1. Update source.formatSettings to include the name and format of the Kafka key.

    1. Supply the input field name in keyColumnName. By default, the name is kafka.key. Use the key column name in the input schema and input expressions.

    2. Describe the data format settings used to parse the key in keyFormat. For details and examples, see Supported formats for Kafka keys.

  2. Declare the key in source.inputSchema. The default key column name is kafka.key. If you renamed it in keyColumnName, use the custom name in the input schema.

    The following example shows the Kafka key field in the input schema:

    "inputSchema": [
      {
        "name": "kafka.key",
        "dataType": "string"
      }
    ]
  3. Map the key input field to a table column in mappings. For example:

    "mappings": [
      {
        "columnName": "kafka.key",
        "expression": "\"kafka.key\""
      }
    ]

Supported formats for Kafka keys

For Kafka keys, you can use the CSV or regex format, or you can specify a schema registry connection to parse the key. For any format, if the key parses into multiple columns, Polaris reads only the first column.

CSV format

If you store the key as a plain string, use csv as the key format. Leave skipHeaderRows as 0. You must include keyFormat.columns; however, Polaris disregards the column names in favor of keyColumnName.

For example:

"formatSettings": {
  "format": "kafka",
  ...
  "keyColumnName": "kafka.key",
  "keyFormat": {
    "columns": [
      "kafka.key"
    ],
    "format": "csv"
  }
}
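To illustrate why csv suits plain string keys: a key with no delimiters parses as a single CSV column, which Polaris then exposes under keyColumnName. A sketch with Python's csv module, as an analogy rather than the Polaris parser itself:

```python
import csv

# A plain string key, as in the example record at the top of this topic.
key = "exampleKey"

# Parsing it as a CSV row yields a single column; Polaris reads only the first
# column and names it after keyColumnName (kafka.key by default).
row = next(csv.reader([key]))
print(row)  # ['exampleKey']
```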

Regex format

To extract text from a string key, use the regex format.

For example:

"formatSettings": {
  "format": "kafka",
  ...
  "keyColumnName": "kafka.key",
  "keyFormat": {
    "format": "regex",
    "columns": [
      "kafka.key"
    ],
    "pattern": "(.*)"
  }
}

For regex examples, see Parse data using regular expressions.
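As an illustration of the pattern above: the capture group (.*) matches the entire key string, each capture group yields a column, and Polaris keeps the first. A sketch with Python's re module, as an analogy rather than the Polaris parser itself:

```python
import re

pattern = "(.*)"     # keyFormat.pattern from the example
key = "exampleKey"   # the key from the example record

# Each capture group becomes one parsed column.
match = re.match(pattern, key)
columns = list(match.groups())
print(columns[0])  # exampleKey
```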

Schema registry

If your key is in a format that requires decoding with a schema, use a schema registry connection for your key format. Create the Confluent Schema Registry connection before using it in an ingestion job.

The following example shows how to reference a schema registry connection to parse Kafka keys in Avro format:

"formatSettings": {
  "format": "kafka",
  ...
  "keyColumnName": "kafka.key",
  "keyFormat": {
    "format": "avro_stream",
    "parseSchemaProvider": {
      "type": "connection",
      "connectionName": "example_connection"
    }
  }
}

If you also need to configure the value format using a schema registry connection, see Use the schema registry for the event value.

For additional information on specifying the Kafka key format, see the Jobs API documentation.

Sample request

The following example request creates an ingestion job using the Kafka format and creates new columns for the Kafka key and headers:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
  "type": "streaming",
  "target": {
    "type": "table",
    "tableName": "example-table"
  },
  "createTableIfNotExists": true,
  "source": {
    "type": "connection",
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "name": "city",
        "dataType": "string"
      },
      {
        "name": "kafka.timestamp",
        "dataType": "long"
      },
      {
        "name": "kafka.key",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-1",
        "dataType": "string"
      },
      {
        "name": "kafka.header.Header-2",
        "dataType": "string"
      }
    ],
    "formatSettings": {
      "format": "kafka",
      "valueFormat": {
        "format": "nd-json"
      },
      "timestampColumnName": "kafka.timestamp",
      "headerLabelPrefix": "kafka.header.",
      "headerFormat": "utf-8",
      "keyColumnName": "kafka.key",
      "keyFormat": {
        "columns": [
          "key"
        ],
        "format": "csv"
      }
    }
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
    },
    {
      "columnName": "city",
      "expression": "\"city\""
    },
    {
      "columnName": "kafka.key",
      "expression": "\"kafka.key\""
    },
    {
      "columnName": "kafka.header.Header-1",
      "expression": "\"kafka.header.Header-1\""
    },
    {
      "columnName": "kafka.header.Header-2",
      "expression": "\"kafka.header.Header-2\""
    }
  ]
}'

Sample response

A successful request returns a 201 Created response and the ingestion job details.

{
  "source": {
    "connectionName": "docs-demo",
    "inputSchema": [
      {
        "dataType": "string",
        "name": "city"
      },
      {
        "dataType": "long",
        "name": "kafka.timestamp"
      },
      {
        "dataType": "string",
        "name": "kafka.key"
      },
      {
        "dataType": "string",
        "name": "kafka.header.Header-1"
      },
      {
        "dataType": "string",
        "name": "kafka.header.Header-2"
      }
    ],
    "formatSettings": {
      "valueFormat": {
        "flattenSpec": {},
        "format": "nd-json"
      },
      "headerFormat": "utf-8",
      "headerLabelPrefix": "kafka.header.",
      "keyColumnName": "kafka.key",
      "keyFormat": {
        "columns": [
          "key"
        ],
        "delimiter": null,
        "listDelimiter": null,
        "skipHeaderRows": 0,
        "format": "csv"
      },
      "timestampColumnName": "kafka.timestamp",
      "format": "kafka"
    },
    "type": "connection"
  },
  "mappings": [
    {
      "columnName": "__time",
      "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
      "isAggregation": null
    },
    {
      "columnName": "city",
      "expression": "\"city\"",
      "isAggregation": null
    },
    {
      "columnName": "kafka.key",
      "expression": "\"kafka.key\"",
      "isAggregation": null
    },
    {
      "columnName": "kafka.header.Header-1",
      "expression": "\"kafka.header.Header-1\"",
      "isAggregation": null
    },
    {
      "columnName": "kafka.header.Header-2",
      "expression": "\"kafka.header.Header-2\"",
      "isAggregation": null
    }
  ],
  "dimensionExclusions": [],
  "useSchemaDiscovery": false,
  "filterExpression": null,
  "lateMessageRejectionPeriod": "P30D",
  "readFromPoint": "latest",
  "maxParseExceptions": 2147483647,
  "createdBy": {
    "username": "api-key-pok_2zdsi...jfquyy",
    "userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
  },
  "createdTimestamp": "2023-05-19T16:37:36.193117466Z",
  "desiredExecutionStatus": "running",
  "executionStatus": "pending",
  "health": {
    "status": "ok"
  },
  "id": "e24b4162-7f86-4e35-8cc8-403b9bf6ccd6",
  "lastModifiedBy": {
    "username": "api-key-pok_2zdsi...jfquyy",
    "userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
  },
  "lastUpdatedTimestamp": "2023-05-19T16:37:36.193117466Z",
  "spec": {
    "source": {
      "connectionName": "docs-demo",
      "inputSchema": [
        {
          "dataType": "string",
          "name": "city"
        },
        {
          "dataType": "long",
          "name": "kafka.timestamp"
        },
        {
          "dataType": "string",
          "name": "kafka.key"
        },
        {
          "dataType": "string",
          "name": "kafka.header.Header-1"
        },
        {
          "dataType": "string",
          "name": "kafka.header.Header-2"
        }
      ],
      "formatSettings": {
        "valueFormat": {
          "flattenSpec": {},
          "format": "nd-json"
        },
        "headerFormat": "utf-8",
        "headerLabelPrefix": "kafka.header.",
        "keyColumnName": "kafka.key",
        "keyFormat": {
          "columns": [
            "key"
          ],
          "delimiter": null,
          "listDelimiter": null,
          "skipHeaderRows": 0,
          "format": "csv"
        },
        "timestampColumnName": "kafka.timestamp",
        "format": "kafka"
      },
      "type": "connection"
    },
    "target": {
      "tableName": "example-table",
      "type": "table",
      "intervals": []
    },
    "dimensionExclusions": [],
    "filterExpression": null,
    "lateMessageRejectionPeriod": "P30D",
    "mappings": [
      {
        "columnName": "__time",
        "expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
        "isAggregation": null
      },
      {
        "columnName": "city",
        "expression": "\"city\"",
        "isAggregation": null
      },
      {
        "columnName": "kafka.key",
        "expression": "\"kafka.key\"",
        "isAggregation": null
      },
      {
        "columnName": "kafka.header.Header-1",
        "expression": "\"kafka.header.Header-1\"",
        "isAggregation": null
      },
      {
        "columnName": "kafka.header.Header-2",
        "expression": "\"kafka.header.Header-2\"",
        "isAggregation": null
      }
    ],
    "maxParseExceptions": 2147483647,
    "readFromPoint": "latest",
    "useSchemaDiscovery": false,
    "type": "streaming",
    "desiredExecutionStatus": "running"
  },
  "target": {
    "tableName": "example-table",
    "type": "table",
    "intervals": []
  },
  "createTableIfNotExists": true,
  "type": "streaming",
  "completedTimestamp": null,
  "startedTimestamp": null
}

Learn more

See the following topics for more information: