Skip to main content

Ingest Kafka records and metadata by API

When ingesting data from Apache Kafka topics into Imply Polaris, you can ingest event metadata including the key, Kafka timestamp, and any headers in addition to the event value that stores the message itself.

For example, consider the following record produced to a Kafka topic:

{
"partition_id": 0,
"headers": [
{
"name": "Header-1",
"value": "SGVhZGVyLTE="
},
{
"name": "Header-2",
"value": "SGVhZGVyLTI="
}
],
"key": {
"type": "STRING",
"data": "exampleKey"
},
"value": {
"type": "JSON",
"data": {
"city": "San Francisco"
}
},
"timestamp": "2023-05-18T01:02:34Z"
}

If you specify the source data as JSON format, Polaris only ingests the event value:

{"city": "San Francisco"}

However, if you don't set a default timestamp, Polaris treats the record as unparseable for because the event value doesn't contain a timestamp. You can use the Kafka format to fill in the timestamp for Polaris using the event timestamp as well as ingest the data in the event data.

This topic shows you how to use the Kafka source format to ingest Kafka event data and its corresponding metadata.

Prerequisites

Before continuing with this topic, ensure you have the following:

  • Familiarity with the basics of creating a streaming ingestion job.
  • A connection that describes your Kafka data source.
  • An API key with the ManageIngestionJobs permission. In the examples below, the key value is stored in the variable named POLARIS_API_KEY. For information about how to obtain an API key and assign permissions, see API key authentication. For more information on permissions, visit Permissions reference.

You do not have to create a table before starting an ingestion job. When you set createTableIfNotExists to true in the ingestion job spec, Polaris automatically determines the table attributes from the job spec. For details, see Automatically created tables.

Kafka input format

If you want an ingestion job to only ingest event values, define the data format such as nd-json in source.formatSettings.format.

If you want to ingest metadata as well as the message values, specify all of the following:

  • source.formatSettings.format: set to kafka
  • source.formatSettings.valueFormat: set to the data format of the event message

For example:

"source": {
...
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
}
}
}

Map the Kafka timestamp to the primary table timestamp

To use the Kafka fields in mappings, such as to parse the Kafka timestamp, include them in source.inputSchema. Even though the Kafka timestamp in the example is in ISO 8601 format, Polaris reads the timestamp in milliseconds since epoch format. Therefore, declare the Kafka timestamp in the inputSchema as follows:

  • Set the name property to the same value as timestampColumnName (kafka.timestamp by default).
  • Set the dataType to long.

For example:

"inputSchema": [
{
"name": "kafka.timestamp",
"dataType": "long"
},
]

When mapping the Kafka timestamp to __time, apply the MILLIS_TO_TIMESTAMP operator—that is, MILLIS_TO_TIMESTAMP("kafka.timestamp"). For example:

"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
]

Sample request

The following example request creates an ingestion job using the Kafka format and maps the Kafka timestamp to __time:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp"
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
}
]
}'

Sample response

A successful request returns a 201 Created response and the ingestion job details.

Click to view the response
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T00:50:27.408275186Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "46c633ce-2c17-45b5-bcf9-e8d92ca6008b",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T00:50:27.408275186Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": null,
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": null,
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}

Map Kafka key and headers to new columns

In addition to the Kafka timestamp, events are associated with a key and, optionally, headers in the form of key-value pairs. You can extract this metadata to ingest into your table.

Parse event headers

To parse headers, you need all of the following:

  • Set the value of headerFormat in source.formatSettings to the encoding used to parse header values, such as utf-8. For example:

    "formatSettings": {
    "format": "kafka",
    "valueFormat": {
    "format": "nd-json"
    },
    "headerLabelPrefix": "kafka.header.",
    "headerFormat": "utf-8"
    }

    You can optionally set the prefix applied to the Kafka headers in source.formatSettings.headerLabelPrefix. Note that the default value of headerLabelPrefix=kafka.header. ends with a period. For instance, for headers with the keys h1 and h2, Polaris prepends the header so that you use kafka.header.h1 and kafka.header.h2 in the input schema and mappings.

  • Declare the header labels in source.inputSchema. For example:

    "inputSchema": [
    {
    "name": "kafka.header.Header-1",
    "dataType": "string"
    },
    {
    "name": "kafka.header.Header-2",
    "dataType": "string"
    }
    ]
  • Map the header input fields to table columns in mappings. For example:

     "mappings": [
    {
    "columnName": "kafka.header.Header-1",
    "expression": "\"kafka.header.Header-1\""
    },
    {
    "columnName": "kafka.header.Header-2",
    "expression": "\"kafka.header.Header-2\""
    }
    ]

Parse event key

The key field associated with a record is typically used for partitioning in Kafka. For Polaris to parse and ingest the key, you need the following:

  • Update source.formatSettings to include the name and format of the Kafka key.

    The keyColumnName field controls the input field's name, kafka.key by default. Use the key column name in the input schema and input expressions.

    In the format settings, keyFormat takes an object that describes the data format settings used to parse the key. If you store the key as a string, use csv as the key format. Leave skipHeaderRows as 0. You must include the columns array; however, Polaris disregards the column names in favor of keyColumnName.

    For example:

    "formatSettings": {
    ...
    "keyColumnName": "kafka.key",
    "keyFormat": {
    "columns": [
    "key"
    ],
    "format": "csv"
    }
    }

    For more complicated keys, you can use regex format to parse the key. Regardless of the key format, if your key gets parsed into multiple columns, Polaris only reads the first column. For examples of regex patterns, see Parse data using regular expressions.

  • Declare the key in source.inputSchema. The default key column name is kafka.key. If you rename it in keyColumnName, use the custom name in the input schema.

    The following example shows the Kafka key field in the input schema:

    "inputSchema": [
    {
    "name": "kafka.key",
    "dataType": "string"
    },
    ]
  • Map the key input field to a table column in mappings. For example:

    "mappings": [
    {
    "columnName": "kafka.key",
    "expression": "\"kafka.key\""
    },
    ]

For additional information on specifying the Kafka key format, see the Jobs API documentation.

Sample request

The following example request creates an ingestion job using the Kafka format and creates new columns for the Kafka key and headers:

curl --location --request POST "https://ORGANIZATION_NAME.REGION.CLOUD_PROVIDER.api.imply.io/v1/projects/PROJECT_ID/jobs" \
--header "Authorization: Basic $POLARIS_API_KEY" \
--header "Content-Type: application/json" \
--data-raw '{
"type": "streaming",
"target": {
"type": "table",
"tableName": "example-table"
},
"createTableIfNotExists": true,
"source": {
"type": "connection",
"connectionName": "docs-demo",
"inputSchema": [
{
"name": "city",
"dataType": "string"
},
{
"name": "kafka.timestamp",
"dataType": "long"
},
{
"name": "kafka.key",
"dataType": "string"
},
{
"name": "kafka.header.Header-1",
"dataType": "string"
},
{
"name": "kafka.header.Header-2",
"dataType": "string"
}
],
"formatSettings": {
"format": "kafka",
"valueFormat": {
"format": "nd-json"
},
"timestampColumnName": "kafka.timestamp",
"headerLabelPrefix": "kafka.header.",
"headerFormat": "utf-8",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"format": "csv"
}
}
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")"
},
{
"columnName": "city",
"expression": "\"city\""
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\""
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\""
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\""
}
]
}'

Sample response

A successful request returns a 201 Created response and the ingestion job details.

Click to view the response
{
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"dimensionExclusions": [],
"useSchemaDiscovery": false,
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"readFromPoint": "latest",
"maxParseExceptions": 2147483647,
"createdBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"createdTimestamp": "2023-05-19T16:37:36.193117466Z",
"desiredExecutionStatus": "running",
"executionStatus": "pending",
"health": {
"status": "ok"
},
"id": "e24b4162-7f86-4e35-8cc8-403b9bf6ccd6",
"lastModifiedBy": {
"username": "api-key-pok_2zdsi...jfquyy",
"userId": "ff395800-a4df-4a64-bc94-xxxxxxxxxxxx"
},
"lastUpdatedTimestamp": "2023-05-19T16:37:36.193117466Z",
"spec": {
"source": {
"connectionName": "docs-demo",
"inputSchema": [
{
"dataType": "string",
"name": "city"
},
{
"dataType": "long",
"name": "kafka.timestamp"
},
{
"dataType": "string",
"name": "kafka.key"
},
{
"dataType": "string",
"name": "kafka.header.Header-1"
},
{
"dataType": "string",
"name": "kafka.header.Header-2"
}
],
"formatSettings": {
"valueFormat": {
"flattenSpec": {},
"format": "nd-json"
},
"headerFormat": "utf-8",
"headerLabelPrefix": "kafka.header.",
"keyColumnName": "kafka.key",
"keyFormat": {
"columns": [
"key"
],
"delimiter": null,
"listDelimiter": null,
"skipHeaderRows": 0,
"format": "csv"
},
"timestampColumnName": "kafka.timestamp",
"format": "kafka"
},
"type": "connection"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"dimensionExclusions": [],
"filterExpression": null,
"lateMessageRejectionPeriod": "P30D",
"mappings": [
{
"columnName": "__time",
"expression": "MILLIS_TO_TIMESTAMP(\"kafka.timestamp\")",
"isAggregation": null
},
{
"columnName": "city",
"expression": "\"city\"",
"isAggregation": null
},
{
"columnName": "kafka.key",
"expression": "\"kafka.key\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-1",
"expression": "\"kafka.header.Header-1\"",
"isAggregation": null
},
{
"columnName": "kafka.header.Header-2",
"expression": "\"kafka.header.Header-2\"",
"isAggregation": null
}
],
"maxParseExceptions": 2147483647,
"readFromPoint": "latest",
"useSchemaDiscovery": false,
"type": "streaming",
"desiredExecutionStatus": "running"
},
"target": {
"tableName": "example-table",
"type": "table",
"intervals": []
},
"createTableIfNotExists": true,
"type": "streaming",
"completedTimestamp": null,
"startedTimestamp": null
}

Learn more

See the following topics for more information: